You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Matt Cheah <mc...@palantir.com> on 2013/10/22 21:28:43 UTC

Visitor function to RDD elements

Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" each item in the RDD in order, say with a visitor object that invokes visit(item) to modify that visitor's internal state. The visiting is not commutative (e.g. Visiting item A then B makes a different internal state from visiting item B then item A). Items in the RDD also are not necessarily distinct.

I've looked into accumulators which don't work because they require the operation to be commutative. Collect() will not work because the RDD is too large; in general, bringing the whole RDD into one partition won't work since the RDD is too large.

Is it possible to iterate over the items in an RDD in order without bringing the entire dataset into a single JVM at a time, and/or obtain chunks of the RDD in order on the driver? We've tried using the internal iterator() method. In some cases, we get a stack trace (running locally with 3 threads). I've included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more


Re: Visitor function to RDD elements

Posted by Aaron Davidson <il...@gmail.com>.
I may be incorrect on this, but I believe that coalesce() could work here.
A single partition can be larger than memory, as long as no one tries to
load the entire partition into memory. Trouble should only arise if your
RDD operator tries to create an in-memory map of the entire partition
(e.g., reduceByKey). map(), however, should (theoretically) work in a
streaming manner over an arbitrarily large partition.


On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <mc...@palantir.com> wrote:

>  In this context, it would be able to create a visitor mapping for each
> partition. However, I'm looking for the ability to use a single visitor
> object that will walk over all partitions.
>
>  I suppose I could do this if I used coalesce() to combine everything to
> one partition but that's too much memory in one partition. Am I
> misinterpreting how to use it?
>
>   From: Mark Hamstra <ma...@clearstorydata.com>
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Tuesday, October 22, 2013 12:51 PM
> To: user <us...@spark.incubator.apache.org>
> Subject: Re: Visitor function to RDD elements
>
>   mapPartitions
> mapPartitionsWithIndex
>
>  With care, you can use these and maintain the iteration order within
> partitions.  Beware, though, that any reduce functions need to be
> associative and commutative.
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at
>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at
>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at
>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at
>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at
>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [10000] milliseconds
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>

Re: Visitor function to RDD elements

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Completely serializing an operation over all of the elements of an RDD
really goes against the grain of how Spark wants to operate.  With
mapPartitionsWithIndex you can at least know which partition you are
working with in your iterator function.  By using multiple passes over the
data, handling as much independent parallel processing of each partition as
you can in a given pass and then communicating the per partition results of
the pass to the other partitions to handle any needed
cross-partition/cross-index processing/healing/merging, you can effect
something like serialized operations.


On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <mc...@palantir.com> wrote:

>  In this context, it would be able to create a visitor mapping for each
> partition. However, I'm looking for the ability to use a single visitor
> object that will walk over all partitions.
>
>  I suppose I could do this if I used coalesce() to combine everything to
> one partition but that's too much memory in one partition. Am I
> misinterpreting how to use it?
>
>   From: Mark Hamstra <ma...@clearstorydata.com>
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Tuesday, October 22, 2013 12:51 PM
> To: user <us...@spark.incubator.apache.org>
> Subject: Re: Visitor function to RDD elements
>
>   mapPartitions
> mapPartitionsWithIndex
>
>  With care, you can use these and maintain the iteration order within
> partitions.  Beware, though, that any reduce functions need to be
> associative and commutative.
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at
>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at
>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at
>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at
>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at
>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [10000] milliseconds
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>

Re: Visitor function to RDD elements

Posted by Christopher Nguyen <ct...@adatao.com>.
Ah, this is a slightly different problem statement, in that you may still
gain in taking advantage of Spark's parallelization for the data
transformation.

If you want to avoid the serdes+network overhead of sending the results
back to the driver, and instead have a consumer/sink to send the result set
to, you might consider having a single reducer that streams the rows out to
a local temporary file(s), then have the final reduce send (or trigger an
external send) of that result set to your consumer. 2GB files are fairly
small relative to TB disk sizes, and can easily stream within 10+ seconds
for 100MB/s local disk or network bandwidths.

If the original transformation would have taken minutes or longer
sequentially then this approach is still a win in performance.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 3:48 PM, Matt Cheah <mc...@palantir.com> wrote:

>  Thanks everyone – I think we're going to go with collect() and kick out
> things that attempt to obtain overly large sets.
>
>  However, I think my original concern still stands. Some reading online
> shows that Microsoft Excel, for example, supports displaying something on
> the order of 2-4 GB sized spreadsheets (
> http://social.technet.microsoft.com/Forums/office/en-US/60bf34fb-5f02-483a-a54b-645cc810b30f/excel-2013-file-size-limits-powerpivot?forum=officeitpro).
> If there is a 2GB RDD however streaming it all back to the driver seems
> wasteful where in reality we could fetch chunks of it at a time and load
> only parts in driver memory, as opposed to using 2GB of RAM on the driver.
> In fact I don't know what the maximum frame size that can be set would be
> via spark.akka.framesize.
>
>  -Matt Cheah
>
>   From: Mark Hamstra <ma...@clearstorydata.com>
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Tuesday, October 22, 2013 3:32 PM
> To: user <us...@spark.incubator.apache.org>
>
> Subject: Re: Visitor function to RDD elements
>
>   Correct; that's the completely degenerate case where you can't do
> anything in parallel.  Often you'll also want your iterator function to
> send back some information to an accumulator (perhaps just the result
> calculated with the last element of the partition) which is then fed back
> into the operation on the next partition as either a broadcast variable or
> part of the closure.
>
>
>
> On Tue, Oct 22, 2013 at 3:25 PM, Nathan Kronenfeld <
> nkronenfeld@oculusinfo.com> wrote:
>
>> You shouldn't have to fly data around
>>
>>  You can just run it first on partition 0, then on partition 1, etc...
>>  I may have the name slightly off, but something approximately like:
>>
>>  for (p <- 0 until numPartitions)
>>   data.mapPartitionsWithIndex((i, iter) => if (0 == p) iter.map(fcn) else
>> List().iterator)
>>
>>  should work... BUT that being said, you've now really lost the point of
>> using Spark to begin with.
>>
>>
>

Re: Visitor function to RDD elements

Posted by Matt Cheah <mc...@palantir.com>.
(The link I provided there isn't a good source… seems like MSFT still screws this up anyways =P but they used to support it!)

From: Andrew Winings <mc...@palantir.com>>
Reply-To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>
Date: Tuesday, October 22, 2013 3:48 PM
To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>
Subject: Re: Visitor function to RDD elements

Thanks everyone – I think we're going to go with collect() and kick out things that attempt to obtain overly large sets.

However, I think my original concern still stands. Some reading online shows that Microsoft Excel, for example, supports displaying something on the order of 2-4 GB sized spreadsheets (http://social.technet.microsoft.com/Forums/office/en-US/60bf34fb-5f02-483a-a54b-645cc810b30f/excel-2013-file-size-limits-powerpivot?forum=officeitpro<https://urldefense.proofpoint.com/v1/url?u=http://social.technet.microsoft.com/Forums/office/en-US/60bf34fb-5f02-483a-a54b-645cc810b30f/excel-2013-file-size-limits-powerpivot?forum%3Dofficeitpro&k=fDZpZZQMmYwf27OU23GmAQ%3D%3D%0A&r=gxvgJndY02bAG2cHbPl1cUTcd%2FLzFGz7wtfiAfRKPpk%3D%0A&m=EixrjRqv7AtWjBhWC9vqdJDp8g9%2FmILr%2F%2FuacHpwGBE%3D%0A&s=150a1f682e918b2b1c6a09c6fcf933b505a35313662aff14876b87c759f56317>). If there is a 2GB RDD however streaming it all back to the driver seems wasteful where in reality we could fetch chunks of it at a time and load only parts in driver memory, as opposed to using 2GB of RAM on the driver. In fact I don't know what the maximum frame size that can be set would be via spark.akka.framesize.

-Matt Cheah

From: Mark Hamstra <ma...@clearstorydata.com>>
Reply-To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>
Date: Tuesday, October 22, 2013 3:32 PM
To: user <us...@spark.incubator.apache.org>>
Subject: Re: Visitor function to RDD elements

Correct; that's the completely degenerate case where you can't do anything in parallel.  Often you'll also want your iterator function to send back some information to an accumulator (perhaps just the result calculated with the last element of the partition) which is then fed back into the operation on the next partition as either a broadcast variable or part of the closure.



On Tue, Oct 22, 2013 at 3:25 PM, Nathan Kronenfeld <nk...@oculusinfo.com>> wrote:
You shouldn't have to fly data around

You can just run it first on partition 0, then on partition 1, etc...  I may have the name slightly off, but something approximately like:

for (p <- 0 until numPartitions)
  data.mapPartitionsWithIndex((i, iter) => if (0 == p) iter.map(fcn) else List().iterator)

should work... BUT that being said, you've now really lost the point of using Spark to begin with.



Re: Visitor function to RDD elements

Posted by Matt Cheah <mc...@palantir.com>.
Thanks everyone – I think we're going to go with collect() and kick out things that attempt to obtain overly large sets.

However, I think my original concern still stands. Some reading online shows that Microsoft Excel, for example, supports displaying something on the order of 2-4 GB sized spreadsheets (http://social.technet.microsoft.com/Forums/office/en-US/60bf34fb-5f02-483a-a54b-645cc810b30f/excel-2013-file-size-limits-powerpivot?forum=officeitpro). If there is a 2GB RDD however streaming it all back to the driver seems wasteful where in reality we could fetch chunks of it at a time and load only parts in driver memory, as opposed to using 2GB of RAM on the driver. In fact I don't know what the maximum frame size that can be set would be via spark.akka.framesize.

-Matt Cheah

From: Mark Hamstra <ma...@clearstorydata.com>>
Reply-To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>
Date: Tuesday, October 22, 2013 3:32 PM
To: user <us...@spark.incubator.apache.org>>
Subject: Re: Visitor function to RDD elements

Correct; that's the completely degenerate case where you can't do anything in parallel.  Often you'll also want your iterator function to send back some information to an accumulator (perhaps just the result calculated with the last element of the partition) which is then fed back into the operation on the next partition as either a broadcast variable or part of the closure.



On Tue, Oct 22, 2013 at 3:25 PM, Nathan Kronenfeld <nk...@oculusinfo.com>> wrote:
You shouldn't have to fly data around

You can just run it first on partition 0, then on partition 1, etc...  I may have the name slightly off, but something approximately like:

for (p <- 0 until numPartitions)
  data.mapPartitionsWithIndex((i, iter) => if (0 == p) iter.map(fcn) else List().iterator)

should work... BUT that being said, you've now really lost the point of using Spark to begin with.



Re: Visitor function to RDD elements

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Correct; that's the completely degenerate case where you can't do anything
in parallel.  Often you'll also want your iterator function to send back
some information to an accumulator (perhaps just the result calculated with
the last element of the partition) which is then fed back into the
operation on the next partition as either a broadcast variable or part of
the closure.



On Tue, Oct 22, 2013 at 3:25 PM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:

> You shouldn't have to fly data around
>
> You can just run it first on partition 0, then on partition 1, etc...  I
> may have the name slightly off, but something approximately like:
>
> for (p <- 0 until numPartitions)
>   data.mapPartitionsWithIndex((i, iter) => if (0 == p) iter.map(fcn) else
> List().iterator)
>
> should work... BUT that being said, you've now really lost the point of
> using Spark to begin with.
>
>

Re: Visitor function to RDD elements

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
You shouldn't have to fly data around

You can just run it first on partition 0, then on partition 1, etc...  I
may have the name slightly off, but something approximately like:

for (p <- 0 until numPartitions)
  data.mapPartitionsWithIndex((i, iter) => if (0 == p) iter.map(fcn) else
List().iterator)

should work... BUT that being said, you've now really lost the point of
using Spark to begin with.

Re: Visitor function to RDD elements

Posted by Tom Vacek <mi...@gmail.com>.
Unfortunately, I think you're going to either have to fly a lot of data
around or create a lot of garbage.


On Tue, Oct 22, 2013 at 3:36 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Hey Matt,
>
> It seems like you are trying to perform an operation that just isn't
> parrallelizable. In that case, it's going to be tricky without collecting
> the entire dataset on one node.
>
> Spark does not expose an iterator like you are suggesting, that lets you
> traverse an RDD. You could build one yourself though by collecting one
> partition at a time at the driver, thought this would require some lower
> level understanding of Spark.
>
> - Patrick
>
>
>
> On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <mc...@palantir.com> wrote:
>
>>  In this context, it would be able to create a visitor mapping for each
>> partition. However, I'm looking for the ability to use a single visitor
>> object that will walk over all partitions.
>>
>>  I suppose I could do this if I used coalesce() to combine everything to
>> one partition but that's too much memory in one partition. Am I
>> misinterpreting how to use it?
>>
>>   From: Mark Hamstra <ma...@clearstorydata.com>
>> Reply-To: "user@spark.incubator.apache.org" <
>> user@spark.incubator.apache.org>
>> Date: Tuesday, October 22, 2013 12:51 PM
>> To: user <us...@spark.incubator.apache.org>
>> Subject: Re: Visitor function to RDD elements
>>
>>   mapPartitions
>> mapPartitionsWithIndex
>>
>>  With care, you can use these and maintain the iteration order within
>> partitions.  Beware, though, that any reduce functions need to be
>> associative and commutative.
>>
>>
>> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:
>>
>>>  Hi everyone,
>>>
>>>  I have a driver holding a reference to an RDD. The driver would like
>>> to "visit" each item in the RDD in order, say with a visitor object that
>>> invokes visit(item) to modify that visitor's internal state. The visiting
>>> is not commutative (e.g. Visiting item A then B makes a different internal
>>> state from visiting item B then item A). Items in the RDD also are not
>>> necessarily distinct.
>>>
>>>  I've looked into accumulators which don't work because they require
>>> the operation to be commutative. Collect() will not work because the RDD is
>>> too large; in general, bringing the whole RDD into one partition won't work
>>> since the RDD is too large.
>>>
>>>  Is it possible to iterate over the items in an RDD in order without
>>> bringing the entire dataset into a single JVM at a time, and/or obtain
>>> chunks of the RDD in order on the driver? We've tried using the internal
>>> iterator() method. In some cases, we get a stack trace (running locally
>>> with 3 threads). I've included the stack trace below.
>>>
>>>  Thanks,
>>>
>>>  -Matt Cheah
>>>
>>>  org.apache.spark.SparkException: Error communicating with
>>> MapOutputTracker
>>> at
>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>> at
>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>> at
>>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>> at
>>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>>> at
>>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>>> at
>>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>>> at
>>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>>> at
>>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>> at
>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> at
>>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>> at
>>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>> at
>>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>> at
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>>> at
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>>> at
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>>> at
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>> after [10000] milliseconds
>>> at
>>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>> at
>>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>>> at
>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>> ... 46 more
>>>
>>>
>>
>

Re: Visitor function to RDD elements

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Matt,

It seems like you are trying to perform an operation that just isn't
parrallelizable. In that case, it's going to be tricky without collecting
the entire dataset on one node.

Spark does not expose an iterator like you are suggesting, that lets you
traverse an RDD. You could build one yourself though by collecting one
partition at a time at the driver, thought this would require some lower
level understanding of Spark.

- Patrick



On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <mc...@palantir.com> wrote:

>  In this context, it would be able to create a visitor mapping for each
> partition. However, I'm looking for the ability to use a single visitor
> object that will walk over all partitions.
>
>  I suppose I could do this if I used coalesce() to combine everything to
> one partition but that's too much memory in one partition. Am I
> misinterpreting how to use it?
>
>   From: Mark Hamstra <ma...@clearstorydata.com>
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Tuesday, October 22, 2013 12:51 PM
> To: user <us...@spark.incubator.apache.org>
> Subject: Re: Visitor function to RDD elements
>
>   mapPartitions
> mapPartitionsWithIndex
>
>  With care, you can use these and maintain the iteration order within
> partitions.  Beware, though, that any reduce functions need to be
> associative and commutative.
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at
>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at
>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at
>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at
>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at
>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [10000] milliseconds
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>

Re: Visitor function to RDD elements

Posted by Matt Cheah <mc...@palantir.com>.
In this context, it would be able to create a visitor mapping for each partition. However, I'm looking for the ability to use a single visitor object that will walk over all partitions.

I suppose I could do this if I used coalesce() to combine everything to one partition but that's too much memory in one partition. Am I misinterpreting how to use it?

From: Mark Hamstra <ma...@clearstorydata.com>>
Reply-To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>
Date: Tuesday, October 22, 2013 12:51 PM
To: user <us...@spark.incubator.apache.org>>
Subject: Re: Visitor function to RDD elements

mapPartitions
mapPartitionsWithIndex

With care, you can use these and maintain the iteration order within partitions.  Beware, though, that any reduce functions need to be associative and commutative.


On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com>> wrote:
Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" each item in the RDD in order, say with a visitor object that invokes visit(item) to modify that visitor's internal state. The visiting is not commutative (e.g. Visiting item A then B makes a different internal state from visiting item B then item A). Items in the RDD also are not necessarily distinct.

I've looked into accumulators which don't work because they require the operation to be commutative. Collect() will not work because the RDD is too large; in general, bringing the whole RDD into one partition won't work since the RDD is too large.

Is it possible to iterate over the items in an RDD in order without bringing the entire dataset into a single JVM at a time, and/or obtain chunks of the RDD in order on the driver? We've tried using the internal iterator() method. In some cases, we get a stack trace (running locally with 3 threads). I've included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more



Re: Visitor function to RDD elements

Posted by Mark Hamstra <ma...@clearstorydata.com>.
mapPartitions
mapPartitionsWithIndex

With care, you can use these and maintain the iteration order within
partitions.  Beware, though, that any reduce functions need to be
associative and commutative.


On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:

>  Hi everyone,
>
>  I have a driver holding a reference to an RDD. The driver would like to
> "visit" each item in the RDD in order, say with a visitor object that
> invokes visit(item) to modify that visitor's internal state. The visiting
> is not commutative (e.g. Visiting item A then B makes a different internal
> state from visiting item B then item A). Items in the RDD also are not
> necessarily distinct.
>
>  I've looked into accumulators which don't work because they require the
> operation to be commutative. Collect() will not work because the RDD is too
> large; in general, bringing the whole RDD into one partition won't work
> since the RDD is too large.
>
>  Is it possible to iterate over the items in an RDD in order without
> bringing the entire dataset into a single JVM at a time, and/or obtain
> chunks of the RDD in order on the driver? We've tried using the internal
> iterator() method. In some cases, we get a stack trace (running locally
> with 3 threads). I've included the stack trace below.
>
>  Thanks,
>
>  -Matt Cheah
>
>  org.apache.spark.SparkException: Error communicating with
> MapOutputTracker
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
> at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
> at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at
> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
> at
> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
> at
> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at
> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10000] milliseconds
> at
> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
> at
> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
> ... 46 more
>
>

Re: Visitor function to RDD elements

Posted by Matt Cheah <mc...@palantir.com>.
Thanks for the responses everyone. My work partner Mingyu who is cc'ed here may be able to provide more context as to the use case of the visitor.

The ground issue I think is that while Spark is great for parallel computation, there will come a point AFTER these computations where we'd need to eventually perform some kind of visiting operation. For example, suppose the cluster performs a decently sized parallel computation, and the result is to be streamed out in-order to a listening socket? It seems like even more overhead for the RDD to need to be saved to disk first and read back out again to get this sequential behavior.

I appreciate the discussion though. Quite enlightening.

Thanks,

-Matt Cheah

From: Christopher Nguyen <ct...@adatao.com>>
Date: Tuesday, October 22, 2013 2:23 PM
To: "user@spark.incubator.apache.org<ma...@spark.incubator.apache.org>" <us...@spark.incubator.apache.org>>, Andrew Winings <mc...@palantir.com>>
Cc: Mingyu Kim <mk...@palantir.com>>
Subject: Re: Visitor function to RDD elements

For better precision,

s/Or to be able to handle very large data sets ("big memory")/Or to be able to hold very large data sets in one place ("big memory")/g

--
Christopher T. Nguyen
Co-founder & CEO, Adatao<https://urldefense.proofpoint.com/v1/url?u=http://adatao.com&k=fDZpZZQMmYwf27OU23GmAQ%3D%3D%0A&r=gxvgJndY02bAG2cHbPl1cUTcd%2FLzFGz7wtfiAfRKPpk%3D%0A&m=wN%2Fx%2FhPP%2BKO%2FchVEKgSYK9Qscw6MPdvECQix79iTADk%3D%0A&s=5ccc6cda99f52249627b7e5ca0394b74029b623dfffc1a826c513cd3e2cb2913>
linkedin.com/in/ctnguyen<https://urldefense.proofpoint.com/v1/url?u=http://linkedin.com/in/ctnguyen&k=fDZpZZQMmYwf27OU23GmAQ%3D%3D%0A&r=gxvgJndY02bAG2cHbPl1cUTcd%2FLzFGz7wtfiAfRKPpk%3D%0A&m=wN%2Fx%2FhPP%2BKO%2FchVEKgSYK9Qscw6MPdvECQix79iTADk%3D%0A&s=3c7ae0c0e983c6b2399863b70b0c594a7511d88eb8bec927c18e74bc81f670fc>



On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen <ct...@adatao.com>> wrote:
Matt, it would be useful to back up one level to your problem statement. If it is strictly restricted as described, then you have a sequential problem that's not parallelizable. What is the primary design goal here? To complete the operation in the shortest time possible ("big compute")? Or to be able to handle very large data sets ("big memory")?  Or to ensure that the operation completes in a fault-tolerant manner ("reliability")?

There are two paths from here:

  1.  Finding parallelizable opportunities: there may be ways to squint at the problem in just the right way that provides a way to parallelize it:
     *   Maybe you can come up with some algebra or approximations that allows for associativity, so that different partitions of the data can be operated on in parallel.
     *   Perhaps the data is a time series where weekly or monthly chunks can be summarized in parallel and the sequential logic can be brought up several hierarchical levels.
     *   Perhaps the statefulness of the visitor has a finite memory of past visits that you can take advantage of.
  2.  Finding alternatives: it's important to realize that Spark's strength is in "big compute" and not in "big memory". It's only 1 of the 13 dwarfs of parallel computing patterns, the map-reduce, shared-nothing model (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo"). It's a very successful model, but one that sometimes requires a refactoring of the algorithm/data to make it applicable. So if #1 above isn't at all possible, you might look into a "big memory" approach, such as Tachyon, or memcached, or even just reading a big file sequentially and applying your visitor to each data row, depending critically on what bottleneck you are engineering against.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao<https://urldefense.proofpoint.com/v1/url?u=http://adatao.com&k=fDZpZZQMmYwf27OU23GmAQ%3D%3D%0A&r=gxvgJndY02bAG2cHbPl1cUTcd%2FLzFGz7wtfiAfRKPpk%3D%0A&m=wN%2Fx%2FhPP%2BKO%2FchVEKgSYK9Qscw6MPdvECQix79iTADk%3D%0A&s=5ccc6cda99f52249627b7e5ca0394b74029b623dfffc1a826c513cd3e2cb2913>
linkedin.com/in/ctnguyen<https://urldefense.proofpoint.com/v1/url?u=http://linkedin.com/in/ctnguyen&k=fDZpZZQMmYwf27OU23GmAQ%3D%3D%0A&r=gxvgJndY02bAG2cHbPl1cUTcd%2FLzFGz7wtfiAfRKPpk%3D%0A&m=wN%2Fx%2FhPP%2BKO%2FchVEKgSYK9Qscw6MPdvECQix79iTADk%3D%0A&s=3c7ae0c0e983c6b2399863b70b0c594a7511d88eb8bec927c18e74bc81f670fc>



On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com>> wrote:
Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" each item in the RDD in order, say with a visitor object that invokes visit(item) to modify that visitor's internal state. The visiting is not commutative (e.g. Visiting item A then B makes a different internal state from visiting item B then item A). Items in the RDD also are not necessarily distinct.

I've looked into accumulators which don't work because they require the operation to be commutative. Collect() will not work because the RDD is too large; in general, bringing the whole RDD into one partition won't work since the RDD is too large.

Is it possible to iterate over the items in an RDD in order without bringing the entire dataset into a single JVM at a time, and/or obtain chunks of the RDD in order on the driver? We've tried using the internal iterator() method. In some cases, we get a stack trace (running locally with 3 threads). I've included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more




Re: Visitor function to RDD elements

Posted by Christopher Nguyen <ct...@adatao.com>.
For better precision,

s/Or to be able to handle very large data sets ("big memory")/Or to be able
to hold very large data sets in one place ("big memory")/g

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen <ct...@adatao.com> wrote:

> Matt, it would be useful to back up one level to your problem statement.
> If it is strictly restricted as described, then you have a sequential
> problem that's not parallelizable. What is the primary design goal here? To
> complete the operation in the shortest time possible ("big compute")? Or to
> be able to handle very large data sets ("big memory")?  Or to ensure that
> the operation completes in a fault-tolerant manner ("reliability")?
>
> There are two paths from here:
>
>    1. Finding parallelizable opportunities: there may be ways to squint
>    at the problem in just the right way that provides a way to parallelize it:
>       - Maybe you can come up with some algebra or approximations that
>       allows for associativity, so that different partitions of the data can be
>       operated on in parallel.
>       - Perhaps the data is a time series where weekly or monthly chunks
>       can be summarized in parallel and the sequential logic can be brought up
>       several hierarchical levels.
>       - Perhaps the statefulness of the visitor has a finite memory of
>       past visits that you can take advantage of.
>       2. Finding alternatives: it's important to realize that Spark's
>    strength is in "big compute" and not in "big memory". It's only 1 of the 13
>    dwarfs of parallel computing patterns, the map-reduce, shared-nothing model
>    (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo").
>    It's a very successful model, but one that sometimes requires a refactoring
>    of the algorithm/data to make it applicable. So if #1 above isn't at all
>    possible, you might look into a "big memory" approach, such as Tachyon, or
>    memcached, or even just reading a big file sequentially and applying your
>    visitor to each data row, depending critically on what bottleneck you are
>    engineering against.
>
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at
>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at
>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at
>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at
>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at
>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [10000] milliseconds
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>

Re: Visitor function to RDD elements

Posted by Christopher Nguyen <ct...@adatao.com>.
Matt, it would be useful to back up one level to your problem statement. If
it is strictly restricted as described, then you have a sequential problem
that's not parallelizable. What is the primary design goal here? To
complete the operation in the shortest time possible ("big compute")? Or to
be able to handle very large data sets ("big memory")?  Or to ensure that
the operation completes in a fault-tolerant manner ("reliability")?

There are two paths from here:

   1. Finding parallelizable opportunities: there may be ways to squint at
   the problem in just the right way that provides a way to parallelize it:
      - Maybe you can come up with some algebra or approximations that
      allows for associativity, so that different partitions of the data can be
      operated on in parallel.
      - Perhaps the data is a time series where weekly or monthly chunks
      can be summarized in parallel and the sequential logic can be brought up
      several hierarchical levels.
      - Perhaps the statefulness of the visitor has a finite memory of past
      visits that you can take advantage of.
      2. Finding alternatives: it's important to realize that Spark's
   strength is in "big compute" and not in "big memory". It's only 1 of the 13
   dwarfs of parallel computing patterns, the map-reduce, shared-nothing model
   (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo").
   It's a very successful model, but one that sometimes requires a refactoring
   of the algorithm/data to make it applicable. So if #1 above isn't at all
   possible, you might look into a "big memory" approach, such as Tachyon, or
   memcached, or even just reading a big file sequentially and applying your
   visitor to each data row, depending critically on what bottleneck you are
   engineering against.


--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mc...@palantir.com> wrote:

>  Hi everyone,
>
>  I have a driver holding a reference to an RDD. The driver would like to
> "visit" each item in the RDD in order, say with a visitor object that
> invokes visit(item) to modify that visitor's internal state. The visiting
> is not commutative (e.g. Visiting item A then B makes a different internal
> state from visiting item B then item A). Items in the RDD also are not
> necessarily distinct.
>
>  I've looked into accumulators which don't work because they require the
> operation to be commutative. Collect() will not work because the RDD is too
> large; in general, bringing the whole RDD into one partition won't work
> since the RDD is too large.
>
>  Is it possible to iterate over the items in an RDD in order without
> bringing the entire dataset into a single JVM at a time, and/or obtain
> chunks of the RDD in order on the driver? We've tried using the internal
> iterator() method. In some cases, we get a stack trace (running locally
> with 3 threads). I've included the stack trace below.
>
>  Thanks,
>
>  -Matt Cheah
>
>  org.apache.spark.SparkException: Error communicating with
> MapOutputTracker
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
> at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
> at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at
> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
> at
> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
> at
> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at
> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10000] milliseconds
> at
> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
> at
> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
> ... 46 more
>
>