You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Michael Quinlan <mq...@gmail.com> on 2014/01/08 05:32:59 UTC

Spark SequenceFile Java API Repeat Key Values

I've spent some time trying to import data into an RDD using the Spark Java
API, but am not able to properly load data stored in a Hadoop v1.1.1
sequence file with key and value types both LongWritable. I've attached a
copy of the sequence file to this posting. It contains 3000 key, value
pairs. I'm attempting to read using the following code snip:

System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");

JavaSparkContext ctx = new JavaSparkContext("local[2]", 
            "AppName",
            "/Users/mquinlan/spark-0.8.0-incubating","jar.name"); 
        
//Load DataCube via Spark sequenceFile
JavaPairRDD<LongWritable,LongWritable> DataCube =
ctx.sequenceFile("/local_filesystem/output.seq", 
            LongWritable.class, LongWritable.class);

The code above produces a DataCube filled with duplicate entries relating in
some way to the number of splits. For example, the last 1500 or so entries
all have the same key and value: (2999,22483). The previous 1500 entries
appear to represent the last key value from first split of the file. I've
confirmed that changing the number of threads (local[3]) does change the RDD
representation, maintaining this general last key value pattern. 

Using the Hadoop (only) API methods, I am able to correctly read the file
even from within the same Jar:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);        
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
Path("/local_filesystem/output.seq"), conf);
LongWritable key = new LongWritable();
LongWritable value = new LongWritable();
while(reader.next(key, value)) {
     System.out.println(key + ":" + value);
}

I've also confirmed that an RDD populated by the ctx.parallelize() method:

int n=100;
List<LongWritable> tl = new ArrayList<LongWritable>(n);
for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
DataCube = preCube.map(
                new PairFunction<LongWritable,LongWritable,LongWritable> ()
{
                    @Override
                    public Tuple2<LongWritable,LongWritable> 
                    call(LongWritable in) throws Exception {
                        return (new Tuple2(in, in));
                    }
                });

can be written to a sequence file using the RDD method:

DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
LongWritable.class, SequenceFileOutputFormat.class);

and correctly read using the Hadoop (only) API copied above.

It seems like there only a problem when I'm attempting to read the sequence
file directly into the RDD. All other operations are performing as expected. 

I'd greatly appreciate any advice someone could provide.

Regards,

Michael

output.seq
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>   



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark SequenceFile Java API Repeat Key Values

Posted by Michael Quinlan <mq...@gmail.com>.
Matei and Andrew,

Thank you both for your prompt responses. Matei is correct in that I am
attempting to cache a large RDD for repeated query.

I was able to implement your suggestion in a Scala version of the code,
which I've copied below. I should point out two minor details:
LongWritable.clone() is a private method and both the key and value need to
be "cloned" in order for the data to be cached correctly.

My attempt at a Java version wasn't as successful. If you don't mind, could
you please suggest a better way if it currently exists? This is mostly
educational since I already have a working version in Scala. I'm new to
both.

Regards,

Mike

Java:

public class App 
{
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
          System.err.println("Usage: SynthesisService <master> <input file>
<jar file>");
          System.exit(1);
        }
        
        System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
       
System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");

        JavaSparkContext ctx = new JavaSparkContext(args[0], 
            "SynthesisService",
            "~/spark-0.8.0-incubating",args[2]); 
        
        //Load DataCube via Spark sequenceFile
        JavaPairRDD<LongWritable,LongWritable> temp_DataCube =
ctx.sequenceFile(args[1], 
            LongWritable.class, LongWritable.class);
        
        JavaRDD<Tuple2&lt;LongWritable,LongWritable>> DataCube;
        DataCube = temp_DataCube.map(
                new
Function2<LongWritable,LongWritable,Tuple2&lt;LongWritable,LongWritable>> ()
{
                    @Override
                    public Tuple2<LongWritable,LongWritable> 
                    call(LongWritable key, LongWritable value) {
                        return (new Tuple2(new LongWritable(key.get()),
value));
                    }
                
                });

-----
COMPILATION ERROR : 
-------------------------------------------------------------
spark/synthesis/service/Init/App.java:[51,32] error: no suitable method
found for map(<anonymous
Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>>)
1 error

Scala:

package testspark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoRegistrator

import org.apache.hadoop.io.LongWritable

import com.esotericsoftware.kryo.Kryo

class MyRegistrator extends KryoRegistrator{
    def registerClasses(kryo: Kryo){
        kryo.register(classOf[LongWritable]);
        kryo.register(classOf[Tuple2[LongWritable,LongWritable]]);
    }
}

object ScalaSynthesisServer {
	
	def pseudoClone(x: LongWritable, y: LongWritable):
(LongWritable,LongWritable) = {
		return new Tuple2(new LongWritable(x.get()) , new LongWritable(y.get()))
	}
	
	def main(args: Array[String]) {
		if (args.length < 3) {
			System.err.println("Usage: ScalaSynthesisServer <master> <input file>
<jar file>")
			System.exit(1)
		}
		
		System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
		System.setProperty("spark.kryo.registrator","testspark.MyRegistrator")
	
		val sc = new SparkContext(args(0),
"ScalaSynthesisServer","~/spark-0.8.0-incubating",List(args(2)))
		
		val DataCube = sc.sequenceFile(args(1), classOf[LongWritable],
classOf[LongWritable]).map(a => pseudoClone(a._1,a._2))
		
		DataCube.cache()
		
		val list = DataCube.collect();
		
		var x = 0; 
		for( x <- list ){
			println("Key= " + x._1 + " Value= " + x._2);
		}
	}
}




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353p552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark SequenceFile Java API Repeat Key Values

Posted by Andrew Ash <an...@andrewash.com>.
Agreed on the clone by default approach -- this reused object gotcha has
hit several people I know when using Avro.

We should be careful to not ignore the performance impact that made Hadoop
reuse objects in the first place though.  I'm not sure what this means in
practice though, you either clone the objects in Spark or you don't.


On Tue, Jan 7, 2014 at 9:47 PM, Matei Zaharia <ma...@gmail.com>wrote:

> Yup, a) would make it work.
>
> I’d actually prefer that we change it so it clones the objects by default,
> and add a boolean flag (default false) for people who want to reuse
> objects. We’d have to do the same in hadoopRDD and the various versions of
> that as well.
>
> Matei
>
> On Jan 8, 2014, at 12:38 AM, Andrew Ash <an...@andrewash.com> wrote:
>
> Matei, do you mean something like A rather than B below?
>
> A) rdd.map(_.clone).cache
> B) rdd.cache
>
> I'd be happy to add documentation if there's a good place for it, but I'm
> not sure there's an obvious place for it.
>
>
> On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia <ma...@gmail.com>wrote:
>
>> Yeah, unfortunately sequenceFile() reuses the Writable object across
>> records. If you plan to use each record repeatedly (e.g. cache it), you
>> should clone them using a map function. It was originally designed assuming
>> you only look at each record once, but it’s poorly documented.
>>
>> Matei
>>
>> On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq...@gmail.com> wrote:
>>
>> > I've spent some time trying to import data into an RDD using the Spark
>> Java
>> > API, but am not able to properly load data stored in a Hadoop v1.1.1
>> > sequence file with key and value types both LongWritable. I've attached
>> a
>> > copy of the sequence file to this posting. It contains 3000 key, value
>> > pairs. I'm attempting to read using the following code snip:
>> >
>> > System.setProperty("spark.serializer",
>> > "org.apache.spark.serializer.KryoSerializer");
>> >
>> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
>> >
>> > JavaSparkContext ctx = new JavaSparkContext("local[2]",
>> >            "AppName",
>> >            "/Users/mquinlan/spark-0.8.0-incubating","jar.name");
>> >
>> > //Load DataCube via Spark sequenceFile
>> > JavaPairRDD<LongWritable,LongWritable> DataCube =
>> > ctx.sequenceFile("/local_filesystem/output.seq",
>> >            LongWritable.class, LongWritable.class);
>> >
>> > The code above produces a DataCube filled with duplicate entries
>> relating in
>> > some way to the number of splits. For example, the last 1500 or so
>> entries
>> > all have the same key and value: (2999,22483). The previous 1500 entries
>> > appear to represent the last key value from first split of the file.
>> I've
>> > confirmed that changing the number of threads (local[3]) does change
>> the RDD
>> > representation, maintaining this general last key value pattern.
>> >
>> > Using the Hadoop (only) API methods, I am able to correctly read the
>> file
>> > even from within the same Jar:
>> >
>> > Configuration conf = new Configuration();
>> > FileSystem fs = FileSystem.get(conf);
>> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
>> > Path("/local_filesystem/output.seq"), conf);
>> > LongWritable key = new LongWritable();
>> > LongWritable value = new LongWritable();
>> > while(reader.next(key, value)) {
>> >     System.out.println(key + ":" + value);
>> > }
>> >
>> > I've also confirmed that an RDD populated by the ctx.parallelize()
>> method:
>> >
>> > int n=100;
>> > List<LongWritable> tl = new ArrayList<LongWritable>(n);
>> > for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
>> > JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
>> > DataCube = preCube.map(
>> >                new PairFunction<LongWritable,LongWritable,LongWritable>
>> ()
>> > {
>> >                    @Override
>> >                    public Tuple2<LongWritable,LongWritable>
>> >                    call(LongWritable in) throws Exception {
>> >                        return (new Tuple2(in, in));
>> >                    }
>> >                });
>> >
>> > can be written to a sequence file using the RDD method:
>> >
>> > DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
>> > LongWritable.class, SequenceFileOutputFormat.class);
>> >
>> > and correctly read using the Hadoop (only) API copied above.
>> >
>> > It seems like there only a problem when I'm attempting to read the
>> sequence
>> > file directly into the RDD. All other operations are performing as
>> expected.
>> >
>> > I'd greatly appreciate any advice someone could provide.
>> >
>> > Regards,
>> >
>> > Michael
>> >
>> > output.seq
>> > <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>>
>
>

Re: Spark SequenceFile Java API Repeat Key Values

Posted by Matei Zaharia <ma...@gmail.com>.
Yup, a) would make it work.

I’d actually prefer that we change it so it clones the objects by default, and add a boolean flag (default false) for people who want to reuse objects. We’d have to do the same in hadoopRDD and the various versions of that as well.

Matei

On Jan 8, 2014, at 12:38 AM, Andrew Ash <an...@andrewash.com> wrote:

> Matei, do you mean something like A rather than B below?
> 
> A) rdd.map(_.clone).cache
> B) rdd.cache
> 
> I'd be happy to add documentation if there's a good place for it, but I'm not sure there's an obvious place for it.
> 
> 
> On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Yeah, unfortunately sequenceFile() reuses the Writable object across records. If you plan to use each record repeatedly (e.g. cache it), you should clone them using a map function. It was originally designed assuming you only look at each record once, but it’s poorly documented.
> 
> Matei
> 
> On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq...@gmail.com> wrote:
> 
> > I've spent some time trying to import data into an RDD using the Spark Java
> > API, but am not able to properly load data stored in a Hadoop v1.1.1
> > sequence file with key and value types both LongWritable. I've attached a
> > copy of the sequence file to this posting. It contains 3000 key, value
> > pairs. I'm attempting to read using the following code snip:
> >
> > System.setProperty("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer");
> > System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
> >
> > JavaSparkContext ctx = new JavaSparkContext("local[2]",
> >            "AppName",
> >            "/Users/mquinlan/spark-0.8.0-incubating","jar.name");
> >
> > //Load DataCube via Spark sequenceFile
> > JavaPairRDD<LongWritable,LongWritable> DataCube =
> > ctx.sequenceFile("/local_filesystem/output.seq",
> >            LongWritable.class, LongWritable.class);
> >
> > The code above produces a DataCube filled with duplicate entries relating in
> > some way to the number of splits. For example, the last 1500 or so entries
> > all have the same key and value: (2999,22483). The previous 1500 entries
> > appear to represent the last key value from first split of the file. I've
> > confirmed that changing the number of threads (local[3]) does change the RDD
> > representation, maintaining this general last key value pattern.
> >
> > Using the Hadoop (only) API methods, I am able to correctly read the file
> > even from within the same Jar:
> >
> > Configuration conf = new Configuration();
> > FileSystem fs = FileSystem.get(conf);
> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
> > Path("/local_filesystem/output.seq"), conf);
> > LongWritable key = new LongWritable();
> > LongWritable value = new LongWritable();
> > while(reader.next(key, value)) {
> >     System.out.println(key + ":" + value);
> > }
> >
> > I've also confirmed that an RDD populated by the ctx.parallelize() method:
> >
> > int n=100;
> > List<LongWritable> tl = new ArrayList<LongWritable>(n);
> > for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> > JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> > DataCube = preCube.map(
> >                new PairFunction<LongWritable,LongWritable,LongWritable> ()
> > {
> >                    @Override
> >                    public Tuple2<LongWritable,LongWritable>
> >                    call(LongWritable in) throws Exception {
> >                        return (new Tuple2(in, in));
> >                    }
> >                });
> >
> > can be written to a sequence file using the RDD method:
> >
> > DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> > LongWritable.class, SequenceFileOutputFormat.class);
> >
> > and correctly read using the Hadoop (only) API copied above.
> >
> > It seems like there only a problem when I'm attempting to read the sequence
> > file directly into the RDD. All other operations are performing as expected.
> >
> > I'd greatly appreciate any advice someone could provide.
> >
> > Regards,
> >
> > Michael
> >
> > output.seq
> > <http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 


Re: Spark SequenceFile Java API Repeat Key Values

Posted by Andrew Ash <an...@andrewash.com>.
Matei, do you mean something like A rather than B below?

A) rdd.map(_.clone).cache
B) rdd.cache

I'd be happy to add documentation if there's a good place for it, but I'm
not sure there's an obvious place for it.


On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia <ma...@gmail.com>wrote:

> Yeah, unfortunately sequenceFile() reuses the Writable object across
> records. If you plan to use each record repeatedly (e.g. cache it), you
> should clone them using a map function. It was originally designed assuming
> you only look at each record once, but it’s poorly documented.
>
> Matei
>
> On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq...@gmail.com> wrote:
>
> > I've spent some time trying to import data into an RDD using the Spark
> Java
> > API, but am not able to properly load data stored in a Hadoop v1.1.1
> > sequence file with key and value types both LongWritable. I've attached a
> > copy of the sequence file to this posting. It contains 3000 key, value
> > pairs. I'm attempting to read using the following code snip:
> >
> > System.setProperty("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer");
> >
> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
> >
> > JavaSparkContext ctx = new JavaSparkContext("local[2]",
> >            "AppName",
> >            "/Users/mquinlan/spark-0.8.0-incubating","jar.name");
> >
> > //Load DataCube via Spark sequenceFile
> > JavaPairRDD<LongWritable,LongWritable> DataCube =
> > ctx.sequenceFile("/local_filesystem/output.seq",
> >            LongWritable.class, LongWritable.class);
> >
> > The code above produces a DataCube filled with duplicate entries
> relating in
> > some way to the number of splits. For example, the last 1500 or so
> entries
> > all have the same key and value: (2999,22483). The previous 1500 entries
> > appear to represent the last key value from first split of the file. I've
> > confirmed that changing the number of threads (local[3]) does change the
> RDD
> > representation, maintaining this general last key value pattern.
> >
> > Using the Hadoop (only) API methods, I am able to correctly read the file
> > even from within the same Jar:
> >
> > Configuration conf = new Configuration();
> > FileSystem fs = FileSystem.get(conf);
> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
> > Path("/local_filesystem/output.seq"), conf);
> > LongWritable key = new LongWritable();
> > LongWritable value = new LongWritable();
> > while(reader.next(key, value)) {
> >     System.out.println(key + ":" + value);
> > }
> >
> > I've also confirmed that an RDD populated by the ctx.parallelize()
> method:
> >
> > int n=100;
> > List<LongWritable> tl = new ArrayList<LongWritable>(n);
> > for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> > JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> > DataCube = preCube.map(
> >                new PairFunction<LongWritable,LongWritable,LongWritable>
> ()
> > {
> >                    @Override
> >                    public Tuple2<LongWritable,LongWritable>
> >                    call(LongWritable in) throws Exception {
> >                        return (new Tuple2(in, in));
> >                    }
> >                });
> >
> > can be written to a sequence file using the RDD method:
> >
> > DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> > LongWritable.class, SequenceFileOutputFormat.class);
> >
> > and correctly read using the Hadoop (only) API copied above.
> >
> > It seems like there only a problem when I'm attempting to read the
> sequence
> > file directly into the RDD. All other operations are performing as
> expected.
> >
> > I'd greatly appreciate any advice someone could provide.
> >
> > Regards,
> >
> > Michael
> >
> > output.seq
> > <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>

Re: Spark SequenceFile Java API Repeat Key Values

Posted by Matei Zaharia <ma...@gmail.com>.
Yeah, unfortunately sequenceFile() reuses the Writable object across records. If you plan to use each record repeatedly (e.g. cache it), you should clone them using a map function. It was originally designed assuming you only look at each record once, but it’s poorly documented.

Matei

On Jan 7, 2014, at 11:32 PM, Michael Quinlan <mq...@gmail.com> wrote:

> I've spent some time trying to import data into an RDD using the Spark Java
> API, but am not able to properly load data stored in a Hadoop v1.1.1
> sequence file with key and value types both LongWritable. I've attached a
> copy of the sequence file to this posting. It contains 3000 key, value
> pairs. I'm attempting to read using the following code snip:
> 
> System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
> 
> JavaSparkContext ctx = new JavaSparkContext("local[2]", 
>            "AppName",
>            "/Users/mquinlan/spark-0.8.0-incubating","jar.name"); 
> 
> //Load DataCube via Spark sequenceFile
> JavaPairRDD<LongWritable,LongWritable> DataCube =
> ctx.sequenceFile("/local_filesystem/output.seq", 
>            LongWritable.class, LongWritable.class);
> 
> The code above produces a DataCube filled with duplicate entries relating in
> some way to the number of splits. For example, the last 1500 or so entries
> all have the same key and value: (2999,22483). The previous 1500 entries
> appear to represent the last key value from first split of the file. I've
> confirmed that changing the number of threads (local[3]) does change the RDD
> representation, maintaining this general last key value pattern. 
> 
> Using the Hadoop (only) API methods, I am able to correctly read the file
> even from within the same Jar:
> 
> Configuration conf = new Configuration();
> FileSystem fs = FileSystem.get(conf);        
> SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
> Path("/local_filesystem/output.seq"), conf);
> LongWritable key = new LongWritable();
> LongWritable value = new LongWritable();
> while(reader.next(key, value)) {
>     System.out.println(key + ":" + value);
> }
> 
> I've also confirmed that an RDD populated by the ctx.parallelize() method:
> 
> int n=100;
> List<LongWritable> tl = new ArrayList<LongWritable>(n);
> for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
> DataCube = preCube.map(
>                new PairFunction<LongWritable,LongWritable,LongWritable> ()
> {
>                    @Override
>                    public Tuple2<LongWritable,LongWritable> 
>                    call(LongWritable in) throws Exception {
>                        return (new Tuple2(in, in));
>                    }
>                });
> 
> can be written to a sequence file using the RDD method:
> 
> DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> LongWritable.class, SequenceFileOutputFormat.class);
> 
> and correctly read using the Hadoop (only) API copied above.
> 
> It seems like there only a problem when I'm attempting to read the sequence
> file directly into the RDD. All other operations are performing as expected. 
> 
> I'd greatly appreciate any advice someone could provide.
> 
> Regards,
> 
> Michael
> 
> output.seq
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>   
> 
> 
> 
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.