Posted to user@spark.apache.org by Warfish <se...@gmail.com> on 2015/08/06 10:41:19 UTC

Enum values in custom objects mess up RDD operations

Hi everyone,

I have been working with Spark for a little while now and have encountered a very
strange behaviour that has caused me a lot of headaches:

I have written my own POJOs to encapsulate my data, and this data is held in
some JavaRDDs. Part of these POJOs is a member variable of a custom enum
type. Whenever I run operations on these RDDs such as subtract,
groupByKey, reduce or similar things, the results are inconsistent and
nonsensical. However, this happens only when the application runs in
standalone cluster mode (10 nodes); when running locally on my developer
machine, the code executes just fine. If you want to reproduce this
behaviour, here
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip>
is the complete Maven project that you can run out of the box. I am running
Spark 1.4.0 and submitting the application using
/usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



Consider the following code for my custom object:


package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

    private MyEnum myEnum;

    public MyObject(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    public MyEnum getMyEnum() {
        return myEnum;
    }

    public void setMyEnum(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    @Override
    public int hashCode() {
        int hash = 5;
        hash = 41 * hash + Objects.hashCode(this.myEnum);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final MyObject other = (MyObject) obj;
        if (this.myEnum != other.myEnum) {
            return false;
        }
        return true;
    }
    
    @Override
    public String toString() {
        return "MyObject{" + "myEnum=" + myEnum + '}';
    }

}


As you can see, I have overridden equals() and hashCode() (both are
auto-generated). The enum is defined as follows:


package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
  VALUE1, VALUE2
}


The main() method is defined by:


package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Test")
                                    .setMaster("myMaster");

    JavaSparkContext jsc = new JavaSparkContext(conf);

    System.out.println("///////////////////////////////////////////////////
Object generation");

    List<MyObject> l1 = new ArrayList<>();
    
    for(int i = 0; i < 1000; i++) {
        l1.add(new MyObject(MyEnum.VALUE1));
    }
    
    JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
    JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);

    System.out.println("myObjectRDD1 count          = " +
myObjectRDD1.count());
    System.out.println("myObjectRDD2 count          = " +
myObjectRDD2.count());
    
    System.out.println("///////////////////////////////////////////////////
Distinct");
    
    JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
    JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();
    
    System.out.println("myObjectRDD1Distinct count  = " +
myObjectRDD1Distinct.count());
    System.out.println("myObjectRDD2Distinct count  = " +
myObjectRDD2Distinct.count());
    
    System.out.println("///////////////////////////////////////////////////
Subtract");

    JavaRDD<MyObject> myObjectRDD1Minus1 =
myObjectRDD1.subtract(myObjectRDD1);
    JavaRDD<MyObject> myObjectRDD1Minus2 =
myObjectRDD1.subtract(myObjectRDD2);
    JavaRDD<MyObject> myObjectRDD2Minus1 =
myObjectRDD2.subtract(myObjectRDD1);

    System.out.println("myObjectRDD1Minus1 count    = " +
myObjectRDD1Minus1.count());
    System.out.println("myObjectRDD1Minus2 count    = " +
myObjectRDD1Minus2.count());
    System.out.println("myObjectRDD2Minus1 count    = " +
myObjectRDD2Minus1.count());
    
    System.out.println("///////////////////////////////////////////////////
End");
  }
  
}


Both RDDs contain 1000 exactly equal objects, so one would expect each call of
distinct() to yield a count of 1 and each call of subtract(JavaRDD<MyObject>)
to yield an empty RDD. However, here is some sample output:


/////////////////////////////////////////////////// Object generation
myObjectRDD1 count          = 1000
myObjectRDD2 count          = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count  = 1
myObjectRDD2Distinct count  = 2
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count    = 500
myObjectRDD1Minus2 count    = 0
myObjectRDD2Minus1 count    = 0
/////////////////////////////////////////////////// End


And this is a new run, directly following the previous one:

/////////////////////////////////////////////////// Object generation
myObjectRDD1 count          = 1000
myObjectRDD2 count          = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count  = 2
myObjectRDD2Distinct count  = 1
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count    = 500
myObjectRDD1Minus2 count    = 500
myObjectRDD2Minus1 count    = 0
/////////////////////////////////////////////////// End


Some thoughts/observations: As soon as I take the enum value out of the
hashCode() function of MyObject, the code works just fine, i.e. the new
hashCode() function becomes

    @Override
    public int hashCode() {
        int hash = 5;
//        hash = 41 * hash + Objects.hashCode(this.myEnum);
        return hash;
    }

Additionally, the code executes fine on a local machine and only behaves
strangely on a cluster. These two observations make me believe that Spark
uses the hashCode of each object to distribute the objects across worker
nodes, and that the enum value somehow produces inconsistent hash codes.
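
To check this suspicion I would run something like the following throwaway
class (the class name is mine, it is not part of the attached project) in two
separate JVM invocations. Because java.lang.Enum does not override hashCode(),
the printed hash is the identity hash and typically differs between runs,
while ordinal() and name() stay the same:

package de.spark.test;

// Throwaway diagnostic, not part of the attached project.
public class EnumHashCheck {
    public static void main(String[] args) {
        // Identity-based hash inherited from Object: usually different in every JVM instance.
        System.out.println("VALUE1 hashCode  = " + MyEnum.VALUE1.hashCode());
        // Stable alternatives: declaration position and constant name never change between JVMs.
        System.out.println("VALUE1 ordinal   = " + MyEnum.VALUE1.ordinal());
        System.out.println("VALUE1 name hash = " + MyEnum.VALUE1.name().hashCode());
    }
}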

Can someone help me out here?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Enum-values-in-custom-objects-mess-up-RDD-operations-tp24149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Enum values in custom objects mess up RDD operations

Posted by Sebastian Kalix <se...@gmail.com>.
Thanks a lot Igor,

the following hashCode function is stable:

    @Override
    public int hashCode() {
        int hash = 5;
        hash = 41 * hash + this.myEnum.ordinal();
        return hash;
    }

For anyone having the same problem:
http://tech.technoflirt.com/2014/08/21/issues-with-java-enum-hashcode/
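
If the enum constants might ever be reordered (which would change their
ordinals and thus the hash), hashing the constant's name should be just as
stable, since String.hashCode() is fully specified by the JDK. A small
variant I have not tested myself:

    @Override
    public int hashCode() {
        int hash = 5;
        // name() is fixed per constant and String.hashCode() is identical on every JVM;
        // the null check keeps the old Objects.hashCode(null) == 0 behaviour.
        hash = 41 * hash + (this.myEnum == null ? 0 : this.myEnum.name().hashCode());
        return hash;
    }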


Cheers,

Sebastian

Igor Berman <ig...@gmail.com> wrote on Thu, 6 Aug 2015 at 10:59:

> An enum's hashCode() is JVM-instance specific (i.e. different JVMs will give
> you different values), so you can use the ordinal in your hashCode()
> computation, or use the hashCode of the enum's ordinal as part of it.

Re: Enum values in custom objects mess up RDD operations

Posted by Igor Berman <ig...@gmail.com>.
An enum's hashCode() is JVM-instance specific (i.e. different JVMs will give
you different values), so you can use the ordinal in your hashCode()
computation, or use the hashCode of the enum's ordinal as part of it.
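
Put concretely, both suggestions amount to hashing a value that is identical
on every JVM instead of the enum's own hashCode(). A minimal sketch of what
that could look like inside MyObject (untested here; the null handling is an
addition):

    @Override
    public int hashCode() {
        int hash = 5;
        // ordinal() is determined by the declaration order, so it is the same on every JVM.
        hash = 41 * hash + (myEnum == null ? 0 : myEnum.ordinal());
        // Equivalently, fold in the hashCode of the boxed ordinal:
        // hash = 41 * hash + Objects.hashCode(myEnum == null ? null : myEnum.ordinal());
        return hash;
    }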
