Posted to user@spark.apache.org by Shengshan Zhang <zs...@gmail.com> on 2017/10/17 11:00:15 UTC

java.io.NotSerializableException about SparkStreaming

Hello guys!
java.io.NotSerializableException troubles me a lot when I process data with Spark.
```
    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
    hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com <http://hadoop-zk0.s.qima-inc.com/>,hadoop-zk1.s.qima-inc.com <http://hadoop-zk1.s.qima-inc.com/>,hadoop-zk2.s.qima-inc.com <http://hadoop-zk2.s.qima-inc.com/>")
    val newAPIJobConfiguration = Job.getInstance(hbase_conf);
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
    newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
    // TODO: this code is not very elegant; consider refactoring this OUTPUT part
    mydata.foreachRDD( rdd => {
      val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
      .map(_.get).map(Scan.convertForHbase _ )
      json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
    })
```

However it fails because of java.io.NotSerializableException, and the error info follows:
17/10/16 18:56:50 ERROR Utils: Exception encountered
        java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)


So I changed my code as follows:
```
object mytest_config{
    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
    hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
    val newAPIJobConfiguration = Job.getInstance(hbase_conf);
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
    newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
}

mydata.foreachRDD( rdd => {
    val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
      .map(_.get).map(Scan.convertForHbase _ )
    json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
})
```
And this works!
Does anybody have any idea why this works, and what is the officially recommended way?




Re: java.io.NotSerializableException about SparkStreaming

Posted by 高佳翔 <ga...@gmail.com>.
Hi Shengshan,

In the first code, ‘newAPIJobConfiguration’ is shared across all RDDs, so it
has to be serializable.

In the second code, each RDD creates a new ‘mytest_config’ object with its own
‘newAPIJobConfiguration’ instead of sharing the same object, so it can be
non-serializable.
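
A pattern that is often used to avoid the problem entirely is to build the Job
inside the foreachRDD block, so the non-serializable
org.apache.hadoop.mapreduce.Job is never captured when the DStream graph is
serialized. A rough, untested sketch, reusing the same imports, helpers and
settings as your second snippet:

```
mydata.foreachRDD { rdd =>
  // Build the Job fresh for each batch, on the driver, inside the block,
  // so the non-serializable Job is never a captured field of the closure.
  val hbase_conf = HBaseConfiguration.create()
  hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
  hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
  val job = Job.getInstance(hbase_conf)
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
  job.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

  val json_rdd = rdd.map(Json.parse _ )
    .map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _ )

  // Only the Hadoop Configuration is handed to Spark here; Spark takes care
  // of shipping it to the executors.
  json_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
```

Creating the Job once per batch on the driver is cheap, and nothing
non-serializable ends up referenced by the closure.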

If it’s possible, maybe you can try to save the result of
mydata.foreachRDD(…) instead of each rdd, like:

val result = mydata.foreachRDD( rdd => {
      val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan]).map(Scan.transformScanRestult _).filter(_.nonEmpty)
      .map(_.get).map(Scan.convertForHbase _ )
    })

result.write.save(...)


-- 
Gao JiaXiang
Data Analyst, GCBI <http://www.gcbi.com.cn>