You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/03/05 00:08:08 UTC
svn commit: r1296893 - in /incubator/hama/trunk: ./ conf/
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/bsp/
yarn/src/main/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Sun Mar 4 23:08:07 2012
New Revision: 1296893
URL: http://svn.apache.org/viewvc?rev=1296893&view=rev
Log:
BSPTask should periodically ping its parent
Added:
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (with props)
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Mar 4 23:08:07 2012
@@ -12,6 +12,7 @@ Release 0.5 - Unreleased
IMPROVEMENTS
+ HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via edwardyoon)
HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
HAMA-484: Counters should be accessible in client (tjungblut)
HAMA-483: Remove old and deprecated BSP API (tjungblut)
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Sun Mar 4 23:08:07 2012
@@ -60,6 +60,11 @@
interface.</description>
</property>
<property>
+ <name>bsp.groomserver.pingperiod</name>
+ <value>5000</value>
+ <description>Maximum interval, in milliseconds, between heartbeat pings that every BSP task must send to its groom server. If a task fails to ping within this period, the groom server deems the task failed.</description>
+ </property>
+ <property>
<name>bsp.system.dir</name>
<value>${hadoop.tmp.dir}/bsp/system</value>
<description>The shared directory where BSP stores control files.
@@ -104,6 +109,17 @@
<description>The maximum number of BSP tasks that will be run simultaneously
by a groom server.</description>
</property>
+ <property>
+ <name>bsp.checkpoint.enabled</name>
+ <value>false</value>
+ <description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>
+ </property>
+ <property>
+ <name>bsp.checkpoint.interval</name>
+ <value>1</value>
+ <description>If bsp.checkpoint.enabled is set to true, a checkpoint is taken during every Nth superstep synchronization of the BSP tasks, where N is this value. A value of 0 disables checkpointing.</description>
+ </property>
+
<!--
Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
@@ -161,7 +177,7 @@
</property>
<property>
- <name>hama.messanger.class</name>
+ <name>hama.messenger.class</name>
<value>org.apache.hama.bsp.message.AvroMessageManagerImpl</value>
</property>
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java Sun Mar 4 23:08:07 2012
@@ -23,8 +23,8 @@ package org.apache.hama;
* Some constants used in the Hama.
*/
public interface Constants {
-
- public static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
+
+ public static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
public static final String DEFAULT_GROOM_RPC_HOST = "0.0.0.0";
@@ -32,13 +32,12 @@ public interface Constants {
/** Default port region rpc server listens on. */
public static final int DEFAULT_GROOM_RPC_PORT = 50000;
-
- ///////////////////////////////////////
+ // /////////////////////////////////////
// Constants for BSP Package
- ///////////////////////////////////////
+ // /////////////////////////////////////
/** default host address */
- public static final String PEER_HOST = "bsp.peer.hostname";
+ public static final String PEER_HOST = "bsp.peer.hostname";
/** default host address */
public static final String DEFAULT_PEER_HOST = "0.0.0.0";
@@ -47,18 +46,35 @@ public interface Constants {
public static final int DEFAULT_PEER_PORT = 61000;
public static final String PEER_ID = "bsp.peer.id";
-
+
/** Parameter name for what groom server implementation to use. */
- public static final String GROOM_SERVER_IMPL= "hama.groomserver.impl";
-
+ public static final String GROOM_SERVER_IMPL = "hama.groomserver.impl";
+
+ /** Parameter name for interval at which bsp peer should ping groomserver */
+ public static final String GROOM_PING_PERIOD = "bsp.groomserver.pingperiod";
+
+ /** Default value of ping period in milliseconds. */
+ public static final long DEFAULT_GROOM_PING_PERIOD = 5000;
+
/** When we encode strings, we always specify UTF8 encoding */
public static final String UTF8_ENCODING = "UTF-8";
-
+
public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
-
- ///////////////////////////////////////
+
+ // //////////////////////////////////////
+ // Checkpointing related constants
+ // //////////////////////////////////////
+
+ // Set to true to enable checkpointing.
+ public static final String CHECKPOINT_ENABLED = "bsp.checkpoint.enabled";
+ // Superstep interval at which BSPPeer should initiate a checkpoint.
+ public static final String CHECKPOINT_INTERVAL = "bsp.checkpoint.interval";
+ // By default checkpointing when enabled would checkpoint on every superstep
+ public static final short DEFAULT_CHECKPOINT_INTERVAL = 1;
+
+ // /////////////////////////////////////
// Constants for ZooKeeper
- ///////////////////////////////////////
+ // /////////////////////////////////////
/** zookeeper root */
public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
/** zookeeper default root */
@@ -76,7 +92,7 @@ public interface Constants {
public static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
/** Default ZooKeeper pause value. In milliseconds. */
public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
-
+
static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
static final String ZOOKEEPER_CLIENT_PORT = "hama.zookeeper.property.clientPort";
static final String ZOOKEEPER_SESSION_TIMEOUT = "hama.zookeeper.session.timeout";
@@ -87,11 +103,10 @@ public interface Constants {
/** Cluster is fully-distributed */
static final String CLUSTER_IS_DISTRIBUTED = "true";
-
// Other constants
/**
* An empty instance.
*/
- static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
+ static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sun Mar 4 23:08:07 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
@@ -381,4 +382,12 @@ public class BSPJob extends BSPJobContex
public void setMaxIteration(int maxIteration) {
conf.setInt("hama.graph.max.iteration", maxIteration);
}
+
+ public void setCheckPointInterval(int checkPointInterval) {
+ conf.setInt(Constants.CHECKPOINT_INTERVAL, checkPointInterval);
+ }
+
+ public void setCheckPointFlag(boolean enableCheckPoint) {
+ conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
+ }
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sun Mar 4 23:08:07 2012
@@ -70,6 +70,10 @@ public final class BSPPeerImpl<K1, V1, K
private SyncClient syncClient;
private MessageManager<M> messenger;
+ // A checkpoint is initiated once every <checkPointInterval> supersteps.
+ private int checkPointInterval;
+ private long lastCheckPointStep;
+
// IO
private int partition;
private String splitClass;
@@ -125,6 +129,10 @@ public final class BSPPeerImpl<K1, V1, K
this.fs = FileSystem.get(conf);
+ this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+ Constants.DEFAULT_CHECKPOINT_INTERVAL);
+ this.lastCheckPointStep = 0;
+
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
@@ -206,6 +214,25 @@ public final class BSPPeerImpl<K1, V1, K
messenger.send(peerName, msg);
}
+ /*
+ * returns true if the peer would checkpoint in the next sync.
+ */
+ public final boolean isReadyToCheckpoint() {
+
+ checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
+ if (LOG.isDebugEnabled())
+ LOG.debug(new StringBuffer(1000).append("Enabled = ")
+ .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+ .append(" checkPointInterval = ").append(checkPointInterval)
+ .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+ .append(" getSuperstepCount() = ").append(getSuperstepCount())
+ .toString());
+
+ return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
+ && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
+
+ }
+
private final String checkpointedPath() {
String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
String ckptPath = backup + bspJob.getJobID().toString() + "/"
@@ -243,6 +270,12 @@ public final class BSPPeerImpl<K1, V1, K
Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
.getMessageIterator();
+ boolean shouldCheckPoint = false;
+
+ if ((shouldCheckPoint = isReadyToCheckpoint())) {
+ lastCheckPointStep = getSuperstepCount();
+ }
+
while (it.hasNext()) {
Entry<InetSocketAddress, LinkedList<M>> entry = it.next();
final InetSocketAddress addr = entry.getKey();
@@ -250,7 +283,7 @@ public final class BSPPeerImpl<K1, V1, K
final BSPMessageBundle<M> bundle = combineMessages(messages);
- if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ if (shouldCheckPoint) {
checkpoint(checkpointedPath(), bundle);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sun Mar 4 23:08:07 2012
@@ -20,6 +20,10 @@ package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Calendar;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,7 +31,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.Constants;
import org.apache.hama.ipc.BSPPeerProtocol;
/**
@@ -41,7 +45,53 @@ public final class BSPTask extends Task
BytesWritable split;
String splitClass;
+ // Schedule Heartbeats to GroomServer
+ private ScheduledExecutorService pingService;
+
+ /**
+ * Runnable, scheduled periodically, that sends a heartbeat ping to GroomServer.
+ */
+ private static class PingGroomServer implements Runnable {
+
+ private BSPPeerProtocol pingRPC;
+ private TaskAttemptID taskId;
+ private Thread bspThread;
+
+ public PingGroomServer(BSPPeerProtocol umbilical, TaskAttemptID id) {
+ pingRPC = umbilical;
+ taskId = id;
+ bspThread = Thread.currentThread();
+ }
+
+ @Override
+ public void run() {
+
+ boolean shouldKillSelf = false;
+ try {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+ // if the RPC call returns false, it means that groomserver does not
+ // have knowledge of this task.
+ shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
+ } catch (IOException e) {
+ LOG.error(new StringBuilder(
+ "IOException pinging GroomServer from task - ").append(taskId), e);
+ shouldKillSelf = true;
+ } catch (Exception e) {
+ LOG.error(new StringBuilder(
+ "Exception pinging GroomServer from task - ").append(taskId), e);
+ shouldKillSelf = true;
+ }
+ if (shouldKillSelf) {
+ LOG.error("Killing self. No connection to groom.");
+ System.exit(69);
+ }
+
+ }
+ }
+
public BSPTask() {
+ this.pingService = Executors.newScheduledThreadPool(1);
}
public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid,
@@ -53,6 +103,8 @@ public final class BSPTask extends Task
this.splitClass = splitClass;
this.split = split;
+
+ this.pingService = Executors.newScheduledThreadPool(1);
}
@Override
@@ -60,31 +112,82 @@ public final class BSPTask extends Task
return new BSPTaskRunner(this, groom, this.conf);
}
+ private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
+
+ long pingPeriod = job.getConf().getLong(Constants.GROOM_PING_PERIOD,
+ Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
+
+ try {
+ if (pingPeriod > 0) {
+ pingService.scheduleWithFixedDelay(new PingGroomServer(umbilical,
+ taskId), 0, pingPeriod, TimeUnit.MILLISECONDS);
+ LOG.error("Started pinging to groom");
+ }
+ } catch (Exception e) {
+ LOG.error("Error scheduling ping service", e);
+ }
+ }
+
+ private void stopPingingGroom() {
+ if (pingService != null) {
+ LOG.error("Shutting down ping service.");
+ pingService.shutdownNow();
+ }
+ }
+
@Override
public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
- BSPPeerProtocol umbilical) throws IOException, SyncException,
- ClassNotFoundException, InterruptedException {
- runBSP(job, bspPeer, split, umbilical);
+ BSPPeerProtocol umbilical) throws Exception {
+
+ startPingingGroom(job, umbilical);
+ try {
+ runBSP(job, bspPeer, split, umbilical);
+ done(umbilical);
+ } finally {
+ stopPingingGroom();
+ }
- done(umbilical);
}
@SuppressWarnings("unchecked")
private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
- final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
+ final BSPJob job,
+ BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
- throws IOException, SyncException, ClassNotFoundException,
- InterruptedException {
+ throws Exception {
BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
.newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
job.getConf());
- bsp.setup(bspPeer);
- bsp.bsp(bspPeer);
-
- bsp.cleanup(bspPeer);
- bspPeer.close();
+ // The policy is to throw the first exception and log the remaining.
+ Exception firstException = null;
+ try {
+ bsp.setup(bspPeer);
+ bsp.bsp(bspPeer);
+ } catch (Exception e) {
+ LOG.error("Error running bsp setup and bsp function.", e);
+ firstException = e;
+ } finally {
+ try {
+ bsp.cleanup(bspPeer);
+ } catch (Exception e) {
+ LOG.error("Error cleaning up after bsp executed.", e);
+ if (firstException == null)
+ firstException = e;
+ } finally {
+
+ try {
+ bspPeer.close();
+ } catch (Exception e) {
+ LOG.error("Error closing BSP Peer.", e);
+ if (firstException == null)
+ firstException = e;
+ }
+ if (firstException != null)
+ throw firstException;
+ }
+ }
}
public final BSPJob getConf() {
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sun Mar 4 23:08:07 2012
@@ -24,8 +24,10 @@ import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,10 @@ import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,9 +53,9 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
@@ -133,6 +138,9 @@ public class GroomServer implements Runn
// private BlockingQueue<GroomServerAction> tasksToCleanup = new
// LinkedBlockingQueue<GroomServerAction>();
+ // Periodically purges running tasks that have stopped pinging this groom.
+ private ScheduledExecutorService taskMonitorService;
+
private class DispatchTasksHandler implements DirectiveHandler {
public void handle(Directive directive) throws DirectiveException {
@@ -213,6 +221,49 @@ public class GroomServer implements Runn
}
}
+ /*
+ * This thread is responsible for monitoring the pings from peers. If any peer
+ * fails to ping this groom for a pre-defined period, the task is purged from
+ * the records.
+ */
+ private class BSPTasksMonitor extends Thread {
+
+ private List<TaskInProgress> outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
+ conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+
+ private BSPTasksMonitor() {
+
+ outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
+ conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+ }
+
+ public void run() {
+
+ getObliviousTasks(outOfContactTasks);
+
+ if (outOfContactTasks.size() > 0) {
+ LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
+ }
+
+ Iterator<TaskInProgress> taskIter = outOfContactTasks.iterator();
+
+ while (taskIter.hasNext()) {
+ TaskInProgress tip = taskIter.next();
+ try {
+ LOG.debug("Purging task " + tip);
+ purgeTask(tip, true);
+ } catch (Exception e) {
+ LOG.error(
+ new StringBuilder("Error while removing a timed-out task - ")
+ .append(tip.toString()), e);
+
+ }
+ }
+ outOfContactTasks.clear();
+
+ }
+ }
+
public GroomServer(Configuration conf) throws IOException {
LOG.info("groom start");
this.conf = conf;
@@ -322,7 +373,17 @@ public class GroomServer implements Runn
new DispatchTasksHandler());
instructor.start();
- if(conf.getBoolean("bsp.monitor.enabled", true)) {
+ if (this.taskMonitorService == null) {
+ this.taskMonitorService = Executors.newScheduledThreadPool(1);
+ long monitorPeriod = this.conf.getLong(Constants.GROOM_PING_PERIOD,
+ Constants.DEFAULT_GROOM_PING_PERIOD);
+ if (monitorPeriod > 0) {
+ this.taskMonitorService.scheduleWithFixedDelay(new BSPTasksMonitor(),
+ 1000, monitorPeriod, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ if (conf.getBoolean("bsp.monitor.enabled", true)) {
// TODO: conf.get("bsp.monitor.class.impl", "Monitor.class")
// so user can switch to customized monitor impl if necessary.
new Monitor(conf, zk, this.groomServerName).start();
@@ -608,6 +669,50 @@ public class GroomServer implements Runn
}
}
+ private synchronized void getObliviousTasks(
+ List<TaskInProgress> outOfContactTasks) {
+
+ if (runningTasks == null) {
+ LOG.debug("returning null");
+ return;
+ }
+
+ long currentTime = Calendar.getInstance().getTimeInMillis();
+ long monitorPeriod = conf.getLong(Constants.GROOM_PING_PERIOD,
+ Constants.DEFAULT_GROOM_PING_PERIOD);
+
+ for (Map.Entry<TaskAttemptID, TaskInProgress> entry : runningTasks
+ .entrySet()) {
+ TaskInProgress tip = entry.getValue();
+ if (LOG.isDebugEnabled())
+ LOG.debug("checking task: "
+ + tip.getTask().getTaskID()
+ + " starttime ="
+ + tip.startTime
+ + " lastping = "
+ + tip.lastPingedTimestamp
+ + " run state = "
+ + tip.taskStatus.getRunState().toString()
+ + " monitorPeriod = "
+ + monitorPeriod
+ + " check = "
+ + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+
+ // A running task is out of contact if it has not pinged for more than
+ // monitorPeriod. A task that has never pinged is given a leeway of 10
+ // times monitorPeriod after it started.
+ if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
+ && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+
+ LOG.info("adding purge task: " + tip.getTask().getTaskID());
+
+ outOfContactTasks.add(tip);
+ }
+
+ }
+
+ }
+
/**
* The datastructure for initializing a job
*/
@@ -711,6 +816,11 @@ public class GroomServer implements Runn
taskReportServer.stop();
taskReportServer = null;
}
+
+ if (taskMonitorService != null) {
+ taskMonitorService.shutdownNow();
+ taskMonitorService = null;
+ }
}
public static Thread startGroomServer(final GroomServer hrs) {
@@ -738,6 +848,9 @@ public class GroomServer implements Runn
volatile boolean wasKilled = false;
private TaskStatus taskStatus;
+ private long startTime = 0L;
+ private volatile long lastPingedTimestamp = 0L;
+
public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
this.task = task;
this.jobConf = jobConf;
@@ -782,6 +895,7 @@ public class GroomServer implements Runn
taskStatus.setRunState(TaskStatus.State.RUNNING);
this.runner = task.createRunner(GroomServer.this);
this.runner.start();
+ startTime = Calendar.getInstance().getTimeInMillis();
LOG.info("Task '" + task.getTaskID().toString() + "' has started.");
}
@@ -873,6 +987,10 @@ public class GroomServer implements Runn
}
}
}
+
+ public synchronized void ping(long timestamp) {
+ this.lastPingedTimestamp = timestamp;
+ }
}
public boolean isRunning() {
@@ -983,6 +1101,7 @@ public class GroomServer implements Runn
} catch (FSError e) {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage());
+ e.printStackTrace();
} catch (SyncException e) {
LOG.fatal("SyncError from child", e);
umbilical.fatalError(taskid, e.toString());
@@ -990,11 +1109,13 @@ public class GroomServer implements Runn
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(baos));
+ e.printStackTrace();
} catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
+ LOG.fatal("Error running child", throwable);
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
throwable.printStackTrace(new PrintStream(baos));
+ throwable.printStackTrace();
} finally {
RPC.stopProxy(umbilical);
// Shutting down log4j of the child-vm...
@@ -1017,7 +1138,11 @@ public class GroomServer implements Runn
@Override
public boolean ping(TaskAttemptID taskid) throws IOException {
- // TODO Auto-generated method stub
+ TaskInProgress tip = runningTasks.get(taskid);
+ if (tip != null) {
+ tip.ping(Calendar.getInstance().getTimeInMillis());
+ return true;
+ }
return false;
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sun Mar 4 23:08:07 2012
@@ -246,18 +246,39 @@ public class LocalBSPRunner implements J
realBytes = splits[id].getBytes();
}
- peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
- new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
- splitname, realBytes, new Counters());
+ peer = new BSPPeerImpl(job, conf, new TaskAttemptID(new TaskID(
+ job.getJobID(), id), id), new LocalUmbilical(), id, splitname,
+ realBytes, new Counters());
+ // Throw the first exception and log all the other exceptions.
+ Exception firstException = null;
try {
bsp.setup(peer);
bsp.bsp(peer);
} catch (Exception e) {
LOG.error("Exception during BSP execution!", e);
+ firstException = e;
+ } finally {
+ try {
+ bsp.cleanup(peer);
+ } catch (Exception e) {
+ LOG.error("Error cleaning up after bsp execution.", e);
+ if (firstException == null)
+ firstException = e;
+ } finally {
+ try {
+ peer.clear();
+ peer.close();
+ } catch (Exception e) {
+ LOG.error("Exception closing BSP peer,", e);
+ if (firstException == null)
+ firstException = e;
+ } finally {
+ if (firstException != null)
+ throw firstException;
+ }
+ }
+
}
- bsp.cleanup(peer);
- peer.clear();
- peer.close();
}
@Override
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Sun Mar 4 23:08:07 2012
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.ipc.BSPPeerProtocol;
/**
@@ -48,7 +47,7 @@ public abstract class Task implements Wr
// Current counters
private transient Counters counters = new Counters();
-
+
public Task() {
jobId = new BSPJobID();
taskId = new TaskAttemptID();
@@ -98,15 +97,17 @@ public abstract class Task implements Wr
public int getPartition() {
return partition;
}
-
- /** Construct output file names so that, when an output directory listing is
- * sorted lexicographically, positions correspond to output partitions.*/
+
+ /**
+ * Construct output file names so that, when an output directory listing is
+ * sorted lexicographically, positions correspond to output partitions.
+ */
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
-
+
static synchronized String getOutputName(int partition) {
return "part-" + NUMBER_FORMAT.format(partition);
}
@@ -142,18 +143,21 @@ public abstract class Task implements Wr
* @param bspPeer for communications
* @param umbilical for communications with GroomServer
*/
- public abstract void run(BSPJob job, BSPPeerImpl<?,?,?,?,?> bspPeer, BSPPeerProtocol umbilical)
- throws IOException, SyncException, ClassNotFoundException, InterruptedException;
+ public abstract void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
+ BSPPeerProtocol umbilical) throws Exception;
public abstract BSPTaskRunner createRunner(GroomServer groom);
public void done(BSPPeerProtocol umbilical) throws IOException {
umbilical.done(getTaskID());
}
-
+
public abstract BSPJob getConf();
+
public abstract void setConf(BSPJob localJobConf);
- Counters getCounters() { return counters; }
-
+ Counters getCounters() {
+ return counters;
+ }
+
}
Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1296893&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sun Mar 4 23:08:07 2012
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.HamaTestCase;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+
+public class TestBSPTaskFaults extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(HamaTestCase.class);
+
+ private static final int PORT = 54321;
+ public static final String TEST_POINT = "bsp.ft.test.point";
+
+ private volatile MinimalGroomServer groom;
+ private volatile BSPPeerProtocol umbilical;
+ private Server workerServer;
+ private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID(
+ "job_201110302255", 1), 1), 1);
+
+ public volatile static HamaConfiguration conf;
+
+ private ScheduledExecutorService testBSPTaskService;
+
+ @SuppressWarnings("unused")
+ public static class MinimalGroomServer implements BSPPeerProtocol {
+
+ private volatile int pingCount;
+ private volatile long firstPingTime;
+ private volatile long lastPingTime;
+ private boolean isShutDown = false;
+ private boolean taskComplete = false;
+ private boolean errorCondition = false;
+ private Configuration conf;
+
+ public MinimalGroomServer(Configuration config) throws IOException {
+ conf = config;
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return BSPPeerProtocol.versionID;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isShutDown = true;
+
+ }
+
+ @Override
+ public Task getTask(TaskAttemptID taskid) throws IOException {
+ return new BSPTask();
+ }
+
+ @Override
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ LOG.error("Pinged");
+ ++pingCount;
+ if (pingCount == 1) {
+ firstPingTime = System.currentTimeMillis();
+ }
+ lastPingTime = System.currentTimeMillis();
+
+ return true;
+ }
+
+ @Override
+ public void done(TaskAttemptID taskid) throws IOException {
+ taskComplete = true;
+
+ }
+
+ @Override
+ public void fsError(TaskAttemptID taskId, String message)
+ throws IOException {
+ errorCondition = true;
+
+ }
+
+ @Override
+ public void fatalError(TaskAttemptID taskId, String message)
+ throws IOException {
+ errorCondition = true;
+
+ }
+
+ @Override
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ return true;
+ }
+
+ @Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ return 0;
+ }
+
+ public synchronized int getPingCount() {
+ return pingCount;
+ }
+
+ public void setPingCount(int pingCount) {
+ this.pingCount = pingCount;
+ if (pingCount == 0) {
+ firstPingTime = 0L;
+ lastPingTime = 0L;
+ }
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private class TestBSPTaskThreadRunner extends Thread {
+
+ BSPJob job;
+
+ TestBSPTaskThreadRunner(BSPJob jobConf) {
+ job = jobConf;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void run() {
+ BSPTask task = new BSPTask();
+ task.setConf(job);
+
+ try {
+ BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, taskid,
+ umbilical, 0, null, null, new Counters());
+ task.run(job, bspPeer, umbilical); // run the task
+ } catch (Exception e) {
+ LOG.error("Error in BSPTask execution.", e);
+ }
+
+ }
+ }
+
+ /*
+ * Test BSP exiting its own process. Hence we need a minimal process runner.
+ */
+
+ public static class TestBSPProcessRunner implements Callable<Integer> {
+ private final ScheduledExecutorService sched;
+ private final AtomicReference<ScheduledFuture<Integer>> future;
+ private Process bspTaskProcess;
+ private Thread errorLog;
+ private Thread infoLog;
+
+ TestBSPProcessRunner() {
+ sched = Executors.newScheduledThreadPool(1);
+ future = new AtomicReference<ScheduledFuture<Integer>>();
+ bspTaskProcess = null;
+ }
+
+ private void readStream(InputStream input) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ LOG.info(line);
+ }
+ }
+
+ public void startBSPProcess() {
+ this.future.set(this.sched.schedule(this, 0, SECONDS));
+ LOG.debug("Start building BSPPeer process.");
+ }
+
+ public int getBSPExitCode() {
+ try {
+ return this.future.get().get();
+ } catch (Exception e) {
+ LOG.error("Error while fetching exit status from BSPTask", e);
+ } finally {
+
+ }
+ return -1;
+ }
+
+ public void destroyProcess() {
+
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+ List<String> commands = new ArrayList<String>();
+ String workDir = new File(".").getAbsolutePath();
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+ commands.add(jvm.toString());
+
+ StringBuffer classPath = new StringBuffer();
+ // start with same classpath as parent process
+ classPath.append(System.getProperty("java.class.path"));
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(new File(workDir, "core/target/test-classes"));
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(workDir);
+
+ commands.add("-classpath");
+ commands.add(classPath.toString());
+
+ commands.add(TestBSPProcessRunner.class.getName());
+
+ LOG.info("starting process for failure case - "
+ + conf.getInt(TEST_POINT, 0));
+ commands.add("" + conf.getInt(TEST_POINT, 0));
+
+ LOG.info(commands.toString());
+
+ ProcessBuilder builder = new ProcessBuilder(commands);
+
+ try {
+ bspTaskProcess = builder.start();
+
+ // We have errorLog and infoLog to prevent block on pipe between
+ // child and parent process.
+ errorLog = new Thread() {
+ public void run() {
+ try {
+ readStream(bspTaskProcess.getErrorStream());
+ } catch (Exception e) {
+
+ }
+ }
+ };
+ errorLog.start();
+
+ infoLog = new Thread() {
+ public void run() {
+ try {
+ readStream(bspTaskProcess.getInputStream());
+ } catch (Exception e) {
+
+ }
+ }
+ };
+ infoLog.start();
+
+ int exit_code = bspTaskProcess.waitFor();
+ return exit_code;
+ } catch (Exception e) {
+ LOG.error("Error getting exit code of child process", e);
+ }
+ return -1;
+ }
+
+ public static void main(String[] args) {
+
+ HamaConfiguration hamaConf = new HamaConfiguration();
+ hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
+ hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
+ hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+ LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+
+ hamaConf.setInt("bsp.master.port", 610002);
+
+ TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
+ "job_201110102255", 1), 1), 1);
+
+ if (args.length > 0) {
+ hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
+
+ }
+
+ try {
+ BSPJob job = new BSPJob(hamaConf);
+ final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
+ BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+ new InetSocketAddress("127.0.0.1", 54321), hamaConf);
+
+ BSPTask task = new BSPTask();
+ task.setConf(job);
+
+ LOG.info("Testing failure case in process - "
+ + hamaConf.getInt(TEST_POINT, 0));
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ proto.close();
+ } catch (Exception e) {
+ // too late to log!
+ }
+ }
+ });
+
+ @SuppressWarnings("rawtypes")
+ BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, hamaConf,
+ tid, proto, 0, null, null, new Counters());
+ task.run(job, bspPeer, proto); // run the task
+
+ } catch (Exception e) {
+ LOG.error("Error in bsp child process.", e);
+ }
+
+ }
+
+ }
+
+ /*
+ * Test BSP class that has faults injected in each phase.
+ */
+ private static class FaulTestBSP extends
+ BSP<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> {
+
+ @Override
+ public void setup(
+ BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ if (peer.getConfiguration().getInt(TEST_POINT, 0) == 1) {
+ throw new RuntimeException("Error injected in setup");
+ }
+ Thread.sleep(500);
+ super.setup(peer);
+ LOG.info("Succesfully completed setup for bsp.");
+ }
+
+ @Override
+ public void cleanup(
+ BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+ throws IOException {
+ if (peer.getConfiguration().getInt(TEST_POINT, 0) == 3) {
+ throw new RuntimeException("Error injected in cleanup");
+ }
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ LOG.error("Interrupted BSP thread.", e);
+ }
+ super.cleanup(peer);
+ LOG.info("Succesfully cleaned up after bsp.");
+ }
+
+ @Override
+ public void bsp(
+ BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ if (peer.getConfiguration().getInt(TEST_POINT, 0) == 2) {
+ throw new RuntimeException("Error injected in bsp function");
+ }
+ Thread.sleep(500);
+ LOG.info("Succesfully completed bsp.");
+ }
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ conf = new HamaConfiguration();
+
+ conf.setInt(Constants.GROOM_PING_PERIOD, 200);
+ conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
+ conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+ LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+
+ InetSocketAddress inetAddress = new InetSocketAddress(PORT);
+ groom = new MinimalGroomServer(conf);
+ workerServer = RPC.getServer(groom, inetAddress.getHostName(),
+ inetAddress.getPort(), conf);
+ workerServer.start();
+
+ LOG.info("Started RPC server");
+ conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
+
+ umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
+ BSPPeerProtocol.versionID, inetAddress, conf);
+ LOG.info("Started the proxy connections");
+
+ this.testBSPTaskService = Executors.newScheduledThreadPool(1);
+ }
+
+ private int getExpectedPingCounts() {
+ return ((int) (2 * (groom.lastPingTime - groom.firstPingTime) / (conf
+ .getInt(Constants.GROOM_PING_PERIOD, 5000))));
+ }
+
+ private void checkIfPingTestPassed() {
+ int expectedPingCounts = getExpectedPingCounts();
+ LOG.info("Counted " + groom.pingCount + " pings and expected "
+ + expectedPingCounts + " pings.");
+ boolean testPass = groom.getPingCount() >= expectedPingCounts;
+ assertEquals(true, testPass);
+ }
+
+ /*
+ * Test if we get the expected counts of ping.
+ */
+ public void testPing() {
+ conf.setInt(TEST_POINT, 0);
+
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ Future<Integer> future = completionService
+ .submit(new TestBSPProcessRunner());
+
+ try {
+ future.get(20000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted Exception.", e1);
+ } catch (ExecutionException e1) {
+ LOG.error("ExecutionException Exception.", e1);
+ } catch (TimeoutException e) {
+ LOG.error("TimeoutException Exception.", e);
+ }
+
+ checkIfPingTestPassed();
+ groom.setPingCount(0);
+ this.testBSPTaskService.shutdownNow();
+
+ }
+
+ /*
+ * Inject failure and different points and sense if the pings are coming or
+ * not.
+ */
+ public void testPingOnTaskSetupFailure() {
+
+ LOG.info("Testing ping failure case - 1");
+ conf.setInt(TEST_POINT, 1);
+
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ Future<Integer> future = completionService
+ .submit(new TestBSPProcessRunner());
+
+ try {
+ future.get(20000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted Exception.", e1);
+ } catch (ExecutionException e1) {
+ LOG.error("ExecutionException Exception.", e1);
+ } catch (TimeoutException e) {
+ LOG.error("TimeoutException Exception.", e);
+ }
+
+ checkIfPingTestPassed();
+ groom.setPingCount(0);
+ this.testBSPTaskService.shutdownNow();
+
+ }
+
+ public void testPingOnTaskExecFailure() {
+
+ LOG.info("Testing ping failure case - 2");
+ conf.setInt(TEST_POINT, 2);
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ Future<Integer> future = completionService
+ .submit(new TestBSPProcessRunner());
+
+ try {
+ future.get(20000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted Exception.", e1);
+ } catch (ExecutionException e1) {
+ LOG.error("ExecutionException Exception.", e1);
+ } catch (TimeoutException e) {
+ LOG.error("TimeoutException Exception.", e);
+ }
+
+ checkIfPingTestPassed();
+ groom.setPingCount(0);
+ this.testBSPTaskService.shutdownNow();
+
+ }
+
+ public void testPingOnTaskCleanupFailure() {
+
+ LOG.info("Testing ping failure case - 3");
+
+ conf.setInt(TEST_POINT, 3);
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ Future<Integer> future = completionService
+ .submit(new TestBSPProcessRunner());
+
+ try {
+ future.get(20000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted Exception.", e1);
+ } catch (ExecutionException e1) {
+ LOG.error("ExecutionException Exception.", e1);
+ } catch (TimeoutException e) {
+ LOG.error("TimeoutException Exception.", e);
+ }
+
+ checkIfPingTestPassed();
+ groom.setPingCount(0);
+ this.testBSPTaskService.shutdownNow();
+
+ }
+
+ public void testBSPTaskSelfDestroy() {
+ LOG.info("Testing self kill on lost contact.");
+
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ Future<Integer> future = completionService
+ .submit(new TestBSPProcessRunner());
+
+ try {
+ while (groom.pingCount == 0) {
+ Thread.sleep(100);
+ }
+ } catch (Exception e) {
+ LOG.error("Interrupted the timer for 1 sec.", e);
+ }
+
+ workerServer.stop();
+ umbilical = null;
+ workerServer = null;
+ Integer exitValue = -1;
+ try {
+ exitValue = future.get(20000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted Exception.", e1);
+ } catch (ExecutionException e1) {
+ LOG.error("ExecutionException Exception.", e1);
+ } catch (TimeoutException e) {
+ LOG.error("TimeoutException Exception.", e);
+ }
+
+ assertEquals(69, exitValue.intValue());
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ super.tearDown();
+ if (groom != null)
+ groom.setPingCount(0);
+ if (umbilical != null) {
+ umbilical.close();
+ Thread.sleep(2000);
+ }
+ if (workerServer != null)
+ workerServer.stop();
+ testBSPTaskService.shutdownNow();
+ Thread.sleep(2000);
+ }
+
+}
Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Mar 4 23:08:07 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp;
+import java.net.InetSocketAddress;
+
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@@ -25,8 +27,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
import org.apache.hama.bsp.messages.ByteMessage;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
public class TestCheckpoint extends TestCase {
@@ -34,36 +43,123 @@ public class TestCheckpoint extends Test
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public void testCheckpoint() throws Exception {
Configuration config = new HamaConfiguration();
FileSystem dfs = FileSystem.get(config);
BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
+ bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
+ new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
+ "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
assertNotNull("BSPPeerImpl should not be null.", bspTask);
- if(dfs.mkdirs(new Path("checkpoint"))) {
- if(dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
- if(dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")));
+ if (dfs.mkdirs(new Path("checkpoint"))) {
+ if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
+ if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
+ ;
}
}
- assertTrue("Make sure directory is created.",
- dfs.exists(new Path(checkpointedDir)));
+ assertTrue("Make sure directory is created.",
+ dfs.exists(new Path(checkpointedDir)));
byte[] tmpData = "data".getBytes();
BSPMessageBundle bundle = new BSPMessageBundle();
bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
assertNotNull("Message bundle can not be null.", bundle);
assertNotNull("Configuration should not be null.", config);
- bspTask.checkpoint(checkpointedDir+"/attempt_201110302255_0001_000000_0",
- bundle);
- FSDataInputStream in = dfs.open(new Path(checkpointedDir+
- "/attempt_201110302255_0001_000000_0"));
+ bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
+ bundle);
+ FSDataInputStream in = dfs.open(new Path(checkpointedDir
+ + "/attempt_201110302255_0001_000000_0"));
BSPMessageBundle bundleRead = new BSPMessageBundle();
bundleRead.readFields(in);
in.close();
- ByteMessage byteMsg = (ByteMessage)(bundleRead.getMessages()).get(0);
+ ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
String content = new String(byteMsg.getData());
- LOG.info("Saved checkpointed content is "+content);
- assertTrue("Message content should be the same.", "data".equals(content));
+ LOG.info("Saved checkpointed content is " + content);
+ assertTrue("Message content should be the same.", "data".equals(content));
dfs.delete(new Path("checkpoint"), true);
}
+
+  /**
+   * Checks BSPPeerImpl.isReadyToCheckpoint() across supersteps. With
+   * checkpointing disabled it is false. With CHECKPOINT_INTERVAL 3 it becomes
+   * true on the third sync(). After job.setCheckPointInterval(5) it is false
+   * on the fourth sync() and true on the fifth. Unexpected exceptions now
+   * fail the test instead of being logged and swallowed.
+   */
+  public void testCheckpointInterval() throws Exception {
+
+    HamaConfiguration conf = new HamaConfiguration();
+
+    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+
+    conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
+
+    int port = 54321;
+    InetSocketAddress inetAddress = new InetSocketAddress(port);
+    MinimalGroomServer groom = new MinimalGroomServer(conf);
+    Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
+        inetAddress.getPort(), conf);
+    workerServer.start();
+
+    LOG.info("Started RPC server");
+    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
+
+    BSPPeerProtocol umbilical = null;
+    BSPPeerProtocol proto = null;
+    try {
+      umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
+          BSPPeerProtocol.versionID, inetAddress, conf);
+      LOG.info("Started the proxy connections");
+
+      TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
+          "job_201110102255", 1), 1), 1);
+
+      BSPJob job = new BSPJob(conf);
+      proto = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
+          BSPPeerProtocol.versionID, new InetSocketAddress("127.0.0.1", port),
+          conf);
+
+      // The task is configured but never run; kept as in the original test.
+      BSPTask task = new BSPTask();
+      task.setConf(job);
+
+      @SuppressWarnings("rawtypes")
+      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
+          proto, 0, null, null, new Counters());
+
+      bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
+          TaskStatus.State.RUNNING, "running", "127.0.0.1",
+          TaskStatus.Phase.STARTING, new Counters()));
+
+      assertFalse("Checkpointing is disabled.", bspPeer.isReadyToCheckpoint());
+
+      conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+      conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
+
+      syncAndAssertCheckpoint(bspPeer, false);
+      syncAndAssertCheckpoint(bspPeer, false);
+      syncAndAssertCheckpoint(bspPeer, true);
+
+      job.setCheckPointInterval(5);
+      syncAndAssertCheckpoint(bspPeer, false);
+      syncAndAssertCheckpoint(bspPeer, true);
+    } finally {
+      try {
+        if (umbilical != null) {
+          umbilical.close(); // remote call to MinimalGroomServer.close()
+          Thread.sleep(2000);
+        }
+      } finally {
+        if (umbilical != null) {
+          RPC.stopProxy(umbilical);
+        }
+        if (proto != null) {
+          RPC.stopProxy(proto);
+        }
+        workerServer.stop();
+        Thread.sleep(2000);
+      }
+    }
+  }
+
+  /**
+   * Calls {@code peer.sync()}, logs the readiness and superstep count, and
+   * asserts that {@code isReadyToCheckpoint()} equals {@code expectedReady}.
+   */
+  private void syncAndAssertCheckpoint(BSPPeerImpl<?, ?, ?, ?, ?> peer,
+      boolean expectedReady) throws Exception {
+    peer.sync();
+    LOG.info("Is Ready = " + peer.isReadyToCheckpoint() + " at step "
+        + peer.getSuperstepCount());
+    assertEquals("isReadyToCheckpoint() at superstep "
+        + peer.getSuperstepCount(), expectedReady, peer.isReadyToCheckpoint());
+  }
}
Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Sun Mar 4 23:08:07 2012
@@ -40,7 +40,7 @@ public class BSPRunner {
private TaskAttemptID id;
private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
-
+
@SuppressWarnings("rawtypes")
Class<? extends BSP> bspClass;
@@ -87,14 +87,34 @@ public class BSPRunner {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void startComputation() throws Exception {
BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
+ // Throw the first exception and log the remaining.
+ Exception firstException = null;
try {
bspInstance.setup(peer);
bspInstance.bsp(peer);
} catch (Exception e) {
- throw e;
+ LOG.error("Error occured while running bsp function.", e);
+ firstException = e;
} finally {
- bspInstance.cleanup(peer);
- peer.close();
+ try {
+ bspInstance.cleanup(peer);
+ } catch (Exception e) {
+ LOG.error("Cleaning up after bsp function.", e);
+ if (firstException == null) {
+ firstException = e;
+ }
+ } finally {
+ try {
+ peer.close();
+ } catch (Exception e) {
+ LOG.error("Error closing the bsp peer", e);
+ if (firstException == null)
+ firstException = e;
+ } finally {
+ if (firstException != null)
+ throw firstException;
+ }
+ }
}
}