Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@smartlab.ws> on 2017/08/27 09:02:07 UTC

The implementation of the RichSinkFunction is not serializable.

Hi,

I'm trying to write to HBase with writeUsingOutputFormat, using a custom HBase
output format inspired by this example
<https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
encountering the error reported in the subject line.

Now, the OutputFormat I'm using is the following:

import java.io.IOException

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path   // assuming Flink's Path for confPath
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
import org.slf4j.LoggerFactory

abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor,
                                    confPath: Path) extends OutputFormat[T] {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  // Initialized on the task side in configure()/open(), not on the client
  var conf: org.apache.hadoop.conf.Configuration = _
  var connection: Connection = _
  var table: Table = _
  var taskNumber: String = _

  @throws[IOException]
  def configure(parameters: Configuration): Unit = {
    conf = HBaseConfiguration.create()
    conf.addResource(confPath.getPath)
    connection = ConnectionFactory.createConnection(conf)
  }

  @throws[IOException]
  def close(): Unit = {
    table.close()
    connection.close()
  }

  @throws[IOException]
  def open(taskNumber: Int, numTasks: Int): Unit = {
    this.taskNumber = String.valueOf(taskNumber)

    // Create the table if it does not exist yet
    val admin = connection.getAdmin
    try {
      if (!admin.tableExists(tableDescriptor.getTableName))
        admin.createTable(tableDescriptor)
    } finally {
      admin.close()
    }

    table = connection.getTable(tableDescriptor.getTableName)
  }
}

which is extended by the format actually used, which implements the
writeRecord method:


class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
  extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)

with BatchContainer being

case class BatchContainer(batch: Iterable[(String, String, String, Int)]) extends Serializable
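
For context, the format gets wired into the job roughly like this (a sketch;
the stream variable, the descriptor and the config path are assumptions, not
the actual job code):

import org.apache.flink.streaming.api.scala._

// Sketch (assumed names): attaching the custom format to a DataStream.
// Flink checks/serializes the resulting sink function at this point, which is
// presumably where the "not serializable" error from the subject comes from.
val batches: DataStream[BatchContainer] = ??? // stream produced upstream

batches.writeUsingOutputFormat(
  new HBaseBatchFormat(tableDescriptor, new Path("/etc/hbase/conf/hbase-site.xml")))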

I'd like to ask: what exactly needs to be Serializable? As far as I can see,
conf, connection and table are not Serializable, so they are surely part of
the issue. Do the constructor parameters also matter here, especially
tableDescriptor, which is not Serializable? And should the methods
implemented from the OutputFormat interface use only Serializable variables?

Thank you for your attention,
Federico

Re: The implementation of the RichSinkFunction is not serializable.

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hello everyone,

I solved my issue by passing an Array[Byte] parameter instead of the explicit
HTableDescriptor parameter. This way I can instantiate the HTableDescriptor
inside the open method of the OutputFormat using the static method
HTableDescriptor.parseFrom. In the end, marking conf, table and connection as
transient wouldn't have made any difference.
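
For clarity, the relevant part of the reworked format looks roughly like this
(a sketch, assuming the bytes are produced on the client with
tableDescriptor.toByteArray before the format is constructed):

// Sketch: the descriptor travels as serializable bytes and is rebuilt in open().
abstract class HBaseOutputFormat[T](descriptorBytes: Array[Byte],
                                    confPath: Path) extends OutputFormat[T] {

  var connection: Connection = _
  var table: Table = _

  @throws[IOException]
  def configure(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.addResource(confPath.getPath)
    connection = ConnectionFactory.createConnection(conf)
  }

  @throws[IOException]
  def open(taskNumber: Int, numTasks: Int): Unit = {
    // Array[Byte] is serializable, so it ships with the format; the
    // non-serializable HTableDescriptor only ever exists on the task side.
    val tableDescriptor = HTableDescriptor.parseFrom(descriptorBytes)
    table = connection.getTable(tableDescriptor.getTableName)
    // (table-creation logic from the original open() is omitted here)
  }

  @throws[IOException]
  def close(): Unit = {
    table.close()
    connection.close()
  }
}

The concrete format is then constructed on the client with something like
new HBaseBatchFormat(tableDescriptor.toByteArray, confPath).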

Regards


Re: The implementation of the RichSinkFunction is not serializable.

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi,

could you elaborate, please? Marking conf, connection and table as
transient wouldn't help because of the presence of the HTableDescriptor
reference?


Re: The implementation of the RichSinkFunction is not serializable.

Posted by Jörn Franke <jo...@gmail.com>.
It looks like in your case everything would need to be serializable. An alternative would be to mark certain non-serializable things as transient, but as far as I can see this is not possible in your case.
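
For reference, the transient approach would look something like this in Scala
(a sketch only; transient fields are skipped by serialization but then have to
be rebuilt on the task side):

// Sketch of the "mark as transient" alternative: @transient fields are not
// serialized with the format and are re-created in configure()/open() on the
// task. It does not help with a non-serializable constructor parameter such
// as tableDescriptor, which the instance still holds onto.
abstract class HBaseOutputFormat[T](confPath: Path) extends OutputFormat[T] {
  @transient var conf: org.apache.hadoop.conf.Configuration = _
  @transient var connection: Connection = _
  @transient var table: Table = _
  // configure()/open() rebuild these exactly as in the original code
}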
