Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2017/04/29 14:32:38 UTC

Periodic flush sink?

Hi,

I have a sink that writes my records into HBase.

The data stream comes from measurements on an internal testing
instance of the website.
As a consequence there are periods of really high load (someone is doing a
load test) and really low load (only a handful of people are testing
stuff).

I read the records from Kafka and I want to write the records into HBase.
Under high load it is more efficient to buffer the writes between the
client and the server, so I use a BufferedMutator, as HBase recommends.

This BufferedMutator works with a 'fixed size' buffer, and under high load
setting it to a few MiB greatly improves write performance to HBase.
Under low load, however, you have to wait until the buffer is full, and that
can take a LONG time (hours) when the load is really low.

I want to fire a periodic event into my sink to ensure I get a flush of the
buffers at least every few seconds.

Should I simply implement a standard Java TimerTask and fire that using a Timer?
Or is there a better way of doing that in Flink?


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Periodic flush sink?

Posted by Ted Yu <yu...@gmail.com>.
bq. is the mutator thread safe?

See HBASE-17361

On Wed, May 3, 2017 at 1:52 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Niels,
> With any kind of buffering you need to be careful when it comes to fault
> tolerance. In your case, you should make sure to flush the buffers when
> checkpointing, otherwise you might lose data because those elements will
> not be resent after a failure.
>
> With the periodic timer my only concern would be concurrency issues, i.e.
> is the mutator thread safe?
>
> Best,
> Aljoscha

Re: Periodic flush sink?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Niels,
With any kind of buffering you need to be careful when it comes to fault tolerance. In your case, you should make sure to flush the buffers when checkpointing, otherwise you might lose data because those elements will not be resent after a failure.

With the periodic timer my only concern would be concurrency issues, i.e. is the mutator thread safe?

Best,
Aljoscha
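
For illustration, the two concerns above (flush on checkpoint, and timer versus writer concurrency) can be sketched in plain Java. This is only a sketch under assumptions: GuardedFlusher, IoAction, write and flushNow are hypothetical names, and java.io.Flushable stands in for the BufferedMutator so the example needs no HBase or Flink classes. In a real sink, flushNow() would presumably be called from Flink's checkpoint hook (for example snapshotState of CheckpointedFunction); that wiring is assumed, not shown.

```java
import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs writes, timer flushes and checkpoint flushes under one lock. */
class GuardedFlusher implements AutoCloseable {
    /** A write that may fail with an IOException (stand-in for mutator.mutate(...)). */
    interface IoAction { void run() throws IOException; }

    private final Object lock = new Object();
    private final Flushable buffer;               // stand-in for the BufferedMutator
    private final ScheduledExecutorService scheduler;
    private volatile Exception asyncError;        // failure seen on the timer thread, if any

    GuardedFlusher(Flushable buffer, long intervalMillis) {
        this.buffer = buffer;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-flush");
            t.setDaemon(true);                    // do not keep the JVM alive
            return t;
        });
        scheduler.scheduleWithFixedDelay(
            this::flushFromTimer, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Performs a write while holding the same lock the flushes use. */
    void write(IoAction action) throws IOException {
        rethrowAsyncError();
        synchronized (lock) { action.run(); }
    }

    /** Synchronous flush; intended to be called when a checkpoint is taken. */
    void flushNow() throws IOException {
        rethrowAsyncError();
        synchronized (lock) { buffer.flush(); }
    }

    /** Timer-thread flush: remembers the failure instead of silently dropping it. */
    private void flushFromTimer() {
        try {
            synchronized (lock) { buffer.flush(); }
        } catch (IOException | RuntimeException e) {
            asyncError = e;
        }
    }

    /** Surfaces a timer-thread failure on the caller's thread. */
    private void rethrowAsyncError() throws IOException {
        Exception e = asyncError;
        if (e != null) {
            asyncError = null;
            throw new IOException("periodic flush failed", e);
        }
    }

    @Override
    public void close() throws IOException {
        scheduler.shutdown();                     // stops further periodic runs
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flushNow();                               // final flush of whatever is still buffered
    }
}
```

The single lock makes the sketch independent of whether the underlying buffer is thread safe, at the cost of blocking writes while a flush is in progress.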


Re: Periodic flush sink?

Posted by Kamil Dziublinski <ka...@gmail.com>.
Hi Niels,

This sounds to me like a great use case for window functions. You
could partition your data (use keyBy) based on website and then hold your
window for a certain amount of time. After that you could hand your sink
an already batched object and store it directly. On top of that, if you are
worried that the data might become too big within a fixed time window, you
could use a trigger that fires based on both time and size. Although IMO it's
no problem to have a bigger put for HBase. But you need to test.
I have a very similar use case with Kafka and HBase and I solved it like
that.
Hope that helps.
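
As a rough illustration of a trigger that fires on both time and size, here is a plain-Java sketch. It is not Flink's Trigger API: SizeOrTimeBatcher, add and onTimer are hypothetical names, and the caller passes the clock in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical batcher: releases a batch when it is full or when its oldest element is too old. */
class SizeOrTimeBatcher<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private List<T> pending = new ArrayList<>();
    private long oldestMillis;   // arrival time of the first element in the pending batch

    SizeOrTimeBatcher(int maxSize, long maxAgeMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Adds one element; returns the full batch if the size limit was reached, else an empty list. */
    List<T> add(T element, long nowMillis) {
        if (pending.isEmpty()) {
            oldestMillis = nowMillis;
        }
        pending.add(element);
        return pending.size() >= maxSize ? drain() : Collections.<T>emptyList();
    }

    /** Timer tick; returns the pending batch if its oldest element has waited long enough. */
    List<T> onTimer(long nowMillis) {
        boolean expired = !pending.isEmpty() && nowMillis - oldestMillis >= maxAgeMillis;
        return expired ? drain() : Collections.<T>emptyList();
    }

    /** Hands out the pending elements and starts a fresh batch. */
    private List<T> drain() {
        List<T> batch = pending;
        pending = new ArrayList<>();
        return batch;
    }
}
```

Whatever a non-empty return value contains would then be written to HBase as one put list; under low load the time limit bounds how long an element can wait.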

Re: Periodic flush sink?

Posted by Niels Basjes <Ni...@basjes.nl>.
Thanks.

The specific table I have here is used for debugging purposes, so at the
HBase level I set a TTL on the data of 12 hours.
So I'm not worrying about the HFiles.
Doing a lot of 'small' calls has an impact on HBase as a whole (not just
this table), so I want buffering.
With a buffer that can hold 1000 events, if I create 10 events with a
single page and I'm the only one on the site (at that moment), the
events will be buffered for much too long.

I did a quick test and this seems to work for my case.
In what situations do you guys expect this code construct to fail? Any edge
cases I missed?

Niels

private transient Connection connection = null;
private transient BufferedMutator mutator = null;
private transient Timer timer = null;

@Override
public void open(Configuration parameters) throws Exception {
  org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
  // Keep a reference to the connection so close() can release it.
  connection = ConnectionFactory.createConnection(hbaseConfig);

  mutator = connection.getBufferedMutator(
    new BufferedMutatorParams(TableName.valueOf(tableName))
      .pool(getDefaultExecutor(hbaseConfig))
      .writeBufferSize(HBASE_BUFFER_SIZE)
  );

  // Daemon timer thread, so it cannot keep the JVM alive.
  timer = new Timer(true);
  timer.schedule(new TimerTask(){
    @Override
    public void run() {
      try {
        // Runs on the timer thread, concurrently with the writes.
        MySink.this.mutator.flush();
      } catch (Exception e) {
        // Ignore (a failed flush is silently dropped here)
      }
    }}, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
}

@Override
public void close() throws IOException {
  timer.cancel();
  mutator.close();   // close() also flushes what is still buffered
  connection.close();
}







-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Periodic flush sink?

Posted by Ted Yu <yu...@gmail.com>.
I expect a Flink expert to answer your question.

bq. I get a flush of the buffers at least every few seconds

From the HBase point of view, during low-traffic periods the above may result
in many small HFiles, leading to more work for compaction.

FYI
