You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Lev Tsentsiper <le...@numerify.com> on 2016/11/05 00:15:48 UTC

NoSuchElementException

My code throws an exception when I am trying to create new DataSet from within SteamWriter sink

Simplified version of the code

  val df = sparkSession.readStream
    .format("json")
    .option("nullValue", " ")
    .option("headerFlag", "true")
    .option("spark.sql.shuffle.partitions", 1)
    .option("mode", "FAILFAST")
    .schema(tableSchema)
    .load(s"s3n://....")
df.writeStream
    //TODO Switch to S3 location
    //.option("checkpointLocation", s"$input/$tenant/checkpoints/")
    .option("checkpointLocation", "/tmp/checkpoins/test1")
    .foreach(new ForwachWriter() {
               ....
     override def close() = {
        val sparkSession = SparkSession.builder()
          .config(new SparkConf()
            .setAppName("zzz").set("spark.app.id", ""xxx)
            .set("spark.master", "local[1]")
          ).getOrCreate()

            val data = sparkSession.createDataset(rowList).
            .createOrReplaceTempView(tempTableName)
             val sql =   sparkSession.sql("....")
            sql.repartition(1).foreachPartition(iter=> {})
     }

});

This code throws an exception

java.util.NoSuchElementException: key not found: 202
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
        at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
        at org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
        at org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
        at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
        at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
        at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:309)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:347)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
        at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:113)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:100)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:36)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:88)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2347)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2344)
        at com.numerify.platform.pipeline.TableWriter$$anonfun$close$5.apply(TableWriter.scala:109)



This code works when run locally, but fails in cluster deployment.
Can anyone suggest better way to handle creation and processing of DataSet within ForeachWriter?

Thank you


Re: NoSuchElementException

Posted by Michael Armbrust <mi...@databricks.com>.
What are you trying to do?  It looks like you are mixing multiple
SparkContexts together.

On Fri, Nov 4, 2016 at 5:15 PM, Lev Tsentsiper <le...@numerify.com>
wrote:

> My code throws an exception when I am trying to create new DataSet from
> within SteamWriter sink
>
> Simplified version of the code
>
>   val df = sparkSession.readStream
>     .format("json")
>     .option("nullValue", " ")
>     .option("headerFlag", "true")
>     .option("spark.sql.shuffle.partitions", 1)
>     .option("mode", "FAILFAST")
>     .schema(tableSchema)
>     .load(s"s3n://....")
> df.writeStream
>     //TODO Switch to S3 location
>     //.option("checkpointLocation", s"$input/$tenant/checkpoints/")
>     .option("checkpointLocation", "/tmp/checkpoins/test1")
>     .foreach(new ForwachWriter() {
>                ....
>      override def close() = {
>         val sparkSession = SparkSession.builder()
>           .config(new SparkConf()
>             .setAppName("zzz").set("spark.app.id", ""xxx)
>             .set("spark.master", "local[1]")
>           ).getOrCreate()
>
>             val data = sparkSession.createDataset(rowList).
>             .createOrReplaceTempView(tempTableName)
>              val sql =   sparkSession.sql("....")
>             sql.repartition(1).foreachPartition(iter=> {})
>      }
>
> });
>
> This code throws an exception
>
> java.util.NoSuchElementException: key not found: 202
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:59)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>         at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
>
>         at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
>
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$
> readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>
>         at org.apache.spark.broadcast.TorrentBroadcast._value$
> lzycompute(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> prepareBroadcast(BroadcastHashJoinExec.scala:101)
>         at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> codegenOuter(BroadcastHashJoinExec.scala:242)
>         at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doConsume(BroadcastHashJoinExec.scala:83)
>         at org.apache.spark.sql.execution.CodegenSupport$class.consume(
> WholeStageCodegenExec.scala:153)
>         at org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
>
>         at org.apache.spark.sql.execution.RowDataSourceScanExec.
> doProduce(ExistingRDD.scala:217)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
>
>         at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doProduce(BroadcastHashJoinExec.scala:77)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
>         at org.apache.spark.sql.execution.ProjectExec.doProduce(
> basicPhysicalOperators.scala:40)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>         at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
>         at org.apache.spark.sql.execution.ProjectExec.produce(
> basicPhysicalOperators.scala:30)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(
> WholeStageCodegenExec.scala:309)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(
> WholeStageCodegenExec.scala:347)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.exchange.ShuffleExchange.
> prepareShuffleDependency(ShuffleExchange.scala:86)
>         at org.apache.spark.sql.execution.exchange.
> ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
>         at org.apache.spark.sql.execution.exchange.
> ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
>         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>
>         at org.apache.spark.sql.execution.exchange.
> ShuffleExchange.doExecute(ShuffleExchange.scala:113)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(
> WholeStageCodegenExec.scala:233)
>         at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:113)
>
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(
> WholeStageCodegenExec.scala:361)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.joins.
> SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:100)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(
> WholeStageCodegenExec.scala:233)
>         at org.apache.spark.sql.execution.ProjectExec.inputRDDs(
> basicPhysicalOperators.scala:36)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(
> WholeStageCodegenExec.scala:361)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:88)
>
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>         at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>
>         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
>
>         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
>
>         at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2347)
>
>         at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2344)
>         at com.numerify.platform.pipeline.TableWriter$$anonfun$
> close$5.apply(TableWriter.scala:109)
>
>
>
> This code works when run locally, but fails in cluster deployment.
> Can anyone suggest better way to handle creation and processing of DataSet
> within ForeachWriter?
>
> Thank you
>
>