Posted to user@spark.apache.org by Shengshan Zhang <zs...@gmail.com> on 2017/10/17 11:00:15 UTC
java.io.NotSerializableException about SparkStreaming
Hello guys!
java.io.NotSerializableException troubles me a lot when I process data with Spark.
```
val hbase_conf = HBaseConfiguration.create()
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")

val newAPIJobConfiguration = Job.getInstance(hbase_conf)
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

// TODO: this output code is not very elegant; consider refactoring it
mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan])
    .map(Scan.transformScanRestult _).filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _)
  json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
})
```
However it fails with java.io.NotSerializableException, and the error info follows:
17/10/16 18:56:50 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
So I changed my code as follows:
```
object mytest_config {
  val hbase_conf = HBaseConfiguration.create()
  hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
  hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")

  val newAPIJobConfiguration = Job.getInstance(hbase_conf)
  newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
  newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
}

mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan])
    .map(Scan.transformScanRestult _).filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _)
  json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
})
```
And this works!
Does anybody have any idea why this works, and what is the officially recommended way?
Re: java.io.NotSerializableException about SparkStreaming
Posted by 高佳翔 <ga...@gmail.com>.
Hi Shengshan,
In the first code, ‘newAPIJobConfiguration’ is a local variable captured by
the foreachRDD closure, so Spark has to serialize it along with the closure,
and org.apache.hadoop.mapreduce.Job is not serializable.
In the second code, the closure only refers to the singleton ‘mytest_config’.
A Scala object is not serialized with the closure; it is initialized lazily
in each executor JVM, so the non-serializable Job never has to be serialized.
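The difference can be reproduced with plain java.io serialization, independent of Spark. Below is a minimal sketch (my own illustration, not code from the thread): `JobLike` is a hypothetical stand-in for the non-serializable `org.apache.hadoop.mapreduce.Job`, and `ConfigHolder` plays the role of the `mytest_config` singleton.

```java
import java.io.*;

public class ClosureDemo {
    // Stand-in for org.apache.hadoop.mapreduce.Job: has state, is NOT Serializable.
    public static class JobLike { public String table = "mytest_table"; }

    // Mirrors the 'object mytest_config' pattern: the config sits behind a
    // static reference, so a closure can read it without capturing it.
    public static class ConfigHolder { public static final JobLike job = new JobLike(); }

    // Serializes an object the way a closure serializer would.
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        JobLike local = new JobLike();

        // First version: the lambda captures the local JobLike, so serializing
        // the closure drags the non-serializable object along and fails.
        Runnable captures = (Runnable & Serializable) () -> System.out.println(local.table);
        try {
            serialize(captures);
            System.out.println("capturing closure: serialized");
        } catch (NotSerializableException e) {
            System.out.println("capturing closure: " + e);
        }

        // Second version: the lambda resolves ConfigHolder.job statically at
        // run time; nothing non-serializable is captured, so it serializes fine.
        Runnable refersToHolder = (Runnable & Serializable) () -> System.out.println(ConfigHolder.job.table);
        serialize(refersToHolder);
        System.out.println("static-reference closure: serialized");
    }
}
```

The first lambda fails exactly like the first Spark version; the second serializes because the static reference is re-resolved in whichever JVM runs the closure, which is also why each executor ends up with its own copy of the configuration.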
If it’s possible, maybe you can build the transformed stream once with
transform and then save each batch, like

val result = mydata.transform( rdd =>
  rdd.map(Json.parse _).map(_.validate[Scan]).map(Scan.transformScanRestult _)
    .filter(_.nonEmpty).map(_.get).map(Scan.convertForHbase _)
)
result.foreachRDD(rdd => rdd.saveAsNewAPIHadoopDataset(...))
On Tue, Oct 17, 2017 at 7:00 PM, Shengshan Zhang <zs...@gmail.com> wrote:
> [quoted original message snipped]
--
Gao JiaXiang
Data Analyst, GCBI <http://www.gcbi.com.cn>