You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/31 08:52:46 UTC
svn commit: r1195365 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/checkpoint/
core/src/test/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/checkpoint/
Author: edwardyoon
Date: Mon Oct 31 07:52:46 2011
New Revision: 1195365
URL: http://svn.apache.org/viewvc?rev=1195365&view=rev
Log:
Integrate checkpoint with bsp task
Added:
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 31 07:52:46 2011
@@ -23,6 +23,7 @@ Release 0.4 - Unreleased
IMPROVEMENTS
+ HAMA-463: Integrate checkpoint with bsp task (chl501)
HAMA-457: Refactoring of BSPPeerImpl (tjungblut)
HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)
HAMA-441: Logging tasks to distinct files (Thomas Jungblut)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java Mon Oct 31 07:52:46 2011
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-public class BSPMessageSerializer {
-
- private static final Log LOG = LogFactory.getLog(BSPMessageSerializer.class);
-
- final Socket client;
- final ScheduledExecutorService sched;
- final Configuration conf;
-
- public BSPMessageSerializer(final Configuration conf, final int port) {
- this.conf = conf;
- Socket tmp = null;
- int cnt = 0;
- do {
- tmp = init(port);
- cnt++;
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- LOG.warn("Thread is interrupted.", ie);
- Thread.currentThread().interrupt();
- }
- } while (null == tmp && 10 > cnt);
- this.client = tmp;
- if (null == this.client)
- throw new NullPointerException("Client socket is null.");
- this.sched = Executors.newScheduledThreadPool(conf.getInt(
- "bsp.checkpoint.serializer_thread", 10));
- LOG.info(BSPMessageSerializer.class.getName()
- + " is ready to serialize message.");
- }
-
- private Socket init(final int port) {
- Socket tmp = null;
- try {
- tmp = new Socket("localhost", port);
- } catch (UnknownHostException uhe) {
- LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
- } catch (IOException ioe) {
- LOG.warn("Fail to create socket.", ioe);
- }
- return tmp;
- }
-
- void serialize(final BSPSerializableMessage tmp) throws IOException {
- if (LOG.isDebugEnabled())
- LOG.debug("Messages are saved to " + tmp.checkpointedPath());
- final DataOutput out = new DataOutputStream(client.getOutputStream());
- this.sched.schedule(new Callable<Object>() {
- public Object call() throws Exception {
- tmp.write(out);
- return null;
- }
- }, 0, SECONDS);
- }
-
- public void close() {
- try {
- this.client.close();
- this.sched.shutdown();
- } catch (IOException io) {
- LOG.error("Fail to close client socket.", io);
- }
- }
-
-}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Oct 31 07:52:46 2011
@@ -28,13 +28,15 @@ import java.util.concurrent.ConcurrentLi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.checkpoint.CheckpointRunner;
import org.apache.hama.ipc.BSPPeerProtocol;
/**
@@ -45,6 +47,7 @@ public class BSPPeerImpl implements BSPP
public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
private final Configuration conf;
+ private final FileSystem dfs;
private BSPJob bspJob;
private volatile Server server = null;
@@ -61,8 +64,6 @@ public class BSPPeerImpl implements BSPP
private TaskAttemptID taskId;
private BSPPeerProtocol umbilical;
- private final BSPMessageSerializer messageSerializer;
-
private String[] allPeers;
private SyncClient syncClient;
@@ -71,8 +72,18 @@ public class BSPPeerImpl implements BSPP
* Protected default constructor for LocalBSPRunner.
*/
protected BSPPeerImpl() {
- messageSerializer = null;
conf = null;
+ dfs = null;
+ }
+
+ /**
+ * For unit test.
+ * @param conf is the configuration file.
+ * @param dfs is the Hadoop FileSystem.
+ */
+ protected BSPPeerImpl(final Configuration conf, FileSystem dfs) {
+ this.conf = conf;
+ this.dfs = dfs;
}
/**
@@ -92,18 +103,17 @@ public class BSPPeerImpl implements BSPP
this.umbilical = umbilical;
this.bspJob = job;
+ FileSystem fs = null;
+ if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ fs = FileSystem.get(conf);
+ }
+ this.dfs = fs;
+
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
peerAddress = new InetSocketAddress(bindAddress, bindPort);
- BSPMessageSerializer msgSerializer = null;
- if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
- msgSerializer = new BSPMessageSerializer(conf,
- conf.getInt("bsp.checkpoint.port",
- Integer.valueOf(CheckpointRunner.DEFAULT_PORT)));
- }
- this.messageSerializer = msgSerializer;
initialize();
syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
peerAddress.getPort());
@@ -173,6 +183,20 @@ public class BSPPeerImpl implements BSPP
return ckptPath;
}
+ void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
+ FSDataOutputStream out = null;
+ try {
+ out = this.dfs.create(new Path(checkpointedPath));
+ bundle.write(out);
+ } catch(IOException ioe) {
+ LOG.warn("Fail checkpointing messages to "+checkpointedPath, ioe);
+ } finally {
+ try { if(null != out) out.close(); } catch(IOException e) {
+ LOG.warn("Fail to close dfs output stream while checkpointing.", e);
+ }
+ }
+ }
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.BSPPeerInterface#sync()
@@ -203,10 +227,8 @@ public class BSPPeerImpl implements BSPP
}
}
- // checkpointing
- if (null != this.messageSerializer) {
- this.messageSerializer.serialize(new BSPSerializableMessage(
- checkpointedPath(), bundle));
+ if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ checkpoint(checkpointedPath(), bundle);
}
peer.put(bundle);
@@ -255,8 +277,6 @@ public class BSPPeerImpl implements BSPP
syncClient.close();
if (server != null)
server.stop();
- if (null != messageSerializer)
- this.messageSerializer.close();
}
@Override
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java Mon Oct 31 07:52:46 2011
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.io.Writable;
-
-public class BSPSerializableMessage implements Writable {
- final AtomicReference<String> path = new AtomicReference<String>();
- final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
-
- public BSPSerializableMessage() {
- }
-
- public BSPSerializableMessage(final String path, final BSPMessageBundle bundle) {
- if (null == path)
- throw new NullPointerException("No path provided for checkpointing.");
- if (null == bundle)
- throw new NullPointerException("No data provided for checkpointing.");
- this.path.set(path);
- this.bundle.set(bundle);
- }
-
- public final String checkpointedPath() {
- return this.path.get();
- }
-
- public final BSPMessageBundle messageBundle() {
- return this.bundle.get();
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- out.writeUTF(this.path.get());
- this.bundle.get().write(out);
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- this.path.set(in.readUTF());
- BSPMessageBundle pack = new BSPMessageBundle();
- pack.readFields(in);
- this.bundle.set(pack);
- }
-}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Oct 31 07:52:46 2011
@@ -53,8 +53,6 @@ import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.checkpoint.CheckpointRunner;
-import org.apache.hama.checkpoint.Checkpointer;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.MasterProtocol;
@@ -132,8 +130,6 @@ public class GroomServer implements Runn
// private BlockingQueue<GroomServerAction> tasksToCleanup = new
// LinkedBlockingQueue<GroomServerAction>();
- private final CheckpointRunner checkpointRunner;
-
private class DispatchTasksHandler implements DirectiveHandler {
public void handle(Directive directive) throws DirectiveException {
@@ -233,13 +229,6 @@ public class GroomServer implements Runn
// FileSystem local = FileSystem.getLocal(conf);
// this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
- CheckpointRunner ckptRunner = null;
- if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
- ckptRunner = new CheckpointRunner(
- CheckpointRunner.buildCommands(this.conf));
- }
- this.checkpointRunner = ckptRunner;
-
try {
zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
@@ -334,10 +323,6 @@ public class GroomServer implements Runn
this.instructor.bind(DispatchTasksDirective.class,
new DispatchTasksHandler());
instructor.start();
- if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
- && null != this.checkpointRunner && !this.checkpointRunner.isAlive()) {
- this.checkpointRunner.start();
- }
this.running = true;
this.initialized = true;
}
@@ -707,10 +692,6 @@ public class GroomServer implements Runn
cleanupStorage();
this.workerServer.stop();
RPC.stopProxy(masterClient);
- if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
- && null != this.checkpointRunner && this.checkpointRunner.isAlive()) {
- this.checkpointRunner.stop();
- }
if (taskReportServer != null) {
taskReportServer.stop();
taskReportServer = null;
@@ -876,33 +857,6 @@ public class GroomServer implements Runn
}
/**
- * Checkpointer child process.
- */
- public static final class CheckpointerChild {
-
- public static void main(String[] args) throws Throwable {
- if (LOG.isDebugEnabled())
- LOG.debug("Starting Checkpointer child process.");
- HamaConfiguration defaultConf = new HamaConfiguration();
- // int ret = 0;
- if (null != args && 1 == args.length) {
- int port = Integer.parseInt(args[0]);
- defaultConf.setInt("bsp.checkpoint.port",
- Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
- if (LOG.isDebugEnabled())
- LOG.debug("Supplied checkpointer port value:" + port);
- Checkpointer ckpt = new Checkpointer(defaultConf);
- ckpt.start();
- ckpt.join();
- LOG.info("Checkpoint finishes its execution.");
- } else {
- throw new IllegalArgumentException(
- "Port value is not provided for checkpointing service.");
- }
- }
- }
-
- /**
* The main() for BSPPeer child processes.
*/
public static class BSPPeerChild {
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Oct 31 07:52:46 2011
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.RunJar;
-import org.apache.hama.checkpoint.CheckpointRunner;
/**
* Base class that runs a task in a separate process.
@@ -114,7 +113,7 @@ public class TaskRunner extends Thread {
+ exit_code + ".");
}
} catch (InterruptedException e) {
- LOG.warn("Thread is interrupted when execeuting Checkpointer process.",
+ LOG.warn("Thread is interrupted when execeuting BSP process.",
e);
} catch (IOException ioe) {
LOG.error("Error when executing BSPPeer process.", ioe);
@@ -204,7 +203,7 @@ public class TaskRunner extends Thread {
vargs.add(classPath.toString());
// Add main class and its arguments
LOG.debug("Executing child Process " + child.getName());
- vargs.add(child.getName()); // main of bsp or checkpointer Child
+ vargs.add(child.getName()); // bsp class name
if (GroomServer.BSPPeerChild.class.equals(child)) {
InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
@@ -213,21 +212,11 @@ public class TaskRunner extends Thread {
vargs.add(task.getTaskID().toString());
vargs.add(groomServer.groomHostName);
}
-
- if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
- String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
- CheckpointRunner.DEFAULT_PORT);
- LOG.debug("Checkpointer's port:" + ckptPort);
- vargs.add(ckptPort);
- }
-
return vargs;
}
/**
- * Build working environment and launch BSPPeer and Checkpointer processes.
- * And transmit data from BSPPeer's inputstream to Checkpointer's
- * OutputStream.
+ * Build working environment and launch BSPPeer processes.
*/
public void run() {
File workDir = createWorkDirectory();
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java Mon Oct 31 07:52:46 2011
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.GroomServer.CheckpointerChild;
-
-
-public final class CheckpointRunner implements Callable<Object> {
-
- public static final Log LOG = LogFactory.getLog(CheckpointRunner.class);
- public static final String DEFAULT_PORT = "1590";
-
- private final List<String> commands;
- private final ScheduledExecutorService sched;
- private final AtomicReference<Process> process;
- private final AtomicBoolean isAlive = new AtomicBoolean(false);
-
- public CheckpointRunner(List<String> commands) {
- this.commands = commands;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Command for executing Checkpoint runner:"+
- Arrays.toString(this.commands.toArray()));
- }
- this.sched = Executors.newScheduledThreadPool(10);
- this.process = new AtomicReference<Process>();
- }
-
- public static final List<String> buildCommands(final Configuration config) {
- List<String> vargs = new ArrayList<String>();
- File jvm =
- new File(new File(System.getProperty("java.home"), "bin"), "java");
- vargs.add(jvm.toString());
-
- String javaOpts = config.get("bsp.checkpoint.child.java.opts", "-Xmx50m");
- String[] javaOptsSplit = javaOpts.split(" ");
- for (int i = 0; i < javaOptsSplit.length; i++) {
- vargs.add(javaOptsSplit[i]);
- }
- vargs.add("-classpath");
- vargs.add(System.getProperty("java.class.path"));
- vargs.add(CheckpointerChild.class.getName());
- String port = config.get("bsp.checkpoint.port", DEFAULT_PORT);
- if(LOG.isDebugEnabled())
- LOG.debug("Checkpointer's port:"+port);
- vargs.add(port);
-
- return vargs;
- }
-
- public void start() {
- if(!isAlive.compareAndSet(false, true)) {
- throw new IllegalStateException(this.getClass().getName()+
- " is already running.");
- }
- this.sched.schedule(this, 0, SECONDS);
- LOG.info("Start building checkpointer process.");
- }
-
- public void stop() {
- kill();
- this.sched.shutdown();
- LOG.info("Stop checkpointer process.");
- }
-
- public Process getProcess() {
- return this.process.get();
- }
-
- public void kill() {
- if (this.process.get() != null) {
- this.process.get().destroy();
- }
- isAlive.set(false);
- }
-
- public boolean isAlive() {
- return isAlive.get();
- }
-
- public Object call() throws Exception {
- ProcessBuilder builder = new ProcessBuilder(commands);
- try{
- this.process.set(builder.start());
- new Thread() {
- @Override
- public void run() {
- logStream(process.get().getErrorStream());
- }
- }.start();
-
- new Thread() {
- @Override
- public void run() {
- logStream(process.get().getInputStream());
- }
- }.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- LOG.info("Destroying checkpointer process.");
- getProcess().destroy();
- }
- });
-
- int exit_code = this.process.get().waitFor();
- if (!isAlive() && exit_code != 0) {
- throw new IOException("Checkpointer process exit with nonzero status of "
- + exit_code + ".");
- }
- } catch(InterruptedException e){
- LOG.warn("Thread is interrupted when execeuting Checkpointer process.", e);
- } catch(IOException ioe) {
- LOG.error("Error when executing Checkpointer process.", ioe);
- } finally {
- kill();
- }
- return null;
- }
-
- private void logStream(InputStream output) {
- try {
- BufferedReader in = new BufferedReader(new InputStreamReader(output));
- String line;
- while ((line = in.readLine()) != null) {
- LOG.info(line);
- }
- } catch (IOException e) {
- LOG.warn("Error reading checkpoint process's inputstream.", e);
- } finally {
- try {
- output.close();
- } catch (IOException e) {
- LOG.warn("Error closing checkpoint's inputstream.", e);
- }
- }
- }
-}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Mon Oct 31 07:52:46 2011
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.bsp.BSPSerializableMessage;
-
-/**
- * This class is responsible for checkpointing messages to hdfs.
- */
-public final class Checkpointer implements Callable<Object> {
-
- public static Log LOG = LogFactory.getLog(Checkpointer.class);
-
- private final ScheduledExecutorService scheduler =
- Executors.newScheduledThreadPool(1);
- private final FileSystem dfs;
- private final AtomicBoolean ckptState = new AtomicBoolean(false);
- private final BSPMessageDeserializer messageDeserializer;
- private final AtomicReference<ScheduledFuture<Object>> future =
- new AtomicReference<ScheduledFuture<Object>>();
-
- /**
- * Reading from socket inputstream as DataInput.
- */
- public static final class BSPMessageDeserializer implements Callable<Object> {
- final BlockingQueue<BSPSerializableMessage> messageQueue =
- new LinkedBlockingQueue<BSPSerializableMessage>();
- final ScheduledExecutorService sched;
- final ScheduledExecutorService workers;
- final AtomicBoolean serializerState = new AtomicBoolean(false);
- final ServerSocket server;
-
- public BSPMessageDeserializer(final int port) throws IOException {
- this.sched = Executors.newScheduledThreadPool(1);
- this.workers = Executors.newScheduledThreadPool(10);
- this.server = new ServerSocket(port);
- LOG.info("Deserializer's port is opened at "+port);
- }
-
- public int port() {
- return this.server.getLocalPort();
- }
-
- public void start() {
- if(!serializerState.compareAndSet(false, true)) {
- throw new IllegalStateException("BSPMessageDeserializer has been "+
- "started up.");
- }
- this.sched.schedule(this, 0, SECONDS);
- LOG.info("BSPMessageDeserializer is started.");
- }
-
- public void stop() {
- try {
- this.server.close();
- } catch(IOException ioe) {
- LOG.error("Unable to close message serializer server socket.", ioe);
- }
- this.sched.shutdown();
- this.workers.shutdown();
- this.serializerState.set(false);
- LOG.info("BSPMessageDeserializer is stopped.");
- }
-
- public boolean state(){
- return this.serializerState.get();
- }
-
- /**
- * Message is enqueued for communcating data sent from BSPPeer.
- */
- public BlockingQueue<BSPSerializableMessage> messageQueue() {
- return this.messageQueue;
- }
-
- public Object call() throws Exception {
- try {
- while(state()) {
- Socket connection = server.accept();
- final DataInput in = new DataInputStream(connection.getInputStream());
- this.workers.schedule(new Callable<Object>() {
- public Object call() throws Exception {
- BSPSerializableMessage tmp = new BSPSerializableMessage();
- tmp.readFields(in);
- messageQueue().put(tmp);
- return null;
- }
- }, 0, SECONDS);
- }
- } catch(EOFException eofe) {
- LOG.info("Closing checkpointer's input stream.", eofe);
- } catch(IOException ioe) {
- LOG.error("Error when reconstructing BSPSerializableMessage.", ioe);
- }
- return null;
- }
- }
-
- public Checkpointer(final Configuration conf) throws IOException {
- this.dfs = FileSystem.get(conf);
- if(null == this.dfs)
- throw new NullPointerException("HDFS instance not found.");
- int port = conf.getInt("bsp.checkpoint.port",
- Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
- if(LOG.isDebugEnabled())
- LOG.debug("Checkpoint port value:"+port);
- this.messageDeserializer = new BSPMessageDeserializer(port);
- }
-
- /**
- * Activate the checkpoint thread.
- */
- public void start(){
- if(!ckptState.compareAndSet(false, true)) {
- throw new IllegalStateException("Checkpointer has been started up.");
- }
- this.messageDeserializer.start();
- this.future.set(this.scheduler.schedule(this, 0, SECONDS));
- LOG.info("Checkpointer is started.");
- }
-
- /**
- * Stop checkpoint thread.
- */
- public void stop(){
- this.messageDeserializer.stop();
- this.scheduler.shutdown();
- this.ckptState.set(false);
- LOG.info("Checkpointer is stopped.");
- }
-
- /**
- * Check if checkpointer is running.
- * @return true if checkpointer is runing; false otherwise.
- */
- public boolean isAlive(){
- return !this.scheduler.isShutdown() && this.ckptState.get();
- }
-
- public void join() throws InterruptedException, ExecutionException {
- this.future.get().get();
- }
-
- public Boolean call() throws Exception {
- BlockingQueue<BSPSerializableMessage> queue =
- this.messageDeserializer.messageQueue();
- while(isAlive()) {
- BSPSerializableMessage msg = queue.take();
- String path = msg.checkpointedPath();
- if(null == path || path.toString().isEmpty())
- throw new NullPointerException("File dest is not provided.");
- FSDataOutputStream out = this.dfs.create(new Path(path));
- msg.messageBundle().write(out);
- try { } finally { try { out.close(); } catch(IOException e) {
- LOG.error("Fail to close hdfs output stream.", e); }
- }
- }
- try { } finally { LOG.info("Stop checkpointing."); this.stop(); }
- return new Boolean(true);
- }
-}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Mon Oct 31 07:52:46 2011
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Test helper that exposes {@link BSPMessageSerializer#serialize} through a
- * small public facade.
- */
-public final class BSPSerializerWrapper {
-
-  /** Delegate that performs the actual serialization. */
-  private final BSPMessageSerializer serializer;
-
-  /**
-   * Creates a wrapper around a new serializer.
-   *
-   * @param conf configuration passed to the serializer; the
-   *        "bsp.checkpoint.port" key, when set, takes precedence over port.
-   * @param port port used only when "bsp.checkpoint.port" is not configured.
-   * @throws IOException if the underlying serializer cannot be created.
-   */
-  public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
-    final int resolvedPort = conf.getInt("bsp.checkpoint.port", port);
-    this.serializer = new BSPMessageSerializer(conf, resolvedPort);
-  }
-
-  /**
-   * Hands the given message to the wrapped serializer.
-   *
-   * @param tmp the message to serialize.
-   * @throws IOException if serialization fails.
-   */
-  public final void serialize(final BSPSerializableMessage tmp)
-      throws IOException {
-    serializer.serialize(tmp);
-  }
-}
Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1195365&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Oct 31 07:52:46 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Verifies that {@link BSPPeerImpl#checkpoint} writes a message bundle that
+ * can be read back from the file system unchanged.
+ */
+public class TestCheckpoint extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+
+  /** Absolute directory that receives the checkpoint file for task 0. */
+  static final String checkpointedDir = "/tmp/checkpoint/job_201110302255_0001/0/";
+
+  /** Name of the checkpoint file written under {@link #checkpointedDir}. */
+  static final String attemptFile = "attempt_201110302255_0001_000000_0";
+
+  public void testCheckpoint() throws Exception {
+    Configuration config = new HamaConfiguration();
+    FileSystem dfs = FileSystem.get(config);
+    BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
+    assertNotNull("BSPPeerImpl should not be null.", bspTask);
+
+    // Create the absolute directory, including missing parents, in one call.
+    // Relative paths such as "checkpoint/..." resolve against the file
+    // system's working directory rather than /tmp, which made the existence
+    // check below environment-dependent.
+    Path dir = new Path(checkpointedDir);
+    dfs.mkdirs(dir);
+    assertTrue("Make sure directory is created.", dfs.exists(dir));
+
+    byte[] tmpData = "data".getBytes();
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
+
+    // checkpointedDir already ends with '/', so plain concatenation is enough.
+    String attemptPath = checkpointedDir + attemptFile;
+    bspTask.checkpoint(attemptPath, bundle);
+
+    BSPMessageBundle bundleRead = new BSPMessageBundle();
+    FSDataInputStream in = dfs.open(new Path(attemptPath));
+    try {
+      bundleRead.readFields(in);
+    } finally {
+      in.close();
+    }
+    assertEquals("Exactly one message should be checkpointed.", 1,
+        bundleRead.getMessages().size());
+    ByteMessage byteMsg = (ByteMessage) bundleRead.getMessages().get(0);
+    String content = new String(byteMsg.getData());
+    LOG.info("Saved checkpointed content is " + content);
+    assertEquals("Message content should be the same.", "data", content);
+  }
+}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1195365&r1=1195364&r2=1195365&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Mon Oct 31 07:52:46 2011
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.checkpoint;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPMessage;
-import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.BSPSerializableMessage;
-import org.apache.hama.bsp.BSPSerializerWrapper;
-import org.apache.hama.bsp.DoubleMessage;
-
-/**
- * Round-trips a message bundle through an external checkpoint process:
- * the bundle is serialized to the process, and the file it writes is read
- * back and compared with the original message.
- */
-public class TestCheckpoint extends TestCase {
-
-  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
-
-  private CheckpointRunner runner;
-  private BSPSerializerWrapper serializer;
-  static final String TEST_STRING = "Test String";
-  private FileSystem hdfs;
-  static final DoubleMessage estimate =
-    new DoubleMessage("192.168.1.123:61000", 3.1415926d);
-
-  /**
-   * Starts the checkpoint process and connects a serializer on the runner's
-   * default port.
-   */
-  public void setUp() throws Exception {
-    Configuration conf = new HamaConfiguration();
-    this.hdfs = FileSystem.get(conf);
-    assertNotNull("File system object should exist.", this.hdfs);
-    this.runner =
-      new CheckpointRunner(CheckpointRunner.buildCommands(conf));
-    assertNotNull("Checkpoint instance should exist.", this.runner);
-    this.runner.start();
-    Thread.sleep(1000*1);
-    Process process = this.runner.getProcess();
-    assertNotNull("Checkpoint process should be created.", process);
-    this.serializer = new BSPSerializerWrapper(conf,
-      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
-  }
-
-  private BSPMessageBundle createMessageBundle() {
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    bundle.addMessage(estimate);
-    return bundle;
-  }
-
-  private String checkpointedPath() {
-    return "/tmp/" + "job_201108221205_000" + "/" + "0" +
-      "/" + "attempt_201108221205_0001_000000_0";
-  }
-
-  public void testCheckpoint() throws Exception {
-    this.serializer.serialize(new BSPSerializableMessage(
-      checkpointedPath(), createMessageBundle()));
-    // Give the checkpoint process time to write the file before checking.
-    Thread.sleep(1000);
-    Path path = new Path(checkpointedPath());
-    boolean exists = this.hdfs.exists(path);
-    assertTrue("Check if file is actually written to hdfs.", exists);
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    // Close the stream even if deserialization fails, so it is not leaked.
-    DataInputStream in = new DataInputStream(this.hdfs.open(path));
-    try {
-      bundle.readFields(in);
-    } finally {
-      in.close();
-    }
-    List<BSPMessage> messages = bundle.getMessages();
-    assertEquals("Only one message exists.", 1, messages.size());
-    for(BSPMessage message: messages) {
-      String peer = (String)message.getTag();
-      assertEquals("BSPPeer value in form of <ip>:<port>.", estimate.getTag(), peer);
-      Double pi = (Double)message.getData();
-      assertEquals("Message content.", estimate.getData(), pi);
-    }
-  }
-
-  public void tearDown() throws Exception {
-    this.runner.stop();
-  }
-
-}