You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@kylin.apache.org by ShaoFeng Shi <sh...@apache.org> on 2018/09/10 10:26:28 UTC

Re: Spark cubing on EMR

Hi Sonny,

Here are my 2 cents: use HDFS as the working directory (for intermediate
files) and S3 for HBase, with another script to backup HDFS files to S3 in
an async way.

Hortonworks has the similar recommendation, see "Committing Output to
Amazon S3" in https://hortonworks.github.io/hdp-aws/s3-spark/

Sonny Heer <so...@gmail.com> 于2018年8月30日周四 下午8:29写道:

> Great thanks ShaoFeng!
>
> other spark issues.  all properties are pointing to S3 for storage and
> dynamic allocation set to true with autoscaling m/r cluster.  the spark job
> shows lots of renames:
>
> 2018-08-30 08:03:03,260 INFO  [Scheduler 1762472915 Job
> b9b45888-7eca-49a8-8677-b241d0529b79-721] spark.SparkExecutable:38 :
> 18/08/30 08:03:03 INFO s3n2.S3NativeFileSystem2: rename
> s3://hivestagehbase/kylin/kylin_metadata/kylin-b9b45888-7eca-49a8-8677-b241d0529b79/cube_sparktest/cuboid/level_1_cuboid/_temporary/0/task_20180830073925_0008_r_000369/part-r-00369
> s3://hivestagehbase/kylin/kylin_metadata/kylin-b9b45888-7eca-49a8-8677-b241d0529b79/cube_sparktest/cuboid/level_1_cuboid/part-r-00369
>
> Eventually I manually killed the job.  Can anyone share their experiences
> with cube building on S3/EMR?
>
> any ideas?
>
>
>
> On Wed, Aug 29, 2018 at 11:11 PM ShaoFeng Shi <sh...@apache.org>
> wrote:
>
>> Hi Sonny,
>>
>> Thanks for the information.
>>
>> I checked this EMR document; The "maximizeResourceAllocation" is an EMR
>> cluster property on provisioning, not a Spark configuration. If true, EMR
>> generates an optimized spark-defaults configuration (like memory, core,
>> instance, etc) based on your cluster size. This provides a very good
>> out-of-box user experience. But you can copy the generated spark-defaults
>> to Kylin, they will be recognized as well.
>>
>> (EMR 5.16 uses Spark 2.3, while Kylin's is 2.1, so to use EMR's Kylin
>> needs to upgrade Spark first. This work has not been kicked off.)
>>
>> For the Spark cubing optimization, I uploaded the slide we talked in
>> Kylin Meetup @Shanghai, hope it is helpful to you:
>> https://www.slideshare.net/ShiShaoFeng1/spark-tunning-in-apache-kylin
>>
>> 2018-08-30 13:39 GMT+08:00 Sonny Heer <so...@gmail.com>:
>>
>>> ShaoFeng,  yeah manually copy of jars does work in latest version of EMR
>>> (5.16.0).  Thanks.
>>>
>>> It would be nice to utilize the emr version of spark therefore can make
>>> use of emr specific properties like maximizeResourceAllocation
>>>
>>> see
>>>
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html
>>>
>>>
>>> Thanks
>>>
>>> On Tue, Aug 28, 2018 at 8:17 PM Sonny Heer <so...@gmail.com> wrote:
>>>
>>>> yeah seems that way.  I did copy over the spark-defaults.conf from EMR
>>>> to KYLIN_HOME/spark/conf
>>>>
>>>> e.g.
>>>>
>>>> spark.driver.extraClassPath
>>>> :/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar
>>>>
>>>> spark.driver.extraLibraryPath
>>>> /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
>>>>
>>>> spark.executor.extraClassPath
>>>> :/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar
>>>>
>>>>
>>>>
>>>> This didn't work.  But will try manual moving jars....
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 28, 2018 at 5:29 PM ShaoFeng Shi <sh...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Sonny,
>>>>>
>>>>> Kylin is compiled with Spark 2.1, so if EMR's is not this version, it
>>>>> couldn't be used by Kylin.
>>>>>
>>>>> We suggest you to use Kylin's Spark and copy EMR specific
>>>>> implementation jars to it:
>>>>>
>>>>> cp /usr/lib/hadoop-lzo/lib/*.jar /usr/local/kylin/spark/jars/
>>>>> cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-*.jar
>>>>> /usr/local/kylin/spark/jars/
>>>>> cp /usr/lib/hadoop/hadoop-common*-amzn-*.jar
>>>>> /usr/local/kylin/spark/jars/
>>>>>
>>>>> This is how we did on EMR before, just be a sample for your reference.
>>>>> As EMR version keeps changing, there might be other cases.
>>>>>
>>>>> Please let me know if it works. I can add this piece to the
>>>>> documentation if got verified.
>>>>>
>>>>> 2018-08-29 6:04 GMT+08:00 Sonny Heer <so...@gmail.com>:
>>>>>
>>>>>> After fixing the above issue by updating spark_home to point to emr
>>>>>> spark.  i get the following.  still appears to be a spark versioning
>>>>>> issue...
>>>>>>
>>>>>>
>>>>>> Driver stacktrace:
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1753)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1741)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1740)
>>>>>> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1740)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
>>>>>> 	at scala.Option.foreach(Option.scala:257)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
>>>>>> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1974)
>>>>>> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1923)
>>>>>> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1912)
>>>>>> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
>>>>>> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
>>>>>> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
>>>>>> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>>>>>> 	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
>>>>>> 	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
>>>>>> 	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
>>>>>> 	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
>>>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>>>>>> 	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
>>>>>> 	at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:831)
>>>>>> 	at org.apache.kylin.engine.spark.SparkCubingByLayer.saveToHDFS(SparkCubingByLayer.java:277)
>>>>>> 	at org.apache.kylin.engine.spark.SparkCubingByLayer.execute(SparkCubingByLayer.java:230)
>>>>>> 	at org.apache.kylin.common.util.AbstractApplication.execute(AbstractApplication.java:37)
>>>>>> 	at org.apache.kylin.common.util.SparkEntry.main(SparkEntry.java:44)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> 	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>>>>>> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>> Caused by: java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
>>>>>> Note: To register this class use: kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
>>>>>> 	at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488)
>>>>>> 	at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622)
>>>>>> 	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 28, 2018 at 8:11 AM Sonny Heer <so...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Unable to build cube at step "#6 Step Name: Build Cube with Spark"
>>>>>>>
>>>>>>> Looks to be a classpath issue with spark not able to find some
>>>>>>> amazon emr libs.  when i look in spark defaults /etc/spark/conf i do see
>>>>>>> the classpath being set correctly.
>>>>>>>
>>>>>>> any ideas?
>>>>>>>
>>>>>>>
>>>>>>> -------------
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.RuntimeException: error execute org.apache.kylin.engine.spark.SparkCubingByLayer
>>>>>>> 	at org.apache.kylin.common.util.AbstractApplication.execute(AbstractApplication.java:42)
>>>>>>> 	at org.apache.kylin.common.util.SparkEntry.main(SparkEntry.java:44)
>>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
>>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
>>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
>>>>>>> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
>>>>>>> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>>> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>>>>> 	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>>>>>>> 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>>>>>>> 	at org.apache.kylin.common.util.HadoopUtil.deletePath(HadoopUtil.java:133)
>>>>>>> 	at org.apache.kylin.engine.spark.SparkCubingByLayer.execute(SparkCubingByLayer.java:142)
>>>>>>> 	at org.apache.kylin.common.util.AbstractApplication.execute(AbstractApplication.java:37)
>>>>>>> 	... 10 more
>>>>>>> Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>>>>> 	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>>>>>>> 	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>>>>>> 	... 19 more
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>>
>>>>> Shaofeng Shi 史少锋
>>>>>
>>>>>
>>
>>
>> --
>> Best regards,
>>
>> Shaofeng Shi 史少锋
>>
>>

-- 
Best regards,

Shaofeng Shi 史少锋