You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@giraph.apache.org by Nick West <ni...@benchmarksolutions.com> on 2012/08/20 22:26:05 UTC

Adding MasterCompute object causes "failed to report status" errors

Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Nick West <ni...@benchmarksolutions.com>.
Thanks for the reply Maja.  I must have been referring to the old API/JavaDocs.  Some of the new ones weren't available online yesterday (404 not found) nor in the snapshot I checked out.

In any event, my application is running as expected now.  Thank you all for the help!

-Nick

On Aug 22, 2012, at 6:02 AM, Maja Kabiljo wrote:

Hi Nick,

There was some discussion about whether master compute should be executed before or after the vertex computes, but we decided to keep it as it is. In any case you can do all the same things, just in a slightly different way. How was it before I don't know, but documentation of MasterCompute is correct:
"This class will be instantiated on the master node and will run every superstep before the workers do."

On the other note, I do see there is some problem with master halting computation in super step 0, I'll investigate that, thanks!

Let me know if you have any other questions,
Maja

From: Nick West <ni...@benchmarksolutions.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 10:43 PM
To: "<us...@giraph.apache.org>>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

After some more experimenting, I was able to get it to work.  I figured I'd share my results.

In my setup, I have a BooleanAndAggregator, which by default has a value of true.  I want to stop when all vertices vote to stop (even if there are new messages sent).  So, in a standard superstep each vertex will do some computation, vote to stop (or not), and then AND its vote to the aggregator.  When the MasterCompute detects that all vertices have voted to stop (the aggregator has a true value) it calls halt computation.

What it seems like is happening is the following:

1)  MasterCompute's compute method gets called at the beginning of each superstep - not at the end of it as (I think) is stated in the documentation and what it used to do.
2)  In my case, at start of superstep 0, the BooleanAndAggregator has an initial value of true (I couldn't get this to change by calling the set value method in the initialize method - I think this is because the aggregator is reinitialized every superstep) so when the MasterCompute's compute method is called at the beginning of superstep zero, it immediately halts computation.
3)  This immediate shutdown seems to cause a race condition with the other task (the one assigned to the vertex computations) which doesn't know it should quit, and the zookeeper task seems to wait for it to stop.

To address this, I currently just have the MasterCompute's compute method do nothing in superstep 0.  However, I think the behavior described above is not correct.  If there is some other cause for the behavior I'm seeing please let me know.

Thanks,
Nick


On Aug 21, 2012, at 2:28 PM, Nick West wrote:

Thank you both for the replies.

I have checked out the most recent version of the code and am still having the same problem.  (I also tried with the previous aggregator code with no luck either.)  Looking in the logs it appears that It gets through superstep -1, and the block seems to occur on superstep 0.  This is the end of the log file for the non-master task:

2012-08-21 14:14:15,620 INFO org.apache.giraph.graph.BspServiceWorker: finishSuperstep: Completed superstep -1 with global stats (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=true)
2012-08-21 14:14:15,621 INFO org.apache.giraph.comm.BasicRPCCommunications: prepareSuperstep: Superstep 0 totalMem = 81.0625M, maxMem = 197.5M, freeMem = 68.69868M
2012-08-21 14:14:15,627 WARN org.apache.giraph.graph.BspService: process: Unknown and unprocessed event (path=/_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir, type=NodeChildrenChanged, state=SyncConnected)
2012-08-21 14:14:15,629 INFO org.apache.giraph.graph.BspServiceWorker: registerHealth: Created my health node for attempt=0, superstep=0 with /_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir/0/_workerHealthyDir/nwest-mac.benchmark.local_1 and workerInfo= Worker(hostname=nwest-mac.benchmark.local, MRpartition=1, port=30001)
2012-08-21 14:14:15,657 INFO org.apache.giraph.graph.BspServiceWorker: processEvent: Job state changed, checking to see if it needs to restart
2012-08-21 14:14:15,658 INFO org.apache.giraph.graph.BspService: getJobState: Job state already exists (/_hadoopBsp/job_201208211408_0002/_masterJobState)


and these are the last lines of the master log:

012-08-21 14:14:15,617 INFO org.apache.giraph.graph.BspServiceMaster: aggregateWorkerStats: Aggregation found (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=false) on superstep = -1
2012-08-21 14:14:15,652 INFO org.apache.giraph.graph.MasterThread: masterThread: Coordination of superstep -1 took 0.945 seconds ended with state ALL_SUPERSTEPS_DONE and is now on superstep 0
2012-08-21 14:14:15,654 INFO org.apache.giraph.graph.BspServiceMaster: setJobState: {"_stateKey":"FINISHED","_applicationAttemptKey":-1,"_superstepKey":-1} on superstep 0
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanup: Notifying master its okay to cleanup with /_hadoopBsp/job_201208211408_0002/_cleanedUpDir/0_master
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Node /_hadoopBsp/job_201208211408_0002/_cleanedUpDir already exists, no need to create.
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Got 1 of 2 desired children from /_hadoopBsp/job_201208211408_0002/_cleanedUpDir
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanedUpZooKeeper: Waiting for the children of /_hadoopBsp/job_201208211408_0002/_cleanedUpDir to change since only got 1 nodes.

Again, the only addition is the use of the aggregator, otherwise the code runs perfectly fine.  Thoughts?

Thanks,
Nick

On Aug 21, 2012, at 4:06 AM, Maja Kabiljo wrote:

Hi Nick,

There were some very recent changes in the way aggregators are used. If your code below compiles it means that you are using the version before the changes, and looking at the example after them. The code which Kaushik attached shows how you should do it if you are not using the newest Giraph code.

If you want to use newest code, here is how aggregators work there:

  *   You have to register aggregators only on master, just like you are doing now. You can use registerAggregator or registerPersistentAggregator, depending on whether or not you want it to be reset on every super step.
  *   You don't have getAggregator method anymore. During vertex computation you can only call aggregate(name, value), and in master.compute you have setAggregatedValue(name, value).
  *   There is no more registerAggregator and useAggregator on workers, therefore you don't have to use WorkerContext in order to use aggregators.

Hope this helps,
Maja

From: KAUSHIK SARKAR <co...@gmail.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 7:39 AM
To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that I am using. It is approx. 1 month old. It seems that the WorkerContext class is different from the current svn version. I am not aware if this change made in the current version to reflect some change in the API behaviour, but I followed the WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice - in the initialize() method of MasterCompute (which you have done) and in the preApplication() method of the WorkerContext. Moreover in the preSuperstep() method of the WorkerContext, you need to call useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>






<image001.png>


Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>







Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>





<image001.png>


Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Maja Kabiljo <ma...@fb.com>.
Hi Nick,

There was some discussion about whether master compute should be executed before or after the vertex computes, but we decided to keep it as it is. In any case you can do all the same things, just in a slightly different way. How was it before I don't know, but documentation of MasterCompute is correct:
"This class will be instantiated on the master node and will run every superstep before the workers do."

On the other note, I do see there is some problem with master halting computation in super step 0, I'll investigate that, thanks!

Let me know if you have any other questions,
Maja

From: Nick West <ni...@benchmarksolutions.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 10:43 PM
To: "<us...@giraph.apache.org>>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

After some more experimenting, I was able to get it to work.  I figured I'd share my results.

In my setup, I have a BooleanAndAggregator, which by default has a value of true.  I want to stop when all vertices vote to stop (even if there are new messages sent).  So, in a standard superstep each vertex will do some computation, vote to stop (or not), and then AND its vote to the aggregator.  When the MasterCompute detects that all vertices have voted to stop (the aggregator has a true value) it calls halt computation.

What it seems like is happening is the following:

1)  MasterCompute's compute method gets called at the beginning of each superstep - not at the end of it as (I think) is stated in the documentation and what it used to do.
2)  In my case, at start of superstep 0, the BooleanAndAggregator has an initial value of true (I couldn't get this to change by calling the set value method in the initialize method - I think this is because the aggregator is reinitialized every superstep) so when the MasterCompute's compute method is called at the beginning of superstep zero, it immediately halts computation.
3)  This immediate shutdown seems to cause a race condition with the other task (the one assigned to the vertex computations) which doesn't know it should quit, and the zookeeper task seems to wait for it to stop.

To address this, I currently just have the MasterCompute's compute method do nothing in superstep 0.  However, I think the behavior described above is not correct.  If there is some other cause for the behavior I'm seeing please let me know.

Thanks,
Nick


On Aug 21, 2012, at 2:28 PM, Nick West wrote:

Thank you both for the replies.

I have checked out the most recent version of the code and am still having the same problem.  (I also tried with the previous aggregator code with no luck either.)  Looking in the logs it appears that It gets through superstep -1, and the block seems to occur on superstep 0.  This is the end of the log file for the non-master task:

2012-08-21 14:14:15,620 INFO org.apache.giraph.graph.BspServiceWorker: finishSuperstep: Completed superstep -1 with global stats (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=true)
2012-08-21 14:14:15,621 INFO org.apache.giraph.comm.BasicRPCCommunications: prepareSuperstep: Superstep 0 totalMem = 81.0625M, maxMem = 197.5M, freeMem = 68.69868M
2012-08-21 14:14:15,627 WARN org.apache.giraph.graph.BspService: process: Unknown and unprocessed event (path=/_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir, type=NodeChildrenChanged, state=SyncConnected)
2012-08-21 14:14:15,629 INFO org.apache.giraph.graph.BspServiceWorker: registerHealth: Created my health node for attempt=0, superstep=0 with /_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir/0/_workerHealthyDir/nwest-mac.benchmark.local_1 and workerInfo= Worker(hostname=nwest-mac.benchmark.local, MRpartition=1, port=30001)
2012-08-21 14:14:15,657 INFO org.apache.giraph.graph.BspServiceWorker: processEvent: Job state changed, checking to see if it needs to restart
2012-08-21 14:14:15,658 INFO org.apache.giraph.graph.BspService: getJobState: Job state already exists (/_hadoopBsp/job_201208211408_0002/_masterJobState)


and these are the last lines of the master log:

012-08-21 14:14:15,617 INFO org.apache.giraph.graph.BspServiceMaster: aggregateWorkerStats: Aggregation found (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=false) on superstep = -1
2012-08-21 14:14:15,652 INFO org.apache.giraph.graph.MasterThread: masterThread: Coordination of superstep -1 took 0.945 seconds ended with state ALL_SUPERSTEPS_DONE and is now on superstep 0
2012-08-21 14:14:15,654 INFO org.apache.giraph.graph.BspServiceMaster: setJobState: {"_stateKey":"FINISHED","_applicationAttemptKey":-1,"_superstepKey":-1} on superstep 0
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanup: Notifying master its okay to cleanup with /_hadoopBsp/job_201208211408_0002/_cleanedUpDir/0_master
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Node /_hadoopBsp/job_201208211408_0002/_cleanedUpDir already exists, no need to create.
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Got 1 of 2 desired children from /_hadoopBsp/job_201208211408_0002/_cleanedUpDir
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanedUpZooKeeper: Waiting for the children of /_hadoopBsp/job_201208211408_0002/_cleanedUpDir to change since only got 1 nodes.

Again, the only addition is the use of the aggregator, otherwise the code runs perfectly fine.  Thoughts?

Thanks,
Nick

On Aug 21, 2012, at 4:06 AM, Maja Kabiljo wrote:

Hi Nick,

There were some very recent changes in the way aggregators are used. If your code below compiles it means that you are using the version before the changes, and looking at the example after them. The code which Kaushik attached shows how you should do it if you are not using the newest Giraph code.

If you want to use newest code, here is how aggregators work there:

  *   You have to register aggregators only on master, just like you are doing now. You can use registerAggregator or registerPersistentAggregator, depending on whether or not you want it to be reset on every super step.
  *   You don't have getAggregator method anymore. During vertex computation you can only call aggregate(name, value), and in master.compute you have setAggregatedValue(name, value).
  *   There is no more registerAggregator and useAggregator on workers, therefore you don't have to use WorkerContext in order to use aggregators.

Hope this helps,
Maja

From: KAUSHIK SARKAR <co...@gmail.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 7:39 AM
To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that I am using. It is approx. 1 month old. It seems that the WorkerContext class is different from the current svn version. I am not aware if this change made in the current version to reflect some change in the API behaviour, but I followed the WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice - in the initialize() method of MasterCompute (which you have done) and in the preApplication() method of the WorkerContext. Moreover in the preSuperstep() method of the WorkerContext, you need to call useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>






<image001.png>


Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>







Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Nick West <ni...@benchmarksolutions.com>.
After some more experimenting, I was able to get it to work.  I figured I'd share my results.

In my setup, I have a BooleanAndAggregator, which by default has a value of true.  I want to stop when all vertices vote to stop (even if there are new messages sent).  So, in a standard superstep each vertex will do some computation, vote to stop (or not), and then AND its vote to the aggregator.  When the MasterCompute detects that all vertices have voted to stop (the aggregator has a true value) it calls halt computation.

What it seems like is happening is the following:

1)  MasterCompute's compute method gets called at the beginning of each superstep - not at the end of it as (I think) is stated in the documentation and what it used to do.
2)  In my case, at start of superstep 0, the BooleanAndAggregator has an initial value of true (I couldn't get this to change by calling the set value method in the initialize method - I think this is because the aggregator is reinitialized every superstep) so when the MasterCompute's compute method is called at the beginning of superstep zero, it immediately halts computation.
3)  This immediate shutdown seems to cause a race condition with the other task (the one assigned to the vertex computations) which doesn't know it should quit, and the zookeeper task seems to wait for it to stop.

To address this, I currently just have the MasterCompute's compute method do nothing in superstep 0.  However, I think the behavior described above is not correct.  If there is some other cause for the behavior I'm seeing please let me know.

Thanks,
Nick


On Aug 21, 2012, at 2:28 PM, Nick West wrote:

Thank you both for the replies.

I have checked out the most recent version of the code and am still having the same problem.  (I also tried with the previous aggregator code with no luck either.)  Looking in the logs it appears that It gets through superstep -1, and the block seems to occur on superstep 0.  This is the end of the log file for the non-master task:

2012-08-21 14:14:15,620 INFO org.apache.giraph.graph.BspServiceWorker: finishSuperstep: Completed superstep -1 with global stats (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=true)
2012-08-21 14:14:15,621 INFO org.apache.giraph.comm.BasicRPCCommunications: prepareSuperstep: Superstep 0 totalMem = 81.0625M, maxMem = 197.5M, freeMem = 68.69868M
2012-08-21 14:14:15,627 WARN org.apache.giraph.graph.BspService: process: Unknown and unprocessed event (path=/_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir, type=NodeChildrenChanged, state=SyncConnected)
2012-08-21 14:14:15,629 INFO org.apache.giraph.graph.BspServiceWorker: registerHealth: Created my health node for attempt=0, superstep=0 with /_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir/0/_workerHealthyDir/nwest-mac.benchmark.local_1 and workerInfo= Worker(hostname=nwest-mac.benchmark.local, MRpartition=1, port=30001)
2012-08-21 14:14:15,657 INFO org.apache.giraph.graph.BspServiceWorker: processEvent: Job state changed, checking to see if it needs to restart
2012-08-21 14:14:15,658 INFO org.apache.giraph.graph.BspService: getJobState: Job state already exists (/_hadoopBsp/job_201208211408_0002/_masterJobState)


and these are the last lines of the master log:

012-08-21 14:14:15,617 INFO org.apache.giraph.graph.BspServiceMaster: aggregateWorkerStats: Aggregation found (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=false) on superstep = -1
2012-08-21 14:14:15,652 INFO org.apache.giraph.graph.MasterThread: masterThread: Coordination of superstep -1 took 0.945 seconds ended with state ALL_SUPERSTEPS_DONE and is now on superstep 0
2012-08-21 14:14:15,654 INFO org.apache.giraph.graph.BspServiceMaster: setJobState: {"_stateKey":"FINISHED","_applicationAttemptKey":-1,"_superstepKey":-1} on superstep 0
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanup: Notifying master its okay to cleanup with /_hadoopBsp/job_201208211408_0002/_cleanedUpDir/0_master
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Node /_hadoopBsp/job_201208211408_0002/_cleanedUpDir already exists, no need to create.
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Got 1 of 2 desired children from /_hadoopBsp/job_201208211408_0002/_cleanedUpDir
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanedUpZooKeeper: Waiting for the children of /_hadoopBsp/job_201208211408_0002/_cleanedUpDir to change since only got 1 nodes.

Again, the only addition is the use of the aggregator, otherwise the code runs perfectly fine.  Thoughts?

Thanks,
Nick

On Aug 21, 2012, at 4:06 AM, Maja Kabiljo wrote:

Hi Nick,

There were some very recent changes in the way aggregators are used. If your code below compiles it means that you are using the version before the changes, and looking at the example after them. The code which Kaushik attached shows how you should do it if you are not using the newest Giraph code.

If you want to use newest code, here is how aggregators work there:

  *   You have to register aggregators only on master, just like you are doing now. You can use registerAggregator or registerPersistentAggregator, depending on whether or not you want it to be reset on every super step.
  *   You don't have getAggregator method anymore. During vertex computation you can only call aggregate(name, value), and in master.compute you have setAggregatedValue(name, value).
  *   There is no more registerAggregator and useAggregator on workers, therefore you don't have to use WorkerContext in order to use aggregators.

Hope this helps,
Maja

From: KAUSHIK SARKAR <co...@gmail.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 7:39 AM
To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that I am using. It is approx. 1 month old. It seems that the WorkerContext class is different from the current svn version. I am not aware if this change made in the current version to reflect some change in the API behaviour, but I followed the WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice - in the initialize() method of MasterCompute (which you have done) and in the preApplication() method of the WorkerContext. Moreover in the preSuperstep() method of the WorkerContext, you need to call useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>






<image001.png>


Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>







Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Nick West <ni...@benchmarksolutions.com>.
Thank you both for the replies.

I have checked out the most recent version of the code and am still having the same problem.  (I also tried with the previous aggregator code with no luck either.)  Looking in the logs it appears that It gets through superstep -1, and the block seems to occur on superstep 0.  This is the end of the log file for the non-master task:

2012-08-21 14:14:15,620 INFO org.apache.giraph.graph.BspServiceWorker: finishSuperstep: Completed superstep -1 with global stats (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=true)
2012-08-21 14:14:15,621 INFO org.apache.giraph.comm.BasicRPCCommunications: prepareSuperstep: Superstep 0 totalMem = 81.0625M, maxMem = 197.5M, freeMem = 68.69868M
2012-08-21 14:14:15,627 WARN org.apache.giraph.graph.BspService: process: Unknown and unprocessed event (path=/_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir, type=NodeChildrenChanged, state=SyncConnected)
2012-08-21 14:14:15,629 INFO org.apache.giraph.graph.BspServiceWorker: registerHealth: Created my health node for attempt=0, superstep=0 with /_hadoopBsp/job_201208211408_0002/_applicationAttemptsDir/0/_superstepDir/0/_workerHealthyDir/nwest-mac.benchmark.local_1 and workerInfo= Worker(hostname=nwest-mac.benchmark.local, MRpartition=1, port=30001)
2012-08-21 14:14:15,657 INFO org.apache.giraph.graph.BspServiceWorker: processEvent: Job state changed, checking to see if it needs to restart
2012-08-21 14:14:15,658 INFO org.apache.giraph.graph.BspService: getJobState: Job state already exists (/_hadoopBsp/job_201208211408_0002/_masterJobState)


and these are the last lines of the master log:

012-08-21 14:14:15,617 INFO org.apache.giraph.graph.BspServiceMaster: aggregateWorkerStats: Aggregation found (vtx=1248,finVtx=0,edges=2944,msgCount=0,haltComputation=false) on superstep = -1
2012-08-21 14:14:15,652 INFO org.apache.giraph.graph.MasterThread: masterThread: Coordination of superstep -1 took 0.945 seconds ended with state ALL_SUPERSTEPS_DONE and is now on superstep 0
2012-08-21 14:14:15,654 INFO org.apache.giraph.graph.BspServiceMaster: setJobState: {"_stateKey":"FINISHED","_applicationAttemptKey":-1,"_superstepKey":-1} on superstep 0
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanup: Notifying master its okay to cleanup with /_hadoopBsp/job_201208211408_0002/_cleanedUpDir/0_master
2012-08-21 14:14:15,662 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Node /_hadoopBsp/job_201208211408_0002/_cleanedUpDir already exists, no need to create.
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanUpZooKeeper: Got 1 of 2 desired children from /_hadoopBsp/job_201208211408_0002/_cleanedUpDir
2012-08-21 14:14:15,663 INFO org.apache.giraph.graph.BspServiceMaster: cleanedUpZooKeeper: Waiting for the children of /_hadoopBsp/job_201208211408_0002/_cleanedUpDir to change since only got 1 nodes.

Again, the only addition is the use of the aggregator, otherwise the code runs perfectly fine.  Thoughts?

Thanks,
Nick

On Aug 21, 2012, at 4:06 AM, Maja Kabiljo wrote:

Hi Nick,

There were some very recent changes in the way aggregators are used. If your code below compiles it means that you are using the version before the changes, and looking at the example after them. The code which Kaushik attached shows how you should do it if you are not using the newest Giraph code.

If you want to use newest code, here is how aggregators work there:

  *   You have to register aggregators only on master, just like you are doing now. You can use registerAggregator or registerPersistentAggregator, depending on whether or not you want it to be reset on every super step.
  *   You don't have getAggregator method anymore. During vertex computation you can only call aggregate(name, value), and in master.compute you have setAggregatedValue(name, value).
  *   There is no more registerAggregator and useAggregator on workers, therefore you don't have to use WorkerContext in order to use aggregators.

Hope this helps,
Maja

From: KAUSHIK SARKAR <co...@gmail.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 7:39 AM
To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that I am using. It is approx. 1 month old. It seems that the WorkerContext class is different from the current svn version. I am not aware if this change made in the current version to reflect some change in the API behaviour, but I followed the WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice - in the initialize() method of MasterCompute (which you have done) and in the preApplication() method of the WorkerContext. Moreover in the preSuperstep() method of the WorkerContext, you need to call useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>






<image001.png>


Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Maja Kabiljo <ma...@fb.com>.
Hi Nick,

There were some very recent changes in the way aggregators are used. If your code below compiles it means that you are using the version before the changes, and looking at the example after them. The code which Kaushik attached shows how you should do it if you are not using the newest Giraph code.

If you want to use newest code, here is how aggregators work there:

  *   You have to register aggregators only on master, just like you are doing now. You can use registerAggregator or registerPersistentAggregator, depending on whether or not you want it to be reset on every super step.
  *   You don't have getAggregator method anymore. During vertex computation you can only call aggregate(name, value), and in master.compute you have setAggregatedValue(name, value).
  *   There is no more registerAggregator and useAggregator on workers, therefore you don't have to use WorkerContext in order to use aggregators.

Hope this helps,
Maja

From: KAUSHIK SARKAR <co...@gmail.com>>
Reply-To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Date: Tuesday, August 21, 2012 7:39 AM
To: "user@giraph.apache.org<ma...@giraph.apache.org>" <us...@giraph.apache.org>>
Subject: Re: Adding MasterCompute object causes "failed to report status" errors

Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that I am using. It is approx. 1 month old. It seems that the WorkerContext class is different from the current svn version. I am not aware if this change made in the current version to reflect some change in the API behaviour, but I followed the WorkerContext definition from the attached file and my code worked.)

You will see that you need to register the aggregator twice - in the initialize() method of MasterCompute (which you have done) and in the preApplication() method of the WorkerContext. Moreover in the preSuperstep() method of the WorkerContext, you need to call useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]







Re: Adding MasterCompute object causes "failed to report status" errors

Posted by KAUSHIK SARKAR <co...@gmail.com>.
Hi Nick,

Please refer to the SimpleMasterComputeWorkerContext class in the attached
SimpleMasterComputeVertex.java file (This is from the snapshot of 0.2 that
I am using. It is approx. 1 month old. It seems that the WorkerContext
class is different from the current svn version. I am not aware if this
change made in the current version to reflect some change in the API
behaviour, but I followed the WorkerContext definition from the attached
file and my code worked.)

You will see that you need to register the aggregator twice - in the
initialize() method of MasterCompute (which you have done) and in the
preApplication() method of the WorkerContext. Moreover in the
preSuperstep() method of the WorkerContext, you need to call
useAggregator() method.

I am not sure if this is the problem with your code, but you can give it a
try and see if it solves your issue.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 3:04 PM, Nick West <nick.west@benchmarksolutions.com
> wrote:

>  I'm a little confused by the examples in SimpleMasterComputeVertex.java.
>
>
>  To me it looks like this is a simple example with one vertex and one
> aggregator with the following behavior:
> - The vertex gets the value stored in the aggregator and then adds its
> previous value to it and stores the result as the new vertex value; the
> result is also stored in the worker context
> - The aggregator sets its value to superstep/2 + 1 every iteration and
> stops on the 10th superstep
>
>  The worker context seems to serve no other purpose but to hold the value
> of FINAL_SUM (not related to the aggregator) at each iteration.  It also
> seems like the aggregator is registered in the initialize method of the
> MasterCompute object, much like I have in my code.
>
>  I see one difference between the example and my code:
>    1) I use the aggregate function in each vertex's compute method.  If
> this is not the way to have the vertices combine values, what is?
>
>  If you can provide insight to either how I'm not following the example,
> or what else might wrong, that'd be great.
>
>  Thanks,
> Nick
>
>
>  On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:
>
> Hi Nick,
>
>  Are you using WorkerContext to register the aggregator? You need to
> override the preApplication() method in WorkerContext to register the
> aggregator and then override the preSuperstep() method to to tell the
> workers to use the aggregator (the useAggregator() method). Check the
> MasterCompute and WorkerContext examples in Giraph.
>
>  Regards,
> Kaushik
>
> On Mon, Aug 20, 2012 at 1:26 PM, Nick West <
> nick.west@benchmarksolutions.com> wrote:
>
>>  Hi,
>>
>>  I have a giraph application that runs fine; however, when I add a
>> MasterCompute object (definition following) all of the map tasks time out.
>> I have hadoop configured to run with 8 map processes and giraph to use one
>> worker.
>>
>>  Here's the definition of the MasterCompute object:
>>
>>  class BPMasterComputer extends MasterCompute{
>>   override def compute() {
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>     val res = agg.getAggregatedValue.get
>>     if (res) haltComputation
>>     agg.setAggregatedValue(true)
>>   }
>>   override def initialize() {
>>     registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>     agg.setAggregatedValue(true)
>>   }
>>   override def write(out: DataOutput) {}
>>   override def readFields(in: DataInput) {}
>> }
>>
>>  (as far as I can tell, there is no state that needs to be
>> read/written.)  I then register this class as the MasterCompute class in
>> the giraph job:
>>
>>  job.setMasterComputeClass(classOf[BPMasterComputer])
>>
>>  and then use the aggregator in the compute method of my vertices:
>>
>>  class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text,
>> PackagedMessage] with Loggable {
>>    override def compute(msgs: java.util.Iterator[PackagedMessage]) {
>>     ...
>>     var stop = false
>>     val agg =
>> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>>      ... code to modify stop and vote to halt ...
>>      agg.aggregate(stop)
>>   }
>> }
>>
>>  Is there some other method that I am not calling that I should?  Or
>> some step that I'm missing?  Any suggestions as to why/how these additions
>> are causing the processes to block would be appreciated!
>>
>>  Thanks,
>> *Nick West
>> **
>> *Benchmark Solutions
>> 101 Park Avenue - 7th Floor
>> New York, NY 10178
>> Tel +1.212.220.4739 | Mobile +1.646.267.4324
>> *www.benchmarksolutions.com * <http://www.benchmarksolutions.com/>
>> **
>> *<image001.png>
>>
>>
>>
>>    *
>> **
>>
>
>
> *
> Nick West
> **
> *Benchmark Solutions
> 101 Park Avenue - 7th Floor
> New York, NY 10178
> Tel +1.212.220.4739 | Mobile +1.646.267.4324
> *www.benchmarksolutions.com * <http://www.benchmarksolutions.com/>
> ***
>
>
>
>    *
> **
>

Re: Adding MasterCompute object causes "failed to report status" errors

Posted by Nick West <ni...@benchmarksolutions.com>.
I'm a little confused by the examples in SimpleMasterComputeVertex.java.

To me it looks like this is a simple example with one vertex and one aggregator with the following behavior:
- The vertex gets the value stored in the aggregator and then adds its previous value to it and stores the result as the new vertex value; the result is also stored in the worker context
- The aggregator sets its value to superstep/2 + 1 every iteration and stops on the 10th superstep

The worker context seems to serve no other purpose but to hold the value of FINAL_SUM (not related to the aggregator) at each iteration.  It also seems like the aggregator is registered in the initialize method of the MasterCompute object, much like I have in my code.

I see one difference between the example and my code:
   1) I use the aggregate function in each vertex's compute method.  If this is not the way to have the vertices combine values, what is?

If you can provide insight to either how I'm not following the example, or what else might wrong, that'd be great.

Thanks,
Nick


On Aug 20, 2012, at 4:52 PM, KAUSHIK SARKAR wrote:

Hi Nick,

Are you using WorkerContext to register the aggregator? You need to override the preApplication() method in WorkerContext to register the aggregator and then override the preSuperstep() method to to tell the workers to use the aggregator (the useAggregator() method). Check the MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <ni...@benchmarksolutions.com>> wrote:
Hi,

I have a giraph application that runs fine; however, when I add a MasterCompute object (definition following) all of the map tasks time out. I have hadoop configured to run with 8 map processes and giraph to use one worker.

Here's the definition of the MasterCompute object:

class BPMasterComputer extends MasterCompute{
  override def compute() {
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    agg.setAggregatedValue(true)
  }
  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}

(as far as I can tell, there is no state that needs to be read/written.)  I then register this class as the MasterCompute class in the giraph job:

job.setMasterComputeClass(classOf[BPMasterComputer])

and then use the aggregator in the compute method of my vertices:

class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text, PackagedMessage] with Loggable {
   override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg = getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}

Is there some other method that I am not calling that I should?  Or some step that I'm missing?  Any suggestions as to why/how these additions are causing the processes to block would be appreciated!

Thanks,
Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739<tel:%2B1.212.220.4739> | Mobile +1.646.267.4324<tel:%2B1.646.267.4324>
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
<image001.png>








Nick West

Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com <http://www.benchmarksolutions.com/>
[cid:image001.png@01CCA50E.43B4A860]






Re: Adding MasterCompute object causes "failed to report status" errors

Posted by KAUSHIK SARKAR <co...@gmail.com>.
Hi Nick,

Are you using WorkerContext to register the aggregator? You need to
override the preApplication() method in WorkerContext to register the
aggregator and then override the preSuperstep() method to to tell the
workers to use the aggregator (the useAggregator() method). Check the
MasterCompute and WorkerContext examples in Giraph.

Regards,
Kaushik

On Mon, Aug 20, 2012 at 1:26 PM, Nick West <nick.west@benchmarksolutions.com
> wrote:

>  Hi,
>
>  I have a giraph application that runs fine; however, when I add a
> MasterCompute object (definition following) all of the map tasks time out.
> I have hadoop configured to run with 8 map processes and giraph to use one
> worker.
>
>  Here's the definition of the MasterCompute object:
>
>  class BPMasterComputer extends MasterCompute{
>   override def compute() {
>     val agg =
> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>     val res = agg.getAggregatedValue.get
>     if (res) haltComputation
>     agg.setAggregatedValue(true)
>   }
>   override def initialize() {
>     registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
>     val agg =
> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>     agg.setAggregatedValue(true)
>   }
>   override def write(out: DataOutput) {}
>   override def readFields(in: DataInput) {}
> }
>
>  (as far as I can tell, there is no state that needs to be read/written.)
>  I then register this class as the MasterCompute class in the giraph job:
>
>  job.setMasterComputeClass(classOf[BPMasterComputer])
>
>  and then use the aggregator in the compute method of my vertices:
>
>  class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text,
> PackagedMessage] with Loggable {
>    override def compute(msgs: java.util.Iterator[PackagedMessage]) {
>     ...
>     var stop = false
>     val agg =
> getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
>      ... code to modify stop and vote to halt ...
>      agg.aggregate(stop)
>   }
> }
>
>  Is there some other method that I am not calling that I should?  Or some
> step that I'm missing?  Any suggestions as to why/how these additions are
> causing the processes to block would be appreciated!
>
>  Thanks,
> *Nick West
> **
> *Benchmark Solutions
> 101 Park Avenue - 7th Floor
> New York, NY 10178
> Tel +1.212.220.4739 | Mobile +1.646.267.4324
> *www.benchmarksolutions.com * <http://www.benchmarksolutions.com/>
> ***
>
>
>
>    *
> **
>