Posted to user@spark.apache.org by Everett Anderson <ev...@nuna.com.INVALID> on 2017/03/16 21:55:03 UTC

Spark 2.0.2 Dataset union() slowness vs RDD union?

Hi,

We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of tables
together and save as Parquet to S3, but it seems to take a long time. We're
using the S3A FileSystem implementation under the covers, too, if that
helps.

Watching the Spark UI, the executors all eventually stop (we're using
dynamic allocation) but under the SQL tab we can see a "save at
NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is still
running of course, but it may take tens of minutes to finish. It makes me
wonder if our data is all being collected through the driver.

If we instead convert the Datasets to RDDs and call SparkContext.union() it
works quickly.

Anyone know if this is a known issue?

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Closing the loop on this --

It appears we were just hitting an unrelated S3A/S3 problem: most likely the
temporary directory that the S3A Hadoop file system implementation uses to
buffer data during upload was either full or had the wrong permissions.
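
For anyone who finds this thread with the same symptom, here is a minimal
sketch of a check one could run on each node before a big write. It is plain
Python and not part of Spark or Hadoop; check_buffer_dir and the 1 GiB
threshold are made-up names and values for illustration. I believe the
directory in question is the one set by fs.s3a.buffer.dir (falling back to a
path under hadoop.tmp.dir), but please verify that against your Hadoop
version.

```python
import os
import shutil
import tempfile


def check_buffer_dir(path, min_free_bytes=1 << 30):
    """Return a list of problems that would stop `path` being used for buffering.

    An empty list means the directory looks usable. The function name and the
    1 GiB default threshold are illustrative assumptions, not Hadoop settings.
    """
    if not os.path.isdir(path):
        return ["%s does not exist or is not a directory" % path]

    problems = []

    # Permission check: actually try to create a file, since that is what
    # buffering an upload to local disk needs to do.
    try:
        with tempfile.NamedTemporaryFile(dir=path):
            pass
    except OSError as exc:
        problems.append("cannot create files in %s: %s" % (path, exc))

    # Space check: compare free bytes on the containing volume to a threshold.
    free = shutil.disk_usage(path).free
    if free < min_free_bytes:
        problems.append("only %d bytes free under %s" % (free, path))

    return problems
```

Calling check_buffer_dir("/some/dir") returns an empty list when the directory
exists, is writable, and has at least the requested free space; otherwise each
string in the list names one thing to fix.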





Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Hi!

On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz <br...@gmail.com> wrote:

> Hi Everett,
>
> IIRC we added unionAll in Spark 2.0 which is the same implementation as
> RDD union. The union in DataFrames with Spark 2.0 does deduplication, and
> that's why you should be seeing the slowdown.
>

I thought it was the other way -- unionAll was deprecated in 2.0 and union
now does not de-dupe --

"Deprecated. use union(). Since 2.0.0.
Returns a new Dataset containing union of rows in this Dataset and another
Dataset. This is equivalent to UNION ALL in SQL."

from

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset)
and
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset)
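
To make the distinction concrete, here is a small sketch of the two SQL
semantics, with plain Python lists standing in for Datasets. It does not call
Spark, and union_all / union_distinct are hypothetical helper names. Per the
docs quoted above, Dataset.union() in 2.0 corresponds to the first one, so it
should not need the extra work that de-duplication implies.

```python
def union_all(left, right):
    """SQL UNION ALL: concatenate rows and keep duplicates."""
    return list(left) + list(right)


def union_distinct(left, right):
    """SQL UNION: concatenate rows, then keep only the first copy of each row."""
    seen = set()
    result = []
    for row in union_all(left, right):
        if row not in seen:
            seen.add(row)
            result.append(row)
    return result


a = [("x", 1), ("y", 2)]
b = [("y", 2), ("z", 3)]

print(union_all(a, b))       # 4 rows; ("y", 2) appears twice
print(union_distinct(a, b))  # 3 rows; the duplicate is dropped
```

As far as I know, the de-duplicating form in Spark itself would be union()
followed by distinct(), which is a separate, explicit step.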










Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

Posted by Burak Yavuz <br...@gmail.com>.
Hi Everett,

IIRC we added unionAll in Spark 2.0 which is the same implementation as RDD
union. The union in DataFrames with Spark 2.0 does deduplication, and
that's why you should be seeing the slowdown.

Best,
Burak


Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Looks like the Dataset version of union may also fail with the following on
larger data sets, which again seems like it might be drawing everything
into the driver for some reason --

17/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
5760, ip-10-8-52-198.us-west-2.compute.internal):
java.lang.IllegalArgumentException: bound must be positive
    at java.util.Random.nextInt(Random.java:388)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetFileFormat.scala:562)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
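
One possible reading of the top frames, offered as a guess rather than a
confirmed diagnosis: the allocator picks a random starting index among the
local directories it considers usable, and if none qualify, the bound passed
to Random.nextInt is 0, which is rejected. A toy Python model of that
behaviour follows. The function names are made up and the real
LocalDirAllocator logic is more involved, so treat this only as an
illustration of how an empty candidate list could produce this message.

```python
import os
import random
import tempfile


def next_int(bound):
    """Mimic java.util.Random.nextInt(bound), which rejects bound <= 0."""
    if bound <= 0:
        raise ValueError("bound must be positive")
    return random.randrange(bound)


def pick_local_dir(configured_dirs):
    """Pick a random directory from those that exist and are writable."""
    usable = [d for d in configured_dirs
              if os.path.isdir(d) and os.access(d, os.W_OK)]
    # With no usable directory, len(usable) is 0 and next_int raises.
    return usable[next_int(len(usable))]


good = tempfile.mkdtemp()
bad = os.path.join(good, "does-not-exist")

# One usable candidate out of two: the usable one is always chosen.
print(pick_local_dir([good, bad]) == good)

# No usable candidates: the random pick fails before any file is written.
try:
    pick_local_dir([bad])
except ValueError as exc:
    print("failed:", exc)
```

If that reading is right, the failure happens on the executor while it opens
the output file, which would point at local disk configuration on the workers
rather than at data being pulled into the driver.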
