Posted to user@spark.apache.org by yangliuyu <ya...@163.com> on 2014/12/12 09:35:57 UTC

Serialization issue when using HBase with Spark

The scenario is using an HTable instance to scan multiple rowkey ranges inside Spark
tasks, as shown below:
Option 1:
val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But got the exception:
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
...
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a
time:
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

And if using MultiTableInputFormat, the driver cannot put all of the rowkeys
into the HBaseConfiguration.
Option 2:
sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

It may be possible to divide all rowkey ranges into several parts and then use option 2,
but I prefer option 1. Is there any solution for option 1?





Re: Serialization issue when using HBase with Spark

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Can you paste the complete code? It looks like at some point you are
passing a Hadoop Configuration, which is not serializable. You can look at
this thread for a similar discussion:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-td13378.html
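
For illustration, here is a minimal sketch (not the code from this thread; names are made up) of how referencing a driver-side Configuration inside a closure triggers the NotSerializableException, and how building it inside mapPartitions avoids it:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

object ConfigCaptureSketch {
  def run(sc: SparkContext): Unit = {
    val driverConf = new Configuration() // created on the driver; not serializable

    // This would fail with java.io.NotSerializableException: org.apache.hadoop.conf.Configuration,
    // because the map closure captures driverConf:
    // sc.parallelize(1 to 10).map(i => driverConf.get("fs.defaultFS")).collect()

    // Creating the Configuration inside the closure keeps it out of the serialized task:
    val ok = sc.parallelize(1 to 10).mapPartitions { iter =>
      val conf = new Configuration() // built on the executor, once per partition
      iter.map(i => (i, conf.get("fs.defaultFS")))
    }
    ok.collect()
  }
}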

Thanks
Best Regards


Re:Re: Serialization issue when using HBase with Spark

Posted by yangliuyu <ya...@163.com>.
Thanks All.


Finally, the working code is below:


import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object PlayRecord {
  // getUserRowKeyRange and DAY_MILLIS are helpers/constants defined elsewhere (not shown here).
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, timeStop: Long, cacheSize: Int,
                     filterSongDays: Int, filterPlaylistDays: Int): RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Per-partition setup: the HTable and filters are created on the executor,
        // so nothing non-serializable is captured by the closure.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("song_id"), CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("module"), CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          // One scan per user id, bounded by the rowkey range for the time window.
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          val userData = table.getScanner(scan).iterator().asScala.map(r => {
            val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("module")))
            val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
            module match {
              case "listen" =>
                val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("song_id")))
                (module, (time / DAY_MILLIS, songId))
              case "displayed" =>
                val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                (module, (time / DAY_MILLIS, playlistIds))
              case _ =>
                (module, (0L, ""))
            }
          }).toList.groupBy(_._1)
          val playRecordData = userData.get("listen")
          val playRecords = if (playRecordData.nonEmpty) playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1).take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
          else Set[Long]()
          val playlistDisPlayData = userData.get("displayed")
          val playlistRecords = if (playlistDisPlayData.nonEmpty) playlistDisPlayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1).take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
          else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          // Per-partition cleanup: close the table only after the last element has been produced.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}


As Shixiong mentioned, following Sean Owen's post (http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/), I close the table when iterator.hasNext is false; otherwise the application hangs. There is also another interesting project, SparkOnHBase (http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/), which I will try later.
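
For reference, a hypothetical invocation of the function above might look like this (the input path and parameter values are made up for illustration):

val accounts: RDD[String] = sc.textFile("hdfs:///path/to/accounts")  // hypothetical list of user ids
val actions = PlayRecord.getUserActions(accounts, idType = 1,
  timeStart = 1418342400000L, timeStop = 1418428800000L,  // epoch millis for the window
  cacheSize = 100, filterSongDays = 7, filterPlaylistDays = 7)
actions.take(10).foreach(println)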







Re: Serialization issue when using HBase with Spark

Posted by Aniket Bhatnagar <an...@gmail.com>.
"The reason not using sc.newAPIHadoopRDD is it only support one scan each
time."

I am not sure is that's true. You can use multiple scans as following:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)

where convertScanToString is implemented as:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.Base64

/**
 * Serializes an HBase Scan into a Base64-encoded string so it can be stored in the Configuration.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
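
For example, here is a rough sketch of feeding several rowkey ranges to newAPIHadoopRDD through MultiTableInputFormat (the table name and ranges below are made up for illustration; convertScanToString is the helper above). Note that MultiTableInputFormat expects each scan to carry its table name as a scan attribute:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val ranges = Seq(("user1#0", "user1#9"), ("user2#0", "user2#9"))  // hypothetical rowkey ranges
val scans = ranges.map { case (start, stop) =>
  val scan = new Scan()
  scan.setStartRow(Bytes.toBytes(start))
  scan.setStopRow(Bytes.toBytes(stop))
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("actions"))
  scan
}
conf.setStrings(MultiTableInputFormat.SCANS, scans.map(convertScanToString): _*)

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])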

Thanks,
Aniket


Re: Serialization issue when using HBase with Spark

Posted by Shixiong Zhu <zs...@gmail.com>.
Just to point out a bug in your code: you should not use `mapPartitions` like
that. For details, I recommend the section "setup() and cleanup()" in Sean
Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
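
A minimal sketch of that pattern, assuming hypothetical openConnection() and process() helpers (this is the shape of the fix, not this thread's exact code):

rdd.mapPartitions { iter =>
  val conn = openConnection()        // "setup": once per partition, on the executor
  iter.map { record =>
    val out = process(conn, record)  // hypothetical per-record work
    if (!iter.hasNext) conn.close()  // "cleanup": after the last element is produced
    out
  }
}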

Best Regards,
Shixiong Zhu


Re: Serialization issue when using HBase with Spark

Posted by Yanbo <ya...@gmail.com>.
In #1, the HTable class is not serializable.
You also need to check your self-defined function getUserActions and make sure it is a member of a class that implements the Serializable interface.
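
As a rough sketch of that advice (the object and signature below are assumptions, not the original code), defining getUserActions on a top-level object avoids capturing an enclosing instance, such as the REPL wrapper, that may hold a non-serializable Configuration:

import org.apache.hadoop.hbase.client.HTable

object UserActions extends Serializable {
  // Hypothetical signature; the real return type is whatever the caller needs.
  def getUserActions(table: HTable, userId: String,
                     timeStart: Long, timeStop: Long): Seq[String] = {
    // ... scan `table` for rows of userId between timeStart and timeStop ...
    Seq.empty
  }
}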

Sent from my iPad
