Posted to user@spark.apache.org by yangliuyu <ya...@163.com> on 2014/12/12 09:35:57 UTC
Serialization issue when using HBase with Spark
The scenario: I use an HTable instance inside Spark tasks to scan multiple row-key ranges, like this:
Option 1:
val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })
But I got this exception:
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
	...
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
The reason I am not using sc.newAPIHadoopRDD is that it only supports one scan at a time:
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
And with MultiTableInputFormat, it is not feasible for the driver to put all the row keys into the HBaseConfiguration:
Option 2:
sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
I could divide all the row-key ranges into several parts and then use Option 2, but I prefer Option 1. Is there any solution for Option 1?
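The fallback mentioned in the question (dividing the row-key ranges into several parts, then running Option 2 once per part) can be sketched in plain Scala. This is a minimal sketch under assumptions: row keys are modeled as `String`s, and `RowKeyRange` and `batchRanges` are hypothetical names; turning each batch into `Scan`s and a `newAPIHadoopRDD` call is left out.

```scala
// Hypothetical model of one scan range: [start, stop), as with Scan.setStartRow/setStopRow.
final case class RowKeyRange(start: String, stop: String)

object RangeBatching {
  /** Sorts the ranges by start key and splits them into batches of at most `maxPerBatch`,
    * so that no single job's configuration has to carry every range. */
  def batchRanges(ranges: Seq[RowKeyRange], maxPerBatch: Int): List[List[RowKeyRange]] = {
    require(maxPerBatch > 0, "maxPerBatch must be positive")
    ranges.toList.sortBy(_.start).grouped(maxPerBatch).toList
  }
}
```

Each inner list would then back one MultiTableInputFormat job, and the resulting RDDs could be combined with `union`. Sorting first keeps each batch's ranges adjacent in key order, which may help each job touch fewer regions.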
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Serialization issue when using HBase with Spark
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Can you paste the complete code? It looks like at some point you are passing a Hadoop Configuration, which is not Serializable. You can look at this thread for a similar discussion:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-td13378.html
Thanks
Best Regards
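Akhil's diagnosis can be reproduced without a cluster. Spark Java-serializes each task closure before shipping it, so any non-serializable value the closure captures fails with `NotSerializableException`. The sketch below is a plain-Scala approximation of that check (it is not Spark's `ClosureCleaner`), and `FakeConfiguration` is a hypothetical stand-in for `org.apache.hadoop.conf.Configuration`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: it does not implement Serializable.
class FakeConfiguration {
  def get(key: String): String = "value-of-" + key
}

object ClosureCheck {
  /** Approximates what Spark does before shipping a task: Java-serialize the closure. */
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  /** The configuration is created outside the closure (on the driver) and captured by it. */
  def capturingClosure(): Int => String = {
    val conf = new FakeConfiguration
    (x: Int) => conf.get("k" + x) // `conf` becomes a captured field of the closure
  }

  /** The configuration is created inside the closure (on the executor), so nothing bad is captured. */
  def selfContainedClosure(): Int => String =
    (x: Int) => new FakeConfiguration().get("k" + x)
}
```

In the spark-shell, closures are compiled inside the REPL's `$iwC` wrapper objects (visible in the trace in the question), so referencing a value from another prompt line, such as `timeStart`, may drag in that line's other values too, for example a `conf` created for `newAPIHadoopRDD`. That is one plausible explanation for the Configuration in the trace, not a confirmed diagnosis.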
Re:Re: Serialization issue when using HBase with Spark
Posted by yangliuyu <ya...@163.com>.
Thanks, all.
The working code I ended up with is below:
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object PlayRecord {
  // DAY_MILLIS and getUserRowKeyRange(id, idType, timeStart, timeStop) are defined elsewhere (not shown).
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, timeStop: Long, cacheSize: Int,
                     filterSongDays: Int, filterPlaylistDays: Int): RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Created inside the task (on the executor), so no Configuration or HTable is serialized.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        // Keep rows whose song_id is numeric OR whose module is "displayed".
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("song_id"),
          CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("module"),
          CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          // Materialize the results with toList, then release the scanner.
          val scanner = table.getScanner(scan)
          val userData = try {
            scanner.iterator().asScala.map(r => {
              val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("module")))
              val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
              module match {
                case "listen" =>
                  val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("song_id")))
                  (module, (time / DAY_MILLIS, songId))
                case "displayed" =>
                  val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                  (module, (time / DAY_MILLIS, playlistIds))
                case _ =>
                  (module, (0L, ""))
              }
            }).toList.groupBy(_._1)
          } finally {
            scanner.close()
          }
          // Song IDs from the filterSongDays most recent days that have "listen" records.
          val playRecords = userData.get("listen") match {
            case Some(records) => records.map(_._2).groupBy(_._1).toList.sortBy(-_._1)
              .take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
            case None => Set[Long]()
          }
          // Playlist IDs from the earliest filterPlaylistDays days that have "displayed" records.
          val playlistRecords = userData.get("displayed") match {
            case Some(records) => records.map(_._2).groupBy(_._1).toList.sortBy(_._1)
              .take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
            case None => Set[Long]()
          }
          val result = (id, (idType, playRecords, playlistRecords))
          // map is lazy: close the table only after the last element has been produced.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}
Following Sean Owen's post that Shixiong mentioned (http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/), I close the table when iterator.hasNext is false; otherwise the application hangs. There is also another interesting project, http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, which I will try later.
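The close-when-`hasNext`-is-false approach described above can be factored into a small iterator wrapper that also covers the empty-partition case (which the code above handles with a separate `if (iterator.nonEmpty)` branch). A minimal plain-Scala sketch; `CloseOnExhaustIterator` is a hypothetical helper, and any `java.io.Closeable` (an `HTable`, for instance) could be passed as the resource.

```scala
import java.io.Closeable

/** Wraps a lazily mapped iterator and closes `resource` as soon as the
  * underlying iterator reports that it has no more elements (hypothetical helper). */
final class CloseOnExhaustIterator[A](underlying: Iterator[A], resource: Closeable) extends Iterator[A] {
  private var closed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !closed) {
      resource.close() // runs once, even for an empty partition
      closed = true
    }
    more
  }

  override def next(): A = underlying.next()
}
```

Inside `mapPartitions` it would be used roughly as `new CloseOnExhaustIterator(iterator.map(id => ...), table)`. If a downstream operation stops consuming early (for example `take`), the resource is never closed; registering the close with Spark's `TaskContext.addTaskCompletionListener` may be a more robust alternative.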
At 2014-12-15 17:52:47, "Aniket Bhatnagar" <an...@gmail.com> wrote:
"The reason not using sc.newAPIHadoopRDD is it only support one scan each time."
I am not sure that's true. You can use multiple scans as follows:
val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)
where convertScanToString is implemented as:
/**
* Serializes an HBase scan into a string.
* @param scan Scan to serialize.
* @return Base64-encoded serialized scan.
*/
private def convertScanToString(scan: Scan) = {
val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
Thanks,
Aniket
On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <zs...@gmail.com> wrote:
Just pointing out a bug in your code: you should not use `mapPartitions` like that. For details, I recommend the "setup() and cleanup()" section of Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
Best Regards,
Shixiong Zhu
2014-12-14 16:35 GMT+08:00 Yanbo <ya...@gmail.com>:
In #1, the HTable class is not serializable.
You also need to check your self-defined function getUserActions and make sure it is a member function of a class that implements the Serializable interface.
Sent from my iPad
Re: Serialization issue when using HBase with Spark
Posted by Aniket Bhatnagar <an...@gmail.com>.
"The reason not using sc.newAPIHadoopRDD is it only support one scan each time."
I am not sure that's true. You can use multiple scans as follows:
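import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.Base64
// MultiTableInputFormat expects every Scan to carry its table name, e.g.
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("user_action"))
val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)
where convertScanToString is implemented as:
/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan): String = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}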
Thanks,
Aniket
Re: Serialization issue when using HBase with Spark
Posted by Shixiong Zhu <zs...@gmail.com>.
Just pointing out a bug in your code: you should not use `mapPartitions` like that. For details, I recommend the "setup() and cleanup()" section of Sean Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
Best Regards,
Shixiong Zhu
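The bug in Option 1 comes down to laziness: `iterator.map` does no work until Spark pulls elements, so `table.close()` runs before a single lookup, and every lookup then hits a closed table. The plain-Scala sketch below reproduces that ordering with a hypothetical `FakeTable` standing in for `HTable`. Forcing the results with `toList` before closing is shown only as the simplest correct variant; it buffers the whole partition in memory.

```scala
/** Hypothetical stand-in for HTable: lookups fail after close(). */
final class FakeTable {
  private var open = true
  def lookup(userId: String): Int = {
    if (!open) throw new IllegalStateException("table is closed")
    userId.length
  }
  def close(): Unit = open = false
}

object LazyCloseDemo {
  /** Mirrors Option 1: map is lazy, so close() runs before any lookup happens. */
  def buggy(users: Iterator[String]): Iterator[(String, Int)] = {
    val table = new FakeTable
    val result = users.map(u => (u, table.lookup(u)))
    table.close()
    result
  }

  /** Forces every lookup with toList before closing; correct, but holds the partition in memory. */
  def eager(users: Iterator[String]): Iterator[(String, Int)] = {
    val table = new FakeTable
    val result = users.map(u => (u, table.lookup(u))).toList
    table.close()
    result.iterator
  }
}
```

Calling `LazyCloseDemo.buggy(Iterator("a", "bb")).toList` throws `IllegalStateException`, while the eager version returns both pairs. Closing the table only after the last element has been produced keeps the iterator lazy without buffering.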
Re: Serialization issue when using HBase with Spark
Posted by Yanbo <ya...@gmail.com>.
In #1, the HTable class is not serializable.
You also need to check your self-defined function getUserActions and make sure it is a member function of a class that implements the Serializable interface.
Sent from my iPad
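Yanbo's second point can be seen in isolation: when a closure calls a method defined on its enclosing class, it captures that class instance, so the instance must be serializable as well. The plain-Scala sketch below compares a non-serializable owner with a `Serializable` one. All class names are hypothetical, and the serialization check only approximates what Spark does when it ships a task.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical job class that does NOT implement Serializable.
class ActionsJob {
  def getUserActions(userId: String): Int = userId.length
  // Calling an instance method makes the closure capture `this` (the ActionsJob).
  def task: String => Int = userId => getUserActions(userId)
}

// Same logic, but the enclosing class implements Serializable, as suggested above.
class SerializableActionsJob extends Serializable {
  def getUserActions(userId: String): Int = userId.length
  def task: String => Int = userId => getUserActions(userId)
}

object ShippingCheck {
  /** Rough stand-in for Spark's check: Java-serialize the closure and report whether it worked. */
  def isSerializable(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(closure)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

In Spark the first case would surface as the same `Task not serializable` error shown in the question.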