Posted to user@crunch.apache.org by Ron Hashimshony <ro...@myheritage.com> on 2017/05/26 19:27:02 UTC

AbstractMethodError when moving from MR to Spark pipeline

Hi,
I have a pipeline that runs fine with MRPipeline. I tried to replace it
with SparkPipeline, and am getting AbstractMethodError.
This is the code (reduced to highlight the problem):

Pipeline pipe = new SparkPipeline("local", "test");

PCollection<siteInfo> siteInfoInput = pipe.read(From.avroFile(input,
Avros.records(siteInfo.class)));

PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(new
MapFn<siteInfo, Pair<Long, Integer>>() {
    @Override
    public Pair<Long, Integer> map(siteInfo input) {
        return Pair.of(input.getSiteId(), input.getPartitionId());
    }
}, Avros.tableOf(Avros.longs(), Avros.ints()));

siteToPart.write(To.textFile(output));
pipe.done();


The exception is this:

java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Tried both Crunch 0.13.0 and 0.15.0 with Spark 2.1.

Thanks,
Ron.

Re: AbstractMethodError when moving from MR to Spark pipeline

Posted by Ron Hashimshony <ro...@myheritage.com>.
Thanks, Josh... that nailed it.
Are we planning on shipping 1.0.0 soon? Need any help with that?
Now that it's running: I also built the same pipeline directly in Spark
(without Crunch, using plain RDDs) and compared the DAGs Spark generated.
They look completely different, with Crunch adding many intermediate stages
at each step (map, mapPartitions, and mapPartitionsWithIndex, to name a few).
Can you give me some insight into how Crunch submits jobs to Spark?
I am going to do some benchmarking, but will these extra stages add
overhead?
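
For reference, here is a minimal sketch of what that plain-RDD version can
look like (illustrative only, not the exact job I ran; the siteInfo Avro
specific class, the avro-mapred AvroKeyInputFormat, and the input/output
paths are assumed from the Crunch code above):

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SiteToPartRddSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        String input = args[0];   // placeholder for the Avro input path
        String output = args[1];  // placeholder for the text output path

        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local").setAppName("test"));

        // Point the Avro input format at the siteInfo specific-record schema.
        Job job = Job.getInstance();
        AvroJob.setInputKeySchema(job, siteInfo.getClassSchema());

        // Cast the class literals once so the generics line up with AvroKey<siteInfo>.
        Class<AvroKeyInputFormat<siteInfo>> fmtClass =
            (Class<AvroKeyInputFormat<siteInfo>>) (Class<?>) AvroKeyInputFormat.class;
        Class<AvroKey<siteInfo>> keyClass =
            (Class<AvroKey<siteInfo>>) (Class<?>) AvroKey.class;

        // Read (AvroKey<siteInfo>, NullWritable) pairs from the Avro files.
        JavaPairRDD<AvroKey<siteInfo>, NullWritable> records = jsc.newAPIHadoopFile(
            input, fmtClass, keyClass, NullWritable.class, job.getConfiguration());

        // Same projection as the Crunch MapFn: (siteId, partitionId).
        JavaPairRDD<Long, Integer> siteToPart = records.mapToPair(rec -> {
            siteInfo info = rec._1().datum();
            return new Tuple2<>(info.getSiteId(), info.getPartitionId());
        });

        // Roughly what To.textFile does for a PTable: tab-separated key/value text.
        siteToPart.map(t -> t._1() + "\t" + t._2()).saveAsTextFile(output);

        jsc.stop();
    }
}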
Thanks,
Ron.

On Fri, May 26, 2017 at 10:30 PM Josh Wills <jo...@gmail.com> wrote:

> Hey Ron,
>
> If I had to guess, I'd suspect you'll need to run against a version built
> from master that has these patches included:
>
> https://issues.apache.org/jira/browse/CRUNCH-618
>
> Josh
>
> On Fri, May 26, 2017 at 12:27 PM, Ron Hashimshony <
> ron.hashimshony@myheritage.com> wrote:
>
>> Hi,
>> I have a pipeline that runs fine with MRPipeline. I tried to replace it
>> with SparkPipeline, and am getting AbstractMethodError.
>> This is the code (reduced to highlight the problem):
>>
>> Pipeline pipe = new SparkPipeline("local", "test");
>>
>> PCollection<siteInfo> siteInfoInput = pipe.read(From.avroFile(input, Avros.records(siteInfo.class)));
>>
>> PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(new MapFn<siteInfo, Pair<Long, Integer>>() {
>>     @Override
>>     public Pair<Long, Integer> map(siteInfo input) {
>>         return Pair.of(input.getSiteId(), input.getPartitionId());
>>     }
>> }, Avros.tableOf(Avros.longs(), Avros.ints()));
>>
>> siteToPart.write(To.textFile(output));
>> pipe.done();
>>
>>
>> The exception is this:
>>
>> java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Tried both Crunch 0.13.0 and 0.15.0 with Spark 2.1.
>>
>> Thanks,
>> Ron.
>>
>
>

Re: AbstractMethodError when moving from MR to Spark pipeline

Posted by Josh Wills <jo...@gmail.com>.
Hey Ron,

If I had to guess, I'd suspect you'll need to run against a version built
from master that has these patches included:

https://issues.apache.org/jira/browse/CRUNCH-618
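
For context on the error itself: Spark 2.0 changed the Java API so that
FlatMapFunction and PairFlatMapFunction return an Iterator from call()
rather than an Iterable, which is why a Crunch build compiled against the
Spark 1.x interfaces throws AbstractMethodError on Spark 2.1. A minimal
sketch of the Spark 2.x shape, with made-up example types:

import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

// Hypothetical implementation just to show the Spark 2.x signature;
// under Spark 1.x, call() returned Iterable<Tuple2<K, V>> instead.
public class ExamplePairFn implements PairFlatMapFunction<String, String, Integer> {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String s) {
        return Collections.singletonList(new Tuple2<>(s, s.length())).iterator();
    }
}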

Josh

On Fri, May 26, 2017 at 12:27 PM, Ron Hashimshony <
ron.hashimshony@myheritage.com> wrote:

> Hi,
> I have a pipeline that runs fine with MRPipeline. I tried to replace it
> with SparkPipeline, and am getting AbstractMethodError.
> This is the code (reduced to highlight the problem):
>
> Pipeline pipe = new SparkPipeline("local", "test");
>
> PCollection<siteInfo> siteInfoInput = pipe.read(From.avroFile(input, Avros.records(siteInfo.class)));
>
> PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(new MapFn<siteInfo, Pair<Long, Integer>>() {
>     @Override
>     public Pair<Long, Integer> map(siteInfo input) {
>         return Pair.of(input.getSiteId(), input.getPartitionId());
>     }
> }, Avros.tableOf(Avros.longs(), Avros.ints()));
>
> siteToPart.write(To.textFile(output));
> pipe.done();
>
>
> The exception is this:
>
> java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Tried both Crunch 0.13.0 and 0.15.0 with Spark 2.1.
>
> Thanks,
> Ron.
>