Posted to user@crunch.apache.org by Ron Hashimshony <ro...@myheritage.com> on 2017/05/26 19:27:02 UTC
AbstractMethodError when moving from MR to Spark pipeline
Hi,
I have a pipeline that runs fine with MRPipeline. When I tried to replace it
with SparkPipeline, I started getting an AbstractMethodError.
This is the code (reduced to highlight the problem):
Pipeline pipe = new SparkPipeline("local", "test");
PCollection<siteInfo> siteInfoInput = pipe.read(From.avroFile(input,
    Avros.records(siteInfo.class)));
PTable<Long, Integer> siteToPart = siteInfoInput.parallelDo(
    new MapFn<siteInfo, Pair<Long, Integer>>() {
      @Override
      public Pair<Long, Integer> map(siteInfo input) {
        return Pair.of(input.getSiteId(), input.getPartitionId());
      }
    }, Avros.tableOf(Avros.longs(), Avros.ints()));
siteToPart.write(To.textFile(output));
pipe.done();
The exception is this:
java.lang.AbstractMethodError: org.apache.crunch.impl.spark.fn.CrunchPairTuple2.call(Ljava/lang/Object;)Ljava/util/Iterator;
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I tried this with both Crunch 0.13.0 and 0.15.0 against Spark 2.1.
Thanks,
Ron.
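For context on the error itself: Spark 2.0 changed FlatMapFunction.call (and its relatives) to return an Iterator instead of an Iterable, so classes compiled against the Spark 1.x interfaces no longer override the new abstract method, and the JVM throws AbstractMethodError the first time the method is invoked. A plain-Java sketch of that interface shape, using simplified stand-in interfaces rather than Spark's real ones:

```java
import java.util.*;

// Simplified stand-ins for the two shapes of a flat-map callback; these
// mirror (but are NOT) Spark's actual FlatMapFunction before and after 2.0.
interface IterableCall<T, R> { Iterable<R> call(T t); }  // Spark 1.x shape
interface IteratorCall<T, R> { Iterator<R> call(T t); }  // Spark 2.x shape

public class SignatureDemo {
    // A class compiled against the old Iterable shape only defines
    // call(T):Iterable, so when the framework invokes call(T):Iterator the
    // JVM finds no implementation and throws AbstractMethodError at runtime:
    // the exact missing-method signature shown in the stack trace above.
    static final IteratorCall<String, String> WORDS =
        line -> Arrays.asList(line.split("\\s+")).iterator();

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        WORDS.call("map reduce spark").forEachRemaining(out::add);
        System.out.println(out);  // [map, reduce, spark]
    }
}
```

Recompiling against the Spark 2.x API (here, a Crunch build containing the CRUNCH-618 patches) makes the implementation match the abstract method again.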
Re: AbstractMethodError when moving from MR to Spark pipeline
Posted by Ron Hashimshony <ro...@myheritage.com>.
Thanks Josh... that nailed it.
Are we planning to ship 1.0.0 soon? Need any help with that?
And now that it's running: I built the same pipeline directly in Spark
(without Crunch, using RDDs) and compared the DAGs Spark generated. They
look completely different, with Crunch adding many intermediate steps at
each stage (map, mapPartitions, and mapPartitionsWithIndex, to name a few).
Can you give me some insight into how Crunch submits the jobs to Spark?
I am going to do some benchmarking, but will these extra steps add
overhead?
Thanks,
Ron.
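On the DAG question: the Crunch Spark runner lifts each per-record DoFn into a per-partition function before handing it to Spark, which is one reason each logical step surfaces as its own map/mapPartitions node. A simplified, hypothetical sketch of that lifting pattern (the names below are illustrative, not Crunch's actual classes):

```java
import java.util.*;

// Hypothetical stand-in for a Crunch MapFn (one record in, one record out);
// illustrative only, not Crunch's real API.
interface SimpleMapFn<S, T> { T map(S input); }

public class PartitionDemo {
    // The runner pattern: lift a per-record function into a per-partition
    // (Iterator -> Iterator) function. Each such lift is handed to Spark as
    // its own mapPartitions call, which is one reason a Crunch-built DAG
    // shows more intermediate steps than hand-fused RDD code.
    static <S, T> Iterator<T> mapPartition(Iterator<S> partition, SimpleMapFn<S, T> fn) {
        return new Iterator<T>() {
            public boolean hasNext() { return partition.hasNext(); }
            public T next() { return fn.map(partition.next()); }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> out =
            mapPartition(Arrays.asList(1, 2, 3).iterator(), x -> x * 10);
        List<Integer> result = new ArrayList<>();
        out.forEachRemaining(result::add);
        System.out.println(result);  // [10, 20, 30]
    }
}
```

Since these are narrow transformations, Spark should pipeline them within a single stage without shuffling, so in principle the extra DAG nodes cost per-record iterator indirection rather than additional passes over the data; benchmarking is the right way to confirm that.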
On Fri, May 26, 2017 at 10:30 PM Josh Wills <jo...@gmail.com> wrote:
> Hey Ron,
>
> If I had to guess, I'd suspect you'll need to run against a version built
> from master that has these patches included:
>
> https://issues.apache.org/jira/browse/CRUNCH-618
>
> Josh
Re: AbstractMethodError when moving from MR to Spark pipeline
Posted by Josh Wills <jo...@gmail.com>.
Hey Ron,
If I had to guess, I'd suspect you'll need to run against a version built
from master that has these patches included:
https://issues.apache.org/jira/browse/CRUNCH-618
Josh