Posted to user@giraph.apache.org by Nils Rethmeier <ni...@gmx.net> on 2012/05/28 15:04:23 UTC

correct usage of aggregators, pitfalls?

Hello everyone,

I ran into some issues while trying to figure out how to use
aggregators correctly, since I want to implement a global priority queue
that "schedules" processing on vertices. As a simple test to better
understand aggregator usage, I modified the SimpleShortestPathsVertex
example and added the SumAggregator code from the SimplePageRankVertex
example to it (the WorkerContext and compute()); the code is posted below.
Although this test code does not do anything useful, I was surprised to
see the following NullPointerException on the workers during execution.

2012-05-23 14:44:59,267 INFO
org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs'
truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO:
Initialized cache for UID to User mapping with a cache timeout of
14400 seconds.
2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO:
Got UserName hadoop00 for UID 508 from the native implementation
2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.NullPointerException
    at org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
    at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning
cleanup for the task


So my question is: what are the pitfalls (method call order, setup,
superstep count) of aggregator usage? Following the description of
useAggregator did not seem to help, so I am obviously missing some
detail.

Java code:

  @Override
  public void compute(Iterator<DoubleWritable> msgIterator) {
    // Look up the aggregator registered under "sum" (null if none is).
    LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
    if (getSuperstep() == 0) {
      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    while (msgIterator.hasNext()) {
      minDist = Math.min(minDist, msgIterator.next().get());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
          " vertex value = " + getVertexValue());
    }
    // Every vertex that runs compute() adds 1 to "sum".
    if (getSuperstep() >= 0) {
      sumAggreg.aggregate(1L);      // NPE at Line 104
    }

    if (minDist < getVertexValue().get()) {
      setVertexValue(new DoubleWritable(minDist));
      for (LongWritable targetVertexId : this) {
        FloatWritable edgeValue = getEdgeValue(targetVertexId);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex " + getVertexId() + " sent to " +
              targetVertexId + " = " +
              (minDist + edgeValue.get()));
        }
        sendMsg(targetVertexId,
            new DoubleWritable(minDist + edgeValue.get()));
      }
    }
    voteToHalt();
  }

  public static class MyVertexWorkerContext extends WorkerContext {
    /** Final sum value for verification for local jobs */
    private static long FINAL_SUM;

    public static long getFinalSum() {
      return FINAL_SUM;
    }

    @Override
    public void preApplication()
        throws InstantiationException, IllegalAccessException {
      // Register the "sum" aggregator once, before the first superstep.
      registerAggregator("sum", LongSumAggregator.class);
    }

    @Override
    public void postApplication() {
      System.out.println("PostApp");
      LongSumAggregator sumAggreg =
          (LongSumAggregator) getAggregator("sum");

      FINAL_SUM = sumAggreg.getAggregatedValue().get();
      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
    }

    @Override
    public void preSuperstep() {
      System.out.println("PreSuperStep");
      LongSumAggregator sumAggreg =
          (LongSumAggregator) getAggregator("sum");
      // Make "sum" available to compute() and reset it for this superstep.
      this.useAggregator("sum");
      sumAggreg.setAggregatedValue(new LongWritable(0L));
    }

    @Override
    public void postSuperstep() {}
  }

Rest as in SimpleShortestPathsVertex.

Regards,
Nils

Re: correct usage of aggregators, pitfalls?

Posted by Nils Rethmeier <ni...@gmx.net>.
Thanks Paolo,

Adding the WorkerContext setting to my job setup fixed the issue!
Without this configuration, aggregator methods such as aggregate() and getAggregatedValue() kept throwing NullPointerExceptions when called.

Nils
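
For readers who hit the same symptom: the NPE comes from calling aggregate() on the null reference returned by the aggregator lookup when nothing was registered under "sum". Below is a minimal sketch of a fail-fast alternative, written in plain Java so it runs on its own. AggregatorLookupSketch, LongSum, register(), lookup() and require() are hypothetical stand-ins, not Giraph classes; in real vertex code the equivalent would be checking the result of getAggregator("sum") for null before using it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not Giraph's API: a name -> aggregator registry
 * whose require() fails fast with a descriptive message, while lookup()
 * behaves like a plain map and returns null for unknown names.
 */
public class AggregatorLookupSketch {

  /** Stand-in for a long-sum aggregator (not Giraph's LongSumAggregator). */
  public static final class LongSum {
    private long value;

    public void aggregate(long v) {
      value += v;
    }

    public long get() {
      return value;
    }
  }

  private final Map<String, LongSum> registry = new HashMap<>();

  /** Registers a fresh, zeroed aggregator under the given name. */
  public void register(String name) {
    registry.put(name, new LongSum());
  }

  /** Plain lookup: returns null if nothing was registered under name. */
  public LongSum lookup(String name) {
    return registry.get(name);
  }

  /** Fail-fast lookup: throws with a hint instead of returning null. */
  public LongSum require(String name) {
    LongSum agg = registry.get(name);
    if (agg == null) {
      throw new IllegalStateException("Aggregator '" + name
          + "' is not registered; is the WorkerContext that registers it"
          + " set on the job?");
    }
    return agg;
  }

  public static void main(String[] args) {
    AggregatorLookupSketch unconfigured = new AggregatorLookupSketch();
    try {
      unconfigured.lookup("sum").aggregate(1L); // same failure as in compute()
    } catch (NullPointerException e) {
      System.out.println("lookup(): NullPointerException");
    }
    try {
      unconfigured.require("sum");
    } catch (IllegalStateException e) {
      System.out.println("require(): " + e.getMessage());
    }
  }
}
```

Running main() shows the bare NullPointerException case first and then the descriptive IllegalStateException message, which points straight at the missing configuration.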



-------- Original Message --------
> Date: Mon, 28 May 2012 17:33:14 +0100
> From: Paolo Castagna <ca...@googlemail.com>
> To: user@giraph.apache.org
> Subject: Re: correct usage of aggregators, pitfalls?

> Hi Nils,
> I am not 100% sure, but...
> 
> Did you configure your GiraphJob properly?
> 
> You need to tell Giraph you want to use your MyVertexWorkerContext.
> You can do that using the GiraphJob.setWorkerContextClass(...) method
> or the 'wc' option for the command line.
> 
> My 2 cents,
> Paolo
> 

Re: correct usage of aggregators, pitfalls?

Posted by Paolo Castagna <ca...@googlemail.com>.
Hi Nils,
I am not 100% sure, but...

Did you configure your GiraphJob properly?

You need to tell Giraph you want to use your MyVertexWorkerContext.
You can do that using the GiraphJob.setWorkerContextClass(...) method
or the 'wc' option for the command line.

My 2 cents,
Paolo
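
A toy model of the failure Paolo describes, assuming (as the thread suggests) that without an explicit WorkerContext setting the job falls back to a no-op default context, so the preApplication() that registers "sum" never runs. Every name here (Context, DefaultContext, MyContext, run()) is hypothetical; this is plain Java that mimics the hook order preApplication(), then preSuperstep() and compute() per superstep, and is not Giraph's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model, not Giraph code: the "framework" only calls the
 * hooks of the context class it is given and otherwise uses a no-op default.
 */
public class WorkerContextSelectionSketch {

  /** Minimal stand-in for the WorkerContext lifecycle hooks. */
  public static class Context {
    protected final Map<String, long[]> aggregators = new HashMap<>();

    public void preApplication() {}

    public void preSuperstep() {}
  }

  /** Stand-in for the no-op default used when nothing is configured. */
  public static class DefaultContext extends Context {}

  /** Mirrors MyVertexWorkerContext: registers "sum", resets it per superstep. */
  public static class MyContext extends Context {
    @Override
    public void preApplication() {
      aggregators.put("sum", new long[1]);
    }

    @Override
    public void preSuperstep() {
      long[] sum = aggregators.get("sum");
      if (sum != null) {
        sum[0] = 0L;
      }
    }
  }

  /**
   * Runs the given number of supersteps; each of numVertices vertices adds 1
   * to "sum". Returns the last superstep's sum, or -1 if "sum" was never
   * registered (the point where the real job threw a NullPointerException).
   */
  public static long run(Class<? extends Context> configured,
      int numVertices, int supersteps) {
    Class<? extends Context> cls =
        configured != null ? configured : DefaultContext.class;
    Context ctx;
    try {
      ctx = cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot instantiate " + cls.getName(), e);
    }
    ctx.preApplication();
    long last = 0L;
    for (int step = 0; step < supersteps; step++) {
      ctx.preSuperstep();
      long[] sum = ctx.aggregators.get("sum");
      if (sum == null) {
        return -1L;
      }
      for (int v = 0; v < numVertices; v++) {
        sum[0] += 1L; // each vertex's compute() aggregates 1
      }
      last = sum[0];
    }
    return last;
  }

  public static void main(String[] args) {
    System.out.println("not configured: " + run(null, 5, 3));
    System.out.println("MyContext configured: " + run(MyContext.class, 5, 3));
  }
}
```

With these definitions, run(null, 5, 3) returns -1, while run(MyContext.class, 5, 3) returns 5 because preSuperstep() resets the sum before each superstep.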
