Posted to user@spark.apache.org by Pooja Jain <po...@gmail.com> on 2015/07/01 06:50:46 UTC

Issue with parquet write after join (Spark 1.4.0)

Hi,

We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via
spark-submit. We are facing a Parquet write issue after doing dataframe joins.

We have a full data set and an incremental data set. We are reading them
as dataframes, joining them, and then writing the result to HDFS
in Parquet format. We are getting a timeout error on the last partition.

But if we do a count on the joined data it works, which gives us
confidence that the join is happening properly. It is only the write to
HDFS that times out.

Code flow:

// join two data frames - dfBase and dfIncr on primaryKey
val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) ===
dfIncr(primaryKey), "outer")

// applying a reduce function on each row.
val mergedDF = joinedDF.map(x =>
  reduceFunc(x)
)

//converting back to dataframe
val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

//writing to parquet file
newdf.write.parquet(hdfsfilepath)

Getting the following exception:

15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845)
15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s).
15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false)
15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26.
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated
15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
	at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
	at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
	... 8 more
Caused by: java.io.IOException: Failed to connect to slave2/...:54845
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	... 3 more
Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	... 1 more

Re: Issue with parquet write after join (Spark 1.4.0)

Posted by Michael Armbrust <mi...@databricks.com>.
I would still look at your executor logs.  A count() is rewritten by the
optimizer to be much more efficient because you don't actually need any of
the columns.  Also, writing parquet allocates quite a few large buffers.
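
For what it's worth, a rough sketch of both points (the values and flags below
are only illustrative, not taken from your job):

// count() is planned as an aggregate over no columns, so the optimizer can
// prune everything; the write has to materialize every column of the join.
joinedDF.explain()                    // plan for the full outer join
joinedDF.groupBy().count().explain()  // roughly the plan count() runs

// The Parquet writer buffers the current row group of each open file in
// memory; a smaller block size lowers the per-task footprint (at some read cost).
Spark.getSqlContext().sparkContext
  .hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

// Or give executors more headroom at submit time, for example:
//   spark-submit --executor-memory 6g \
//     --conf spark.yarn.executor.memoryOverhead=1024 ...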

On Wed, Jul 1, 2015 at 5:42 AM, Pooja Jain <po...@gmail.com> wrote:

> Join is happening successfully as I am able to do count() after the join.
>
> Error is coming only while trying to write in parquet format on hdfs.
>
> Thanks,
> Pooja.
>
> On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> It says:
>>
>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>
>> Could you look in the executor logs (stderr on slave2) and see what made
>> it shut down? Since you are doing a join there's a high possibility of OOM
>> etc.
>>
>>
>> Thanks
>> Best Regards

Re: Issue with parquet write after join (Spark 1.4.0)

Posted by Raghavendra Pandey <ra...@gmail.com>.
By any chance, are you using a time field in your df? Time fields are known
to be notorious in RDD conversion.
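
For example, if dfSchema declares a TimestampType column, the Rows produced by
the map step have to carry java.sql.Timestamp values. A hypothetical sketch
(the column positions and the epoch-millis source are made up):

import java.sql.Timestamp
import org.apache.spark.sql.Row

// Hypothetical merge function: whatever it returns must line up with dfSchema,
// including the timestamp type; a mismatch tends to surface only when the rows
// are actually serialized, e.g. during the Parquet write.
def mergeRow(r: Row): Row = {
  val ts = new Timestamp(r.getLong(2))   // epoch millis -> java.sql.Timestamp
  Row(r.getString(0), r.getInt(1), ts)
}
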
On Jul 1, 2015 6:13 PM, "Pooja Jain" <po...@gmail.com> wrote:

> Join is happening successfully as I am able to do count() after the join.
>
> Error is coming only while trying to write in parquet format on hdfs.
>
> Thanks,
> Pooja.
>
> On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> It says:
>>
>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>
>> Could you look in the executor logs (stderr on slave2) and see what made
>> it shut down? Since you are doing a join there's a high possibility of OOM
>> etc.
>>
>>
>> Thanks
>> Best Regards

Re: Issue with parquet write after join (Spark 1.4.0)

Posted by Pooja Jain <po...@gmail.com>.
The join is happening successfully, as I am able to do count() after the join.

The error comes only while trying to write in Parquet format on HDFS.

Thanks,
Pooja.

On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> It says:
>
> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>
> Could you look in the executor logs (stderr on slave2) and see what made
> it shut down? Since you are doing a join there's a high possibility of OOM
> etc.
>
>
> Thanks
> Best Regards

Re: Issue with parquet write after join (Spark 1.4.0)

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
It says:

Caused by: java.net.ConnectException: Connection refused: slave2/...:54845

Could you look in the executor logs (stderr on slave2) and see what made it
shut down? Since you are doing a join, there's a high possibility of an OOM etc.
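
If YARN log aggregation is on, you can also pull them after the fact with
"yarn logs -applicationId <your application id>". And if it does turn out to
be memory pressure while building the hash table for the outer join, one knob
to try is more (hence smaller) shuffle partitions; the value below is only an
example:

// The scheduler log shows 200 shuffle partitions, which is the default for
// spark.sql.shuffle.partitions; raising it shrinks the build side each task
// has to hold for the hash outer join.
Spark.getSqlContext().setConf("spark.sql.shuffle.partitions", "800")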


Thanks
Best Regards

On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain <po...@gmail.com> wrote:

> Hi,
>
> We are using Spark 1.4.0 on hadoop using yarn-cluster mode via
> spark-submit. We are facing parquet write issue after doing dataframe joins
>
> We have a full data set and then an incremental data. We are reading them
> as dataframes, joining them, and then writing the data to the hdfs system
> in parquet format. We are getting the timeout error on the last partition.
>
> But if we do a count on the joined data it is working - which gives us the
> confidence that join is happening properly. Only in case of writing to the
> hdfs it is timing out.