You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Edward J. Yoon" <ed...@apache.org> on 2012/11/21 12:30:32 UTC
Re: svn commit: r1411991 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Hi,
I saw that most failures are caused by overly sensitive monitoring. Please
check whether this change could cause any problems.
+ && (((tip.lastPingedTimestamp == 0 && ((currentTime -
tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp >
0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod))))
{
On Wed, Nov 21, 2012 at 3:04 PM, <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Wed Nov 21 06:04:26 2012
> New Revision: 1411991
>
> URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
> Log:
> Monitoring of tasks is too sensitive.
>
> Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
>
> Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
> ==============================================================================
> --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
> +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Nov 21 06:04:26 2012
> @@ -207,10 +207,9 @@ public class GroomServer implements Runn
> try {
> startRecoveryTask(recoverAction);
> } catch (IOException e) {
> - throw new DirectiveException(
> - new StringBuffer().append("Error starting the recovery task")
> - .append(t.getTaskID()).toString(),
> - e);
> + throw new DirectiveException(new StringBuffer()
> + .append("Error starting the recovery task")
> + .append(t.getTaskID()).toString(), e);
> }
> }
> }
> @@ -617,17 +616,17 @@ public class GroomServer implements Runn
> }
>
> Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
> - while(taskIterator.hasNext()){
> + while (taskIterator.hasNext()) {
> TaskAttemptID taskAttId = taskIterator.next();
> - if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
> - if(LOG.isDebugEnabled()){
> + if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
> }
> taskIterator.remove();
> runningTasks.remove(taskAttId);
> }
> }
> -
> +
> tasks.put(t.getTaskID(), tip);
> runningTasks.put(t.getTaskID(), tip);
> }
> @@ -637,14 +636,14 @@ public class GroomServer implements Runn
> String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
> .stringifyException(e));
> LOG.warn(msg);
> -
> +
> try {
> tip.killAndCleanup(true);
> } catch (IOException ie2) {
> LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
> + StringUtils.stringifyException(ie2));
> }
> - throw new IOException("Errro localizing the job.",e);
> + throw new IOException("Errro localizing the job.", e);
> }
> }
>
> @@ -807,20 +806,17 @@ public class GroomServer implements Runn
> + " monitorPeriod = "
> + monitorPeriod
> + " check = "
> - + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> - (((tip.lastPingedTimestamp == 0 &&
> - ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
> - ((tip.lastPingedTimestamp > 0) &&
> - (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
> + + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
>
> // Task is out of contact if it has not pinged since more than
> // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
> // to get started.
> +
> + // TODO Please refactor this conditions
> + // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
> +
> if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> - && (((tip.lastPingedTimestamp == 0
> - && ((currentTime - tip.startTime) > 10 * monitorPeriod))
> - || ((tip.lastPingedTimestamp > 0)
> - && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
> + && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
>
> LOG.info("adding purge task: " + tip.getTask().getTaskID());
>
> @@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
>
> // runner could be null if task-cleanup attempt is not localized yet
> if (runner != null) {
> - if(LOG.isDebugEnabled()){
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Killing process for " + this.task.getTaskID());
> }
> runner.killBsp();
> @@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
>
> public synchronized void killRunner() throws IOException {
> if (runner != null) {
> - if(LOG.isDebugEnabled()){
> + if (LOG.isDebugEnabled()) {
> LOG.debug("Killing process for " + this.task.getTaskID());
> }
> runner.killBsp();
> @@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
> defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
> }
> defaultConf.setInt(Constants.PEER_PORT, peerPort);
> -
> +
> long superstep = Long.parseLong(args[4]);
> TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
> LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
>
> -
> try {
> // use job-specified working directory
> FileSystem.get(job.getConfiguration()).setWorkingDirectory(
>
>
--
Best Regards, Edward J. Yoon
@eddieyoon
Re: svn commit: r1411991 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Posted by Suraj Menon <me...@gmail.com>.
Hi, it shouldn't matter; it just means we will be a little later in detecting a
dead task. With this change, you have kept the ping rate the same but have given
the task more leeway to miss its pings. Are you seeing a situation where
pings are not happening on schedule according to the timer? What is the difference
between multiplying the monitor period by 6 and multiplying the ping interval by 6?
On Wed, Nov 21, 2012 at 6:30 AM, Edward J. Yoon <ed...@apache.org> wrote:
> Hi,
>
> I saw that most failures are caused by overly sensitive monitoring. Please
> check whether this change could cause any problems.
>
> + && (((tip.lastPingedTimestamp == 0 && ((currentTime -
> tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp >
> 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod))))
> {
>
>
> On Wed, Nov 21, 2012 at 3:04 PM, <ed...@apache.org> wrote:
> > Author: edwardyoon
> > Date: Wed Nov 21 06:04:26 2012
> > New Revision: 1411991
> >
> > URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
> > Log:
> > Monitoring of tasks is too sensitive.
> >
> > Modified:
> > hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> >
> > Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> > URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
> >
> ==============================================================================
> > --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> (original)
> > +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> Wed Nov 21 06:04:26 2012
> > @@ -207,10 +207,9 @@ public class GroomServer implements Runn
> > try {
> > startRecoveryTask(recoverAction);
> > } catch (IOException e) {
> > - throw new DirectiveException(
> > - new StringBuffer().append("Error starting the
> recovery task")
> > - .append(t.getTaskID()).toString(),
> > - e);
> > + throw new DirectiveException(new StringBuffer()
> > + .append("Error starting the recovery task")
> > + .append(t.getTaskID()).toString(), e);
> > }
> > }
> > }
> > @@ -617,17 +616,17 @@ public class GroomServer implements Runn
> > }
> >
> > Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
> > - while(taskIterator.hasNext()){
> > + while (taskIterator.hasNext()) {
> > TaskAttemptID taskAttId = taskIterator.next();
> > - if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
> > - if(LOG.isDebugEnabled()){
> > + if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
> > + if (LOG.isDebugEnabled()) {
> > LOG.debug("Removing tasks with id = " +
> t.getTaskID().getTaskID());
> > }
> > taskIterator.remove();
> > runningTasks.remove(taskAttId);
> > }
> > }
> > -
> > +
> > tasks.put(t.getTaskID(), tip);
> > runningTasks.put(t.getTaskID(), tip);
> > }
> > @@ -637,14 +636,14 @@ public class GroomServer implements Runn
> > String msg = ("Error initializing " + tip.getTask().getTaskID() +
> ":\n" + StringUtils
> > .stringifyException(e));
> > LOG.warn(msg);
> > -
> > +
> > try {
> > tip.killAndCleanup(true);
> > } catch (IOException ie2) {
> > LOG.info("Error cleaning up " + tip.getTask().getTaskID() +
> ":\n"
> > + StringUtils.stringifyException(ie2));
> > }
> > - throw new IOException("Errro localizing the job.",e);
> > + throw new IOException("Errro localizing the job.", e);
> > }
> > }
> >
> > @@ -807,20 +806,17 @@ public class GroomServer implements Runn
> > + " monitorPeriod = "
> > + monitorPeriod
> > + " check = "
> > - +
> (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> > - (((tip.lastPingedTimestamp == 0 &&
> > - ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
> > - ((tip.lastPingedTimestamp > 0) &&
> > - (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))));
> > + +
> (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 *
> monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime -
> tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
> >
> > // Task is out of contact if it has not pinged since more than
> > // monitorPeriod. A task is given a leeway of 10 times
> monitorPeriod
> > // to get started.
> > +
> > + // TODO Please refactor this conditions
> > + // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 *
> monitorPeriod
> > +
> > if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> > - && (((tip.lastPingedTimestamp == 0
> > - && ((currentTime - tip.startTime) > 10 * monitorPeriod))
> > - || ((tip.lastPingedTimestamp > 0)
> > - && (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))) {
> > + && (((tip.lastPingedTimestamp == 0 && ((currentTime -
> tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) &&
> (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
> >
> > LOG.info("adding purge task: " + tip.getTask().getTaskID());
> >
> > @@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
> >
> > // runner could be null if task-cleanup attempt is not localized
> yet
> > if (runner != null) {
> > - if(LOG.isDebugEnabled()){
> > + if (LOG.isDebugEnabled()) {
> > LOG.debug("Killing process for " + this.task.getTaskID());
> > }
> > runner.killBsp();
> > @@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
> >
> > public synchronized void killRunner() throws IOException {
> > if (runner != null) {
> > - if(LOG.isDebugEnabled()){
> > + if (LOG.isDebugEnabled()) {
> > LOG.debug("Killing process for " + this.task.getTaskID());
> > }
> > runner.killBsp();
> > @@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
> > defaultConf.setInt("bsp.checkpoint.port",
> Integer.parseInt(args[4]));
> > }
> > defaultConf.setInt(Constants.PEER_PORT, peerPort);
> > -
> > +
> > long superstep = Long.parseLong(args[4]);
> > TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
> > LOG.debug("Starting peer for sstep " + superstep + " state = " +
> state);
> >
> > -
> > try {
> > // use job-specified working directory
> > FileSystem.get(job.getConfiguration()).setWorkingDirectory(
> >
> >
>
>
>
> --
> Best Regards, Edward J. Yoon
> @eddieyoon
>