You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Edward J. Yoon" <ed...@apache.org> on 2012/11/21 12:30:32 UTC

Re: svn commit: r1411991 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java

Hi,

I saw that most failures are caused by overly sensitive monitoring. Please
check whether this change could cause any problems.

+          && (((tip.lastPingedTimestamp == 0 && ((currentTime -
tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp >
0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod))))
{


On Wed, Nov 21, 2012 at 3:04 PM,  <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Wed Nov 21 06:04:26 2012
> New Revision: 1411991
>
> URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
> Log:
> Monitoring of tasks is too sensitive.
>
> Modified:
>     hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
>
> Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
> ==============================================================================
> --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
> +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Nov 21 06:04:26 2012
> @@ -207,10 +207,9 @@ public class GroomServer implements Runn
>              try {
>                startRecoveryTask(recoverAction);
>              } catch (IOException e) {
> -              throw new DirectiveException(
> -                  new StringBuffer().append("Error starting the recovery task")
> -                  .append(t.getTaskID()).toString(),
> -                  e);
> +              throw new DirectiveException(new StringBuffer()
> +                  .append("Error starting the recovery task")
> +                  .append(t.getTaskID()).toString(), e);
>              }
>            }
>          }
> @@ -617,17 +616,17 @@ public class GroomServer implements Runn
>        }
>
>        Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
> -      while(taskIterator.hasNext()){
> +      while (taskIterator.hasNext()) {
>          TaskAttemptID taskAttId = taskIterator.next();
> -        if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
> -          if(LOG.isDebugEnabled()){
> +        if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
> +          if (LOG.isDebugEnabled()) {
>              LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
>            }
>            taskIterator.remove();
>            runningTasks.remove(taskAttId);
>          }
>        }
> -
> +
>        tasks.put(t.getTaskID(), tip);
>        runningTasks.put(t.getTaskID(), tip);
>      }
> @@ -637,14 +636,14 @@ public class GroomServer implements Runn
>        String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
>            .stringifyException(e));
>        LOG.warn(msg);
> -
> +
>        try {
>          tip.killAndCleanup(true);
>        } catch (IOException ie2) {
>          LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
>              + StringUtils.stringifyException(ie2));
>        }
> -      throw new IOException("Errro localizing the job.",e);
> +      throw new IOException("Errro localizing the job.", e);
>      }
>    }
>
> @@ -807,20 +806,17 @@ public class GroomServer implements Runn
>              + " monitorPeriod = "
>              + monitorPeriod
>              + " check = "
> -            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> -                (((tip.lastPingedTimestamp == 0 &&
> -                ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
> -                ((tip.lastPingedTimestamp > 0) &&
> -                    (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
> +            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
>
>        // Task is out of contact if it has not pinged since more than
>        // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
>        // to get started.
> +
> +      // TODO Please refactor this conditions
> +      // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
> +
>        if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> -          && (((tip.lastPingedTimestamp == 0
> -          && ((currentTime - tip.startTime) > 10 * monitorPeriod))
> -            || ((tip.lastPingedTimestamp > 0)
> -                && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
> +          && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
>
>          LOG.info("adding purge task: " + tip.getTask().getTaskID());
>
> @@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
>
>        // runner could be null if task-cleanup attempt is not localized yet
>        if (runner != null) {
> -        if(LOG.isDebugEnabled()){
> +        if (LOG.isDebugEnabled()) {
>            LOG.debug("Killing process for " + this.task.getTaskID());
>          }
>          runner.killBsp();
> @@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
>
>      public synchronized void killRunner() throws IOException {
>        if (runner != null) {
> -        if(LOG.isDebugEnabled()){
> +        if (LOG.isDebugEnabled()) {
>            LOG.debug("Killing process for " + this.task.getTaskID());
>          }
>          runner.killBsp();
> @@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
>          defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
>        }
>        defaultConf.setInt(Constants.PEER_PORT, peerPort);
> -
> +
>        long superstep = Long.parseLong(args[4]);
>        TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
>        LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
>
> -
>        try {
>          // use job-specified working directory
>          FileSystem.get(job.getConfiguration()).setWorkingDirectory(
>
>



-- 
Best Regards, Edward J. Yoon
@eddieyoon

Re: svn commit: r1411991 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java

Posted by Suraj Menon <me...@gmail.com>.
Hi, it shouldn't matter; it just means we will be a little later in detecting
a dead task. With this, you have kept the pinging rate the same, but have given
the task more leeway to miss its pings. Are you facing a situation where
the ping is not happening as per the timer? What is the difference between
increasing the monitor period by a factor of 6 and increasing the ping
interval by a factor of 6?

On Wed, Nov 21, 2012 at 6:30 AM, Edward J. Yoon <ed...@apache.org> wrote:

> Hi,
>
> I saw most failures are caused by too-sensitive monitoring. Please
> check whether any problem can be occurred with this change.
>
> +          && (((tip.lastPingedTimestamp == 0 && ((currentTime -
> tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp >
> 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod))))
> {
>
>
> On Wed, Nov 21, 2012 at 3:04 PM,  <ed...@apache.org> wrote:
> > Author: edwardyoon
> > Date: Wed Nov 21 06:04:26 2012
> > New Revision: 1411991
> >
> > URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
> > Log:
> > Monitoring of tasks is too sensitive.
> >
> > Modified:
> >     hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> >
> > Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> > URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
> >
> ==============================================================================
> > --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> (original)
> > +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
> Wed Nov 21 06:04:26 2012
> > @@ -207,10 +207,9 @@ public class GroomServer implements Runn
> >              try {
> >                startRecoveryTask(recoverAction);
> >              } catch (IOException e) {
> > -              throw new DirectiveException(
> > -                  new StringBuffer().append("Error starting the
> recovery task")
> > -                  .append(t.getTaskID()).toString(),
> > -                  e);
> > +              throw new DirectiveException(new StringBuffer()
> > +                  .append("Error starting the recovery task")
> > +                  .append(t.getTaskID()).toString(), e);
> >              }
> >            }
> >          }
> > @@ -617,17 +616,17 @@ public class GroomServer implements Runn
> >        }
> >
> >        Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
> > -      while(taskIterator.hasNext()){
> > +      while (taskIterator.hasNext()) {
> >          TaskAttemptID taskAttId = taskIterator.next();
> > -        if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
> > -          if(LOG.isDebugEnabled()){
> > +        if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
> > +          if (LOG.isDebugEnabled()) {
> >              LOG.debug("Removing tasks with id = " +
> t.getTaskID().getTaskID());
> >            }
> >            taskIterator.remove();
> >            runningTasks.remove(taskAttId);
> >          }
> >        }
> > -
> > +
> >        tasks.put(t.getTaskID(), tip);
> >        runningTasks.put(t.getTaskID(), tip);
> >      }
> > @@ -637,14 +636,14 @@ public class GroomServer implements Runn
> >        String msg = ("Error initializing " + tip.getTask().getTaskID() +
> ":\n" + StringUtils
> >            .stringifyException(e));
> >        LOG.warn(msg);
> > -
> > +
> >        try {
> >          tip.killAndCleanup(true);
> >        } catch (IOException ie2) {
> >          LOG.info("Error cleaning up " + tip.getTask().getTaskID() +
> ":\n"
> >              + StringUtils.stringifyException(ie2));
> >        }
> > -      throw new IOException("Errro localizing the job.",e);
> > +      throw new IOException("Errro localizing the job.", e);
> >      }
> >    }
> >
> > @@ -807,20 +806,17 @@ public class GroomServer implements Runn
> >              + " monitorPeriod = "
> >              + monitorPeriod
> >              + " check = "
> > -            +
> (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> > -                (((tip.lastPingedTimestamp == 0 &&
> > -                ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
> > -                ((tip.lastPingedTimestamp > 0) &&
> > -                    (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))));
> > +            +
> (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
> (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 *
> monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime -
> tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
> >
> >        // Task is out of contact if it has not pinged since more than
> >        // monitorPeriod. A task is given a leeway of 10 times
> monitorPeriod
> >        // to get started.
> > +
> > +      // TODO Please refactor this conditions
> > +      // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 *
> monitorPeriod
> > +
> >        if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
> > -          && (((tip.lastPingedTimestamp == 0
> > -          && ((currentTime - tip.startTime) > 10 * monitorPeriod))
> > -            || ((tip.lastPingedTimestamp > 0)
> > -                && (currentTime - tip.lastPingedTimestamp) >
> monitorPeriod)))) {
> > +          && (((tip.lastPingedTimestamp == 0 && ((currentTime -
> tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) &&
> (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
> >
> >          LOG.info("adding purge task: " + tip.getTask().getTaskID());
> >
> > @@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
> >
> >        // runner could be null if task-cleanup attempt is not localized
> yet
> >        if (runner != null) {
> > -        if(LOG.isDebugEnabled()){
> > +        if (LOG.isDebugEnabled()) {
> >            LOG.debug("Killing process for " + this.task.getTaskID());
> >          }
> >          runner.killBsp();
> > @@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
> >
> >      public synchronized void killRunner() throws IOException {
> >        if (runner != null) {
> > -        if(LOG.isDebugEnabled()){
> > +        if (LOG.isDebugEnabled()) {
> >            LOG.debug("Killing process for " + this.task.getTaskID());
> >          }
> >          runner.killBsp();
> > @@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
> >          defaultConf.setInt("bsp.checkpoint.port",
> Integer.parseInt(args[4]));
> >        }
> >        defaultConf.setInt(Constants.PEER_PORT, peerPort);
> > -
> > +
> >        long superstep = Long.parseLong(args[4]);
> >        TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
> >        LOG.debug("Starting peer for sstep " + superstep + " state = " +
> state);
> >
> > -
> >        try {
> >          // use job-specified working directory
> >          FileSystem.get(job.getConfiguration()).setWorkingDirectory(
> >
> >
>
>
>
> --
> Best Regards, Edward J. Yoon
> @eddieyoon
>