Posted to user@giraph.apache.org by Puneet Agarwal <pu...@yahoo.com> on 2014/11/16 07:37:05 UTC

Persistent Aggregator Forgets Previously Aggregated Value ????

Hi All,

I have been facing a problem in Giraph for the last two weeks. Below are a detailed description, sample code, the corresponding log messages, and a few questions.
I would greatly appreciate any help in this regard.

Code Description:
=================
My algorithm assigns a score to selected vertices of the graph. I have to find the best K (=5) vertices, i.e., those with the minimum scores.
I have written a persistent aggregator for this which takes a Text value. A snippet of the code is given below.

It works fine on my laptop (single worker), but not on a cluster of 4 computers running 10 workers.
I am using Hadoop 1.0.0 and Giraph 1.0.0.
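
The top-K selection described above could be sketched, independently of Giraph, as a pure merge function. This is only an illustration under stated assumptions: the class name TopKMerge, the merge() helper, and the ';' separator between "vertexId,score" entries are hypothetical (the post only shows single entries such as "8.4793-1,3.0"), and the omitted "Main Logic" may differ.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: merges two encoded top-K lists, keeping the K lowest scores. */
public class TopKMerge {
    static final int K = 5;

    /**
     * Each argument is a list of "vertexId,score" entries joined by ';'
     * (the ';' separator is an assumption). An empty string means "no entries yet".
     */
    static String merge(String current, String incoming) {
        List<String[]> entries = new ArrayList<>();
        for (String encoded : new String[] {current, incoming}) {
            if (encoded == null || encoded.isEmpty()) {
                continue; // the empty string acts as the neutral element
            }
            for (String entry : encoded.split(";")) {
                entries.add(entry.split(","));
            }
        }
        // Ascending by score; ties broken by vertex id so the result is deterministic.
        entries.sort(Comparator
                .comparingDouble((String[] e) -> Double.parseDouble(e[1]))
                .thenComparing((String[] e) -> e[0]));

        Set<String> seen = new HashSet<>();
        StringBuilder out = new StringBuilder();
        int kept = 0;
        for (String[] e : entries) {
            if (kept == K) {
                break;
            }
            if (!seen.add(e[0])) {
                continue; // the same vertex may arrive from both sides; keep its best score
            }
            if (out.length() > 0) {
                out.append(';');
            }
            out.append(e[0]).append(',').append(e[1]);
            kept++;
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // prints 8.4793-1,3.0;6.1100-1,6.0
        System.out.println(merge("8.4793-1,3.0", "6.1100-1,6.0"));
    }
}
```

Because merge() gives the same result regardless of argument order and treats the empty string as neutral, it does not matter in which order, or on which aggregator copy, the partial results are combined.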


Problem Description:
====================
In the aggregator, one of the previously aggregated values suddenly gets lost.
Therefore, my algorithm generates wrong results.

You can observe this in the logs given below, as follows:
a.    At line number 11, the vertex 8.4793-1 has weight 3.0, and it is the best so far.
b.    At line number 12, the aggregator receives a new value "6.1100-1,6.0", i.e., vertex=6.1100-1 has weight=6.0.
c.    At line number 13, at the beginning of the aggregate method, the output of getAggregatedValue() is the same as the new value.
This means it simply forgets the previously aggregated value. Why so?

I have also printed object ids in the log messages, to make sure that this is happening on the same object.

Source Code Snippet:
====================

@Override
public void aggregate(Text value) {
    printLogMessage("aggregate", "MyAGG", "Entering - NewValue=", value);
    printLogMessage("aggregate", "MyAGG",
            "Entering - getAggregatedValue()=", getAggregatedValue());

        // Main Logic is here - Omitting for simplicity

    setAggregatedValue(new Text(myTopK));
    printLogMessage("aggregate", "MyAGG", "Exiting AggregatedValue=",
            getAggregatedValue().toString());
}

@Override
public Text createInitialValue() {
    printLogMessage("=====>createInitialValue", "MyAGG",
            "Entering getAggregatedValue()=", getAggregatedValue());
    Text initialValue = getAggregatedValue();
    if (initialValue == null) {
        initialValue = new Text();
    }
    printLogMessage("=====>createInitialValue", "MyAGG",
            "Exiting ReturnedValue=", getAggregatedValue());
    return initialValue;
}
private void printLogMessage(String methodName, String prefix, String msg, Object obj) {
    if (isDebugOn(methodName)) {
        if (obj == null) {
            obj = "";
        }
        System.out.println(java.lang.System.identityHashCode(this) + "--"+ prefix + "==>" + 
                                methodName + "() - " + msg + obj.toString());
    }
}

Printed Log Messages:
=====================
1.  %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=2
2.  249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=
3.  249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=
4.  249768912--MyAGG==>aggregate() - Entering - NewValue=8.4793-1,3.0
5.  249768912--MyAGG==>aggregate() - Entering - getAggregatedValue()=8.4793-1,3.0
6.  249768912--MyAGG==>aggregate() - Received AggregatedValue Again- Exiting CurrVal=8.4793-1,3.0
7.  249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=8.4793-1,3.0
8.  249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=8.4793-1,3.0
9.  %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=3
10. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=8.4793-1,3.0
11. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=8.4793-1,3.0
12. 249768912--MyAGG==>aggregate() - Entering - NewValue=6.1100-1,6.0
13. 249768912--MyAGG==>aggregate() - Entering - getAggregatedValue()=6.1100-1,6.0
14. 249768912--MyAGG==>aggregate() - Received AggregatedValue Again- Exiting CurrVal=6.1100-1,6.0
15. 249768912--MyAGG==>=====>createInitialValue() - Entering getAggregatedValue()=6.1100-1,6.0
16. 249768912--MyAGG==>=====>createInitialValue() - Exiting ReturnedValue=6.1100-1,6.0
17. %%%%%%%%%%%%% Entered Master Compute %%%%%%%%%%%%%%%%%%%% ====> SupsetStep No.=4
18. ...
...

My Queries:
===========
1.    Why does the createInitialValue method get called twice in the same superstep, on the same aggregator object?
2.    Even if it gets called twice, why should it forget the previously aggregated value?
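
One thing that can be checked in isolation is how createInitialValue() interacts with a reset. The sketch below uses a simplified stand-in base class (SketchAggregator is hypothetical, not Giraph's source) and assumes that the framework resets an aggregator by assigning the result of createInitialValue() to its value. Under that assumption, a createInitialValue() that returns the current value, as in the snippet above, turns every reset into a no-op. Whether this is what causes the behaviour in the logs is not established here.

```java
public class ResetSketch {
    /** Simplified stand-in for an aggregator base class (assumption, not Giraph source). */
    abstract static class SketchAggregator {
        private Double value;

        SketchAggregator() {
            value = createInitialValue();
        }

        abstract Double createInitialValue();

        Double getAggregatedValue() {
            return value;
        }

        void setAggregatedValue(Double v) {
            value = v;
        }

        /** Keeps the minimum of the current value and the incoming one. */
        void aggregate(Double other) {
            setAggregatedValue(Math.min(value, other));
        }

        /** Assumed reset semantics: start again from the initial value. */
        void reset() {
            value = createInitialValue();
        }
    }

    /** Mirrors the pattern in the post: reuse the current value if there is one. */
    static class LeakyMin extends SketchAggregator {
        @Override
        Double createInitialValue() {
            Double current = getAggregatedValue();
            return current == null ? Double.POSITIVE_INFINITY : current;
        }
    }

    /** Returns a fresh neutral element every time. */
    static class CleanMin extends SketchAggregator {
        @Override
        Double createInitialValue() {
            return Double.POSITIVE_INFINITY;
        }
    }

    /** Aggregates 3.0, resets, aggregates 6.0, and reports what is left. */
    static double afterReset(SketchAggregator agg) {
        agg.aggregate(3.0);
        agg.reset();
        agg.aggregate(6.0);
        return agg.getAggregatedValue();
    }

    public static void main(String[] args) {
        System.out.println(afterReset(new LeakyMin())); // prints 3.0
        System.out.println(afterReset(new CleanMin())); // prints 6.0
    }
}
```

The usual convention is for createInitialValue() to return a new neutral value and to leave persistence across supersteps to the framework, by registering the aggregator as persistent.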

Please help me resolve this issue. This is a major problem in my work. 
I am not able to proceed further because of this.

Thanks in anticipation.

- Puneet
IIT Delhi, India


Aggregator Issue and GIRAPH-806

Posted by Puneet Agarwal <pu...@yahoo.com.INVALID>.
Hi All,
I am sorry for double posting and for posting a usage-related e-mail in the dev forum. I am doing this because I got no response to some of my e-mails on the users' list.
I have two main problems:
1.    The aggregator functionality is not behaving as it should. I need to understand how aggregators work in Giraph to resolve that problem. So my questions are:

a.    I observed that more than one object of my custom aggregator class gets instantiated per worker. Why so?
b.    I also observed that there is a master aggregator which seems to receive the locally aggregated values from the various workers. When this master-aggregator receives a value from one of the local-aggregators, it forgets the previously aggregated value, which should not happen. I have posted a code snippet and log messages in the users' forum, also given in the attached e-mail.
c.    Is this a bug in Giraph?
d.    How can I understand the aggregator behaviour of Giraph in detail? Is reading the source code of Giraph the only option?
e.    Is something wrong in what I am doing?
 
2.    Sometimes the Giraph jobs hang, and I noticed that I get problems similar to those mentioned in Jira issue GIRAPH-806. I observed that there is a patch available for this issue. But when I try to apply the patch on top of Giraph 1.0.0, I face the problem that the patch has been provided against a nightly build dated ---> 1390205044000 (I did not understand this date format), and not against Giraph 1.0.0.

a.    Should I upgrade to 1.1.0 (when it becomes available)?
b.    Should I upgrade to the nightly build of 1390205044000, and then apply this patch?
c.    What is the suggested way out for me now?
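
On the date format: 1390205044000 looks like a Unix timestamp in milliseconds (milliseconds since 1970-01-01 UTC). Assuming that reading is right, it can be decoded with the standard java.time API; the class name BuildTimestamp and the toUtc() helper below are just illustrative.

```java
import java.time.Instant;

public class BuildTimestamp {
    /** Interprets the number as milliseconds since the Unix epoch and formats it in UTC. */
    static String toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // prints 2014-01-20T08:04:04Z
        System.out.println(toUtc(1390205044000L));
    }
}
```

Under that assumption, the timestamp corresponds to 20 January 2014, 08:04:04 UTC.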
I am seeking your opinion on the above-mentioned issues. I rely heavily on Giraph for my work, and I am completely stuck because of them. I request your help in this regard.

-Puneet
      On Monday, November 17, 2014 8:58 AM, Puneet Agarwal <pu...@yahoo.com> wrote:
   

 I have no clue on how to resolve this issue. Any guidance in this situation can be helpful.
Claudio - is there any detailed description of how aggregator works in Giraph in your book?
Getting no response from anyone, should such questions be asked in developers' list?

-Puneet
IIT Delhi, India



Re: Fw: Persistent Aggregator Forgets Previously Aggregated Value ????

Posted by Puneet Agarwal <pu...@yahoo.com>.
Dear Matthew,
Many thanks. It seems to have worked. I will post a detailed analysis later. Hurray ...... Thanks a ton.
(Again .... phew ... I was stuck on this for a long time.)
I will try to repay this by explaining how aggregators work in Giraph, so that nobody else faces this issue in future.

Following is what I understand now.
1. There is an aggregator at every worker. Let me call this the local-aggregator.
2. There is another aggregator that aggregates the values for every superstep. Let me call this the ss-aggregator.
3. There is a global aggregator which remembers the aggregated value from the previous superstep. Let me call this the master-aggregator.
4. There is sometimes one extra, and sometimes more than one extra, aggregator at every worker. I am not sure what their role is, so let me call them extra-aggregators.
How I think it is working is:
a. The master-aggregator receives the value aggregated in the previous superstep as the new value passed to the aggregate() method. In this method, we can access the value sent by the ss-aggregator by calling getAggregatedValue(). Thus, by merging these two values, we get the final aggregated value.

b. The local-aggregators send their aggregated values to the ss-aggregators. It is hard to figure out which of the various aggregators is the ss-aggregator.

c. I am not sure what the role of the extra-aggregators is.
d. It seems that the ss-aggregators get created in a superstep and get destroyed at its end. They don't live beyond one superstep, but their values get passed to a suitable aggregator to complete the functionality.
Is my understanding right?
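
The two-level picture above (per-worker partial aggregation, then a merge of the partial results) can be simulated without Giraph. The sketch below is only a model of that description, not of Giraph's internals; TwoLevelSketch, merge() and run() are hypothetical names, and K=2 is chosen to keep the example small. It shows that if the merge step gives the same answer for any grouping of the inputs, the number of aggregator copies and the order in which they are combined do not affect the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TwoLevelSketch {
    static final int K = 2;

    /** Merge step shared by the worker side and the master side: keep the K smallest. */
    static List<Double> merge(List<Double> a, List<Double> b) {
        List<Double> all = new ArrayList<>(a);
        all.addAll(b);
        Collections.sort(all);
        return new ArrayList<>(all.subList(0, Math.min(K, all.size())));
    }

    /** Each inner list holds the scores produced on one worker during a superstep. */
    static List<Double> run(List<List<Double>> perWorker) {
        List<Double> global = new ArrayList<>(); // starts from the neutral (empty) value
        for (List<Double> workerValues : perWorker) {
            List<Double> local = new ArrayList<>(); // fresh local aggregator per worker
            for (Double v : workerValues) {
                local = merge(local, Collections.singletonList(v));
            }
            global = merge(global, local); // master merges the worker's partial result
        }
        return global;
    }

    public static void main(String[] args) {
        List<List<Double>> threeWorkers = Arrays.asList(
                Arrays.asList(6.0, 9.0), Arrays.asList(3.0), Arrays.asList(7.0, 4.0));
        List<List<Double>> oneWorker = Arrays.asList(
                Arrays.asList(6.0, 9.0, 3.0, 7.0, 4.0));
        System.out.println(run(threeWorkers)); // prints [3.0, 4.0]
        System.out.println(run(oneWorker));    // prints [3.0, 4.0]
    }
}
```

An aggregate() that depends on which copy it runs in, or on state carried over by createInitialValue(), would break this property, which would be consistent with code that works on a single worker but not on a cluster.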
Also -->

Can I print the currentSuperstep in the aggregate method? (currentSuperstep = the superstep after which the messages are being aggregated)
Is there any way to access getConf in the aggregate method? If yes, how?
- Puneet
 

     On Monday, November 17, 2014 2:36 PM, Panagiotis Eustratiadis <ep...@gmail.com> wrote:
   

 Hello Puneet,
I am encountering a very similar problem, and I'm studying it. I will update this thread as soon as I find something. I think it has something to do with the code that initializes the aggregators, but I'm not quite sure.
Regards,
Eustratiadis Panagiotis.
On 17 November 2014 05:25, Puneet Agarwal <pu...@yahoo.com> wrote:

I have no clue on how to resolve this issue. Any guidance in this situation can be helpful.
Claudio - is there any detailed description of how aggregator works in Giraph in your book?
Getting no response from anyone, should such questions be asked in developers' list?

-Puneet
IIT Delhi, India



Re: Fw: Persistent Aggregator Forgets Previously Aggregated Value ????

Posted by Panagiotis Eustratiadis <ep...@gmail.com>.
Hello Puneet,

I am encountering a very similar problem, and I'm studying it. I will
update this thread as soon as I find something. I think it has something to
do with the code that initializes the aggregators, but I'm not quite sure.

Regards,
Eustratiadis Panagiotis.

On 17 November 2014 05:25, Puneet Agarwal <pu...@yahoo.com> wrote:

> I have no clue on how to resolve this issue. Any guidance in this
> situation can be helpful.
>
> Claudio - is there any detailed description of how aggregator works in
> Giraph in your book?
>
> Getting no response from anyone, should such questions be asked in
> developers' list?
>
> -Puneet
> IIT Delhi, India

Fw: Persistent Aggregator Forgets Previously Aggregated Value ????

Posted by Puneet Agarwal <pu...@yahoo.com>.
I have no clue on how to resolve this issue. Any guidance in this situation can be helpful.
Claudio - is there any detailed description of how aggregator works in Giraph in your book?
Getting no response from anyone, should such questions be asked in developers' list?

-Puneet
IIT Delhi, India

