You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/31 08:52:46 UTC

svn commit: r1195365 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/checkpoint/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/checkpoint/

Author: edwardyoon
Date: Mon Oct 31 07:52:46 2011
New Revision: 1195365

URL: http://svn.apache.org/viewvc?rev=1195365&view=rev
Log:
Integrate checkpoint with bsp task

Added:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 31 07:52:46 2011
@@ -23,6 +23,7 @@ Release 0.4 - Unreleased
 
   IMPROVEMENTS
   
+    HAMA-463: Integrate checkpoint with bsp task (chl501)
     HAMA-457: Refactoring of BSPPeerImpl (tjungblut)
     HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)
     HAMA-441: Logging tasks to distinct files (Thomas Jungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java Mon Oct 31 07:52:46 2011
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-public class BSPMessageSerializer {
-
-  private static final Log LOG = LogFactory.getLog(BSPMessageSerializer.class);
-
-  final Socket client;
-  final ScheduledExecutorService sched;
-  final Configuration conf;
-
-  public BSPMessageSerializer(final Configuration conf, final int port) {
-    this.conf = conf;
-    Socket tmp = null;
-    int cnt = 0;
-    do {
-      tmp = init(port);
-      cnt++;
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {
-        LOG.warn("Thread is interrupted.", ie);
-        Thread.currentThread().interrupt();
-      }
-    } while (null == tmp && 10 > cnt);
-    this.client = tmp;
-    if (null == this.client)
-      throw new NullPointerException("Client socket is null.");
-    this.sched = Executors.newScheduledThreadPool(conf.getInt(
-        "bsp.checkpoint.serializer_thread", 10));
-    LOG.info(BSPMessageSerializer.class.getName()
-        + " is ready to serialize message.");
-  }
-
-  private Socket init(final int port) {
-    Socket tmp = null;
-    try {
-      tmp = new Socket("localhost", port);
-    } catch (UnknownHostException uhe) {
-      LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
-    } catch (IOException ioe) {
-      LOG.warn("Fail to create socket.", ioe);
-    }
-    return tmp;
-  }
-
-  void serialize(final BSPSerializableMessage tmp) throws IOException {
-    if (LOG.isDebugEnabled())
-      LOG.debug("Messages are saved to " + tmp.checkpointedPath());
-    final DataOutput out = new DataOutputStream(client.getOutputStream());
-    this.sched.schedule(new Callable<Object>() {
-      public Object call() throws Exception {
-        tmp.write(out);
-        return null;
-      }
-    }, 0, SECONDS);
-  }
-
-  public void close() {
-    try {
-      this.client.close();
-      this.sched.shutdown();
-    } catch (IOException io) {
-      LOG.error("Fail to close client socket.", io);
-    }
-  }
-
-}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Oct 31 07:52:46 2011
@@ -28,13 +28,15 @@ import java.util.concurrent.ConcurrentLi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.ipc.BSPPeerProtocol;
 
 /**
@@ -45,6 +47,7 @@ public class BSPPeerImpl implements BSPP
   public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   private final Configuration conf;
+  private final FileSystem dfs;
   private BSPJob bspJob;
 
   private volatile Server server = null;
@@ -61,8 +64,6 @@ public class BSPPeerImpl implements BSPP
   private TaskAttemptID taskId;
   private BSPPeerProtocol umbilical;
 
-  private final BSPMessageSerializer messageSerializer;
-
   private String[] allPeers;
 
   private SyncClient syncClient;
@@ -71,8 +72,18 @@ public class BSPPeerImpl implements BSPP
    * Protected default constructor for LocalBSPRunner.
    */
   protected BSPPeerImpl() {
-    messageSerializer = null;
     conf = null;
+    dfs = null;
+  }
+
+  /**
+   * For unit test.
+   * @param conf is the configuration file. 
+   * @param dfs is the Hadoop FileSystem. 
+   */
+  protected BSPPeerImpl(final Configuration conf, FileSystem dfs) {
+    this.conf = conf;
+    this.dfs = dfs;
   }
 
   /**
@@ -92,18 +103,17 @@ public class BSPPeerImpl implements BSPP
     this.umbilical = umbilical;
     this.bspJob = job;
 
+    FileSystem fs = null;
+    if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      fs = FileSystem.get(conf);
+    }
+    this.dfs = fs;
+
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
-    BSPMessageSerializer msgSerializer = null;
-    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      msgSerializer = new BSPMessageSerializer(conf,
-          conf.getInt("bsp.checkpoint.port",
-              Integer.valueOf(CheckpointRunner.DEFAULT_PORT)));
-    }
-    this.messageSerializer = msgSerializer;
     initialize();
     syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
         peerAddress.getPort());
@@ -173,6 +183,20 @@ public class BSPPeerImpl implements BSPP
     return ckptPath;
   }
 
+  void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
+    FSDataOutputStream out = null;
+    try {
+      out = this.dfs.create(new Path(checkpointedPath));
+      bundle.write(out);
+    } catch(IOException ioe) {
+      LOG.warn("Fail checkpointing messages to "+checkpointedPath, ioe);
+    } finally { 
+      try { if(null != out) out.close(); } catch(IOException e) {
+        LOG.warn("Fail to close dfs output stream while checkpointing.", e); 
+      } 
+    }
+  }
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.BSPPeerInterface#sync()
@@ -203,10 +227,8 @@ public class BSPPeerImpl implements BSPP
           }
         }
 
-        // checkpointing
-        if (null != this.messageSerializer) {
-          this.messageSerializer.serialize(new BSPSerializableMessage(
-              checkpointedPath(), bundle));
+        if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+          checkpoint(checkpointedPath(), bundle);
         }
 
         peer.put(bundle);
@@ -255,8 +277,6 @@ public class BSPPeerImpl implements BSPP
     syncClient.close();
     if (server != null)
       server.stop();
-    if (null != messageSerializer)
-      this.messageSerializer.close();
   }
 
   @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java Mon Oct 31 07:52:46 2011
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.io.Writable;
-
-public class BSPSerializableMessage implements Writable {
-  final AtomicReference<String> path = new AtomicReference<String>();
-  final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
-
-  public BSPSerializableMessage() {
-  }
-
-  public BSPSerializableMessage(final String path, final BSPMessageBundle bundle) {
-    if (null == path)
-      throw new NullPointerException("No path provided for checkpointing.");
-    if (null == bundle)
-      throw new NullPointerException("No data provided for checkpointing.");
-    this.path.set(path);
-    this.bundle.set(bundle);
-  }
-
-  public final String checkpointedPath() {
-    return this.path.get();
-  }
-
-  public final BSPMessageBundle messageBundle() {
-    return this.bundle.get();
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    out.writeUTF(this.path.get());
-    this.bundle.get().write(out);
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    this.path.set(in.readUTF());
-    BSPMessageBundle pack = new BSPMessageBundle();
-    pack.readFields(in);
-    this.bundle.set(pack);
-  }
-}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Oct 31 07:52:46 2011
@@ -53,8 +53,6 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.checkpoint.CheckpointRunner;
-import org.apache.hama.checkpoint.Checkpointer;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.MasterProtocol;
@@ -132,8 +130,6 @@ public class GroomServer implements Runn
   // private BlockingQueue<GroomServerAction> tasksToCleanup = new
   // LinkedBlockingQueue<GroomServerAction>();
 
-  private final CheckpointRunner checkpointRunner;
-
   private class DispatchTasksHandler implements DirectiveHandler {
 
     public void handle(Directive directive) throws DirectiveException {
@@ -233,13 +229,6 @@ public class GroomServer implements Runn
     // FileSystem local = FileSystem.getLocal(conf);
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
 
-    CheckpointRunner ckptRunner = null;
-    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      ckptRunner = new CheckpointRunner(
-          CheckpointRunner.buildCommands(this.conf));
-    }
-    this.checkpointRunner = ckptRunner;
-
     try {
       zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
           conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
@@ -334,10 +323,6 @@ public class GroomServer implements Runn
     this.instructor.bind(DispatchTasksDirective.class,
         new DispatchTasksHandler());
     instructor.start();
-    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
-        && null != this.checkpointRunner && !this.checkpointRunner.isAlive()) {
-      this.checkpointRunner.start();
-    }
     this.running = true;
     this.initialized = true;
   }
@@ -707,10 +692,6 @@ public class GroomServer implements Runn
     cleanupStorage();
     this.workerServer.stop();
     RPC.stopProxy(masterClient);
-    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
-        && null != this.checkpointRunner && this.checkpointRunner.isAlive()) {
-      this.checkpointRunner.stop();
-    }
     if (taskReportServer != null) {
       taskReportServer.stop();
       taskReportServer = null;
@@ -876,33 +857,6 @@ public class GroomServer implements Runn
   }
 
   /**
-   * Checkpointer child process.
-   */
-  public static final class CheckpointerChild {
-
-    public static void main(String[] args) throws Throwable {
-      if (LOG.isDebugEnabled())
-        LOG.debug("Starting Checkpointer child process.");
-      HamaConfiguration defaultConf = new HamaConfiguration();
-      // int ret = 0;
-      if (null != args && 1 == args.length) {
-        int port = Integer.parseInt(args[0]);
-        defaultConf.setInt("bsp.checkpoint.port",
-            Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
-        if (LOG.isDebugEnabled())
-          LOG.debug("Supplied checkpointer port value:" + port);
-        Checkpointer ckpt = new Checkpointer(defaultConf);
-        ckpt.start();
-        ckpt.join();
-        LOG.info("Checkpoint finishes its execution.");
-      } else {
-        throw new IllegalArgumentException(
-            "Port value is not provided for checkpointing service.");
-      }
-    }
-  }
-
-  /**
    * The main() for BSPPeer child processes.
    */
   public static class BSPPeerChild {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Oct 31 07:52:46 2011
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.RunJar;
-import org.apache.hama.checkpoint.CheckpointRunner;
 
 /**
  * Base class that runs a task in a separate process.
@@ -114,7 +113,7 @@ public class TaskRunner extends Thread {
               + exit_code + ".");
         }
       } catch (InterruptedException e) {
-        LOG.warn("Thread is interrupted when execeuting Checkpointer process.",
+        LOG.warn("Thread is interrupted when execeuting BSP process.",
             e);
       } catch (IOException ioe) {
         LOG.error("Error when executing BSPPeer process.", ioe);
@@ -204,7 +203,7 @@ public class TaskRunner extends Thread {
     vargs.add(classPath.toString());
     // Add main class and its arguments
     LOG.debug("Executing child Process " + child.getName());
-    vargs.add(child.getName()); // main of bsp or checkpointer Child
+    vargs.add(child.getName()); // bsp class name 
 
     if (GroomServer.BSPPeerChild.class.equals(child)) {
       InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
@@ -213,21 +212,11 @@ public class TaskRunner extends Thread {
       vargs.add(task.getTaskID().toString());
       vargs.add(groomServer.groomHostName);
     }
-
-    if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
-      String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
-          CheckpointRunner.DEFAULT_PORT);
-      LOG.debug("Checkpointer's port:" + ckptPort);
-      vargs.add(ckptPort);
-    }
-
     return vargs;
   }
 
   /**
-   * Build working environment and launch BSPPeer and Checkpointer processes.
-   * And transmit data from BSPPeer's inputstream to Checkpointer's
-   * OutputStream.
+   * Build working environment and launch BSPPeer processes.
    */
   public void run() {
     File workDir = createWorkDirectory();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java Mon Oct 31 07:52:46 2011
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.GroomServer.CheckpointerChild;
-
-
-public final class CheckpointRunner implements Callable<Object> {
-
-  public static final Log LOG = LogFactory.getLog(CheckpointRunner.class);
-  public static final String DEFAULT_PORT = "1590";
-  
-  private final List<String> commands;
-  private final ScheduledExecutorService sched;
-  private final AtomicReference<Process> process;
-  private final AtomicBoolean isAlive = new AtomicBoolean(false);
-
-  public CheckpointRunner(List<String> commands) {
-    this.commands = commands;
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Command for executing Checkpoint runner:"+
-      Arrays.toString(this.commands.toArray()));
-    }
-    this.sched = Executors.newScheduledThreadPool(10);
-    this.process = new AtomicReference<Process>();
-  }
-
-  public static final List<String> buildCommands(final Configuration config) {
-    List<String> vargs = new ArrayList<String>();
-    File jvm =
-      new File(new File(System.getProperty("java.home"), "bin"), "java");
-    vargs.add(jvm.toString());
-
-    String javaOpts = config.get("bsp.checkpoint.child.java.opts", "-Xmx50m");
-    String[] javaOptsSplit = javaOpts.split(" ");
-    for (int i = 0; i < javaOptsSplit.length; i++) {
-      vargs.add(javaOptsSplit[i]);
-    }
-    vargs.add("-classpath");
-    vargs.add(System.getProperty("java.class.path"));
-    vargs.add(CheckpointerChild.class.getName());
-    String port = config.get("bsp.checkpoint.port", DEFAULT_PORT);
-    if(LOG.isDebugEnabled())
-      LOG.debug("Checkpointer's port:"+port);
-    vargs.add(port);
-
-    return vargs;
-  }
-
-  public void start() {
-    if(!isAlive.compareAndSet(false, true)) {
-      throw new IllegalStateException(this.getClass().getName()+
-      " is already running.");
-    }
-    this.sched.schedule(this, 0, SECONDS);
-    LOG.info("Start building checkpointer process.");
-  }
-
-  public void stop() {
-    kill();
-    this.sched.shutdown();
-    LOG.info("Stop checkpointer process.");
-  }
-
-  public Process getProcess() {
-    return this.process.get();
-  }
-
-  public void kill() {
-    if (this.process.get() != null) {
-      this.process.get().destroy();
-    }
-    isAlive.set(false);
-  }
-
-  public boolean isAlive() {
-    return isAlive.get();
-  }
-
-  public Object call() throws Exception {
-    ProcessBuilder builder = new ProcessBuilder(commands);
-    try{
-      this.process.set(builder.start());
-      new Thread() {
-        @Override
-        public void run() {
-          logStream(process.get().getErrorStream());
-        }
-      }.start();
-
-      new Thread() {
-        @Override
-        public void run() {
-          logStream(process.get().getInputStream());
-        }
-      }.start();
-
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        @Override
-        public void run() {
-          LOG.info("Destroying checkpointer process.");
-          getProcess().destroy();
-        }
-      });
-
-      int exit_code = this.process.get().waitFor();
-      if (!isAlive() && exit_code != 0) {
-        throw new IOException("Checkpointer process exit with nonzero status of "
-            + exit_code + ".");
-      }
-    } catch(InterruptedException e){
-      LOG.warn("Thread is interrupted when execeuting Checkpointer process.", e);
-    } catch(IOException ioe) {
-      LOG.error("Error when executing Checkpointer process.", ioe);
-    } finally {
-      kill();
-    }
-    return null;
-  }
-
-  private void logStream(InputStream output) {
-    try {
-      BufferedReader in = new BufferedReader(new InputStreamReader(output));
-      String line;
-      while ((line = in.readLine()) != null) {
-        LOG.info(line);
-      }
-    } catch (IOException e) {
-      LOG.warn("Error reading checkpoint process's inputstream.", e);
-    } finally {
-      try {
-        output.close();
-      } catch (IOException e) {
-        LOG.warn("Error closing checkpoint's inputstream.", e);
-      }
-    }
-  }
-}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Mon Oct 31 07:52:46 2011
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.bsp.BSPSerializableMessage;
-
-/**
- * This class is responsible for checkpointing messages to hdfs. 
- */
-public final class Checkpointer implements Callable<Object> {
-
-  public static Log LOG = LogFactory.getLog(Checkpointer.class);
-
-  private final ScheduledExecutorService scheduler = 
-    Executors.newScheduledThreadPool(1);
-  private final FileSystem dfs; 
-  private final AtomicBoolean ckptState = new AtomicBoolean(false);
-  private final BSPMessageDeserializer messageDeserializer;
-  private final AtomicReference<ScheduledFuture<Object>> future =  
-    new AtomicReference<ScheduledFuture<Object>>(); 
-
-  /** 
-   * Reading from socket inputstream as DataInput.
-   */
-  public static final class BSPMessageDeserializer implements Callable<Object> {
-    final BlockingQueue<BSPSerializableMessage> messageQueue = 
-      new LinkedBlockingQueue<BSPSerializableMessage>();
-    final ScheduledExecutorService sched; 
-    final ScheduledExecutorService workers; 
-    final AtomicBoolean serializerState = new AtomicBoolean(false);
-    final ServerSocket server;
-    
-    public BSPMessageDeserializer(final int port) throws IOException { 
-      this.sched = Executors.newScheduledThreadPool(1);
-      this.workers = Executors.newScheduledThreadPool(10);
-      this.server = new ServerSocket(port); 
-      LOG.info("Deserializer's port is opened at "+port);
-    }
-
-    public int port() {
-      return this.server.getLocalPort(); 
-    }
-
-    public void start() {
-      if(!serializerState.compareAndSet(false, true)) {
-        throw new IllegalStateException("BSPMessageDeserializer has been "+
-        "started up.");
-      }
-      this.sched.schedule(this, 0, SECONDS);
-      LOG.info("BSPMessageDeserializer is started.");
-    }
-
-    public void stop() {
-      try {
-        this.server.close();
-      } catch(IOException ioe) {
-        LOG.error("Unable to close message serializer server socket.", ioe);
-      }
-      this.sched.shutdown();
-      this.workers.shutdown();
-      this.serializerState.set(false);
-      LOG.info("BSPMessageDeserializer is stopped.");
-    }
-
-    public boolean state(){
-      return this.serializerState.get();
-    }
-
-    /**
-     * Message is enqueued for communcating data sent from BSPPeer.
-     */
-    public BlockingQueue<BSPSerializableMessage> messageQueue() {
-      return this.messageQueue;
-    }
-
-    public Object call() throws Exception {
-      try {
-        while(state()) {
-          Socket connection = server.accept();
-          final DataInput in = new DataInputStream(connection.getInputStream());
-          this.workers.schedule(new Callable<Object>() {
-            public Object call() throws Exception {
-              BSPSerializableMessage tmp = new BSPSerializableMessage();
-              tmp.readFields(in);
-              messageQueue().put(tmp);
-              return null;
-            }
-          }, 0, SECONDS);
-        }
-      } catch(EOFException eofe) {
-        LOG.info("Closing checkpointer's input stream.", eofe);
-      } catch(IOException ioe) {
-        LOG.error("Error when reconstructing BSPSerializableMessage.", ioe);
-      }
-      return null;
-    }
-  }
-
-  public Checkpointer(final Configuration conf) throws IOException {
-    this.dfs = FileSystem.get(conf); 
-    if(null == this.dfs) 
-      throw new NullPointerException("HDFS instance not found.");
-    int port = conf.getInt("bsp.checkpoint.port", 
-      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
-    if(LOG.isDebugEnabled()) 
-      LOG.debug("Checkpoint port value:"+port); 
-    this.messageDeserializer = new BSPMessageDeserializer(port);
-  }
-
-  /**
-   * Activate the checkpoint thread.
-   */
-  public void start(){
-    if(!ckptState.compareAndSet(false, true)) {
-      throw new IllegalStateException("Checkpointer has been started up.");
-    }
-    this.messageDeserializer.start();
-    this.future.set(this.scheduler.schedule(this, 0, SECONDS));
-    LOG.info("Checkpointer is started.");
-  }
-
-  /**
-   * Stop checkpoint thread.
-   */
-  public void stop(){
-    this.messageDeserializer.stop();
-    this.scheduler.shutdown();
-    this.ckptState.set(false);
-    LOG.info("Checkpointer is stopped.");
-  }
-  
-  /**
-   * Check if checkpointer is running.
-   * @return true if checkpointer is runing; false otherwise.
-   */
-  public boolean isAlive(){
-    return !this.scheduler.isShutdown() && this.ckptState.get();
-  }
-
-  public void join() throws InterruptedException, ExecutionException {
-    this.future.get().get();
-  }
-
-  public Boolean call() throws Exception {
-    BlockingQueue<BSPSerializableMessage> queue = 
-      this.messageDeserializer.messageQueue();
-    while(isAlive()) {
-      BSPSerializableMessage msg = queue.take();
-      String path = msg.checkpointedPath();
-      if(null == path || path.toString().isEmpty()) 
-        throw new NullPointerException("File dest is not provided.");
-      FSDataOutputStream out = this.dfs.create(new Path(path)); 
-      msg.messageBundle().write(out);
-      try { } finally { try { out.close(); } catch(IOException e) {
-        LOG.error("Fail to close hdfs output stream.", e); } 
-      } 
-    }
-    try {  } finally { LOG.info("Stop checkpointing."); this.stop(); }
-    return new Boolean(true);
-  }
-}

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Mon Oct 31 07:52:46 2011
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-public final class BSPSerializerWrapper {
-
-  private final BSPMessageSerializer serializer;
-
-  public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
-    this.serializer = new BSPMessageSerializer(conf, conf.getInt(
-        "bsp.checkpoint.port", port));
-  }
-
-  public final void serialize(final BSPSerializableMessage tmp)
-      throws IOException {
-    this.serializer.serialize(tmp);
-  }
-}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1195365&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Oct 31 07:52:46 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
+/** Round-trip test: checkpoints a bundle via BSPPeerImpl and reads it back. */
+public class TestCheckpoint extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+
+  static final String checkpointedDir = "/tmp/checkpoint/job_201110302255_0001/0/";
+
+  /**
+   * Writes one ByteMessage through BSPPeerImpl.checkpoint, then reads the file
+   * back and asserts that the payload is unchanged.
+   */
+  public void testCheckpoint() throws Exception {
+    Configuration config = new HamaConfiguration();
+    FileSystem dfs = FileSystem.get(config);
+    BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
+    // Create exactly the directory asserted below (mkdirs also creates parents);
+    // a relative path would resolve against the working directory instead.
+    dfs.mkdirs(new Path(checkpointedDir));
+    assertTrue("Make sure directory is created.", dfs.exists(new Path(checkpointedDir)));
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    bundle.addMessage(new ByteMessage("abc".getBytes(), "data".getBytes()));
+    Path file = new Path(checkpointedDir, "attempt_201110302255_0001_000000_0");
+    bspTask.checkpoint(file.toString(), bundle);
+    BSPMessageBundle bundleRead = new BSPMessageBundle();
+    FSDataInputStream in = dfs.open(file);
+    try {
+      bundleRead.readFields(in);
+    } finally {
+      in.close(); // release the stream even if deserialization throws
+    }
+    ByteMessage byteMsg = (ByteMessage) bundleRead.getMessages().get(0);
+    String content = new String(byteMsg.getData());
+    LOG.info("Saved checkpointed content is "+content);
+    assertEquals("Message content should be the same.", "data", content);
+  }
+}

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Mon Oct 31 07:52:46 2011
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPMessage;
-import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.BSPSerializableMessage;
-import org.apache.hama.bsp.BSPSerializerWrapper;
-import org.apache.hama.bsp.DoubleMessage;
-
-public class TestCheckpoint extends TestCase {
-
-  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
-
-  private CheckpointRunner runner;
-  private BSPSerializerWrapper serializer;
-  static final String TEST_STRING = "Test String";
-  private FileSystem hdfs;
-  static final DoubleMessage estimate = 
-    new DoubleMessage("192.168.1.123:61000", 3.1415926d);
-
-  public void setUp() throws Exception {
-    Configuration conf = new HamaConfiguration();
-    this.hdfs = FileSystem.get(conf);
-    assertNotNull("File system object should exist.", this.hdfs);
-    this.runner =  
-      new CheckpointRunner(CheckpointRunner.buildCommands(conf));
-    assertNotNull("Checkpoint instance should exist.", this.runner);
-    this.runner.start();
-    Thread.sleep(1000*1);
-    Process process = this.runner.getProcess();
-    assertNotNull("Checkpoint process should be created.", process);
-    this.serializer = new BSPSerializerWrapper(conf, 
-      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
-  }
-
-  private BSPMessageBundle createMessageBundle() {
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    bundle.addMessage(estimate);
-    return bundle;
-  }
-
-  private String checkpointedPath() {
-      return "/tmp/" + "job_201108221205_000" + "/" + "0" +
-      "/" + "attempt_201108221205_0001_000000_0";
-  }
-
-  public void testCheckpoint() throws Exception {
-    this.serializer.serialize(new BSPSerializableMessage(
-    checkpointedPath(), createMessageBundle()));
-    Thread.sleep(1000); 
-    Path path = new Path(checkpointedPath());
-    boolean exists = this.hdfs.exists(path);
-    assertTrue("Check if file is actually written to hdfs.", exists); 
-    BSPMessageBundle bundle = new BSPMessageBundle(); 
-    DataInput in = new DataInputStream(this.hdfs.open(path));
-    bundle.readFields(in);
-    List<BSPMessage> messages = bundle.getMessages();
-    assertEquals("Only one message exists.", 1,  messages.size());
-    for(BSPMessage message: messages) {
-      String peer = (String)message.getTag();
-      assertEquals("BSPPeer value in form of <ip>:<port>.", peer, estimate.getTag());
-      Double pi = (Double)message.getData();
-      assertEquals("Message content.", pi, estimate.getData());
-    }
-  }
-
-  public void tearDown() throws Exception {
-    this.runner.stop();
-  }
-  
-}