You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Vincent Marquez <vi...@gmail.com> on 2020/11/19 18:18:27 UTC

Proposal: Redis Stream Connector

Currently, Redis offers a streaming queue functionality similar to
Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.

I've written an UnboundedSource connector that makes use of Redis Streams
as a POC and it seems to work well.

If someone is willing to work with me, I could write up a JIRA and/or open
up a WIP pull request if there is interest in getting this as an official
connector.  I would mostly need guidance on naming/testing aspects.

https://redis.io/topics/streams-intro

*~Vincent*

Re: Proposal: Redis Stream Connector

Posted by Vincent Marquez <vi...@gmail.com>.
On Fri, Dec 4, 2020 at 12:28 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Vincent,
>
> 1. Just to be clear, for a streaming pipeline (say, on the dataflow
>> runner)it  will use the 'residual' result of the SplitRestriction
>> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
>> due to an error, then restarted with the same checkpoint it would resume
>> off from the last written Residual checkpoint position?
>
>
> Checkpoint is *only* persisted while the pipeline is running. So when the
> pipeline is stopped and restarted, there is no way for the pipeline to
> restart from the last checkpoint that is produced by the trySplit.
> Alternatively, you can use bundle finalization to commit what you have
> read, or create a CommitTransform. For example, in Kafka read, we create a
> commit transform[1] which commits the read offset every 5 mins. When the
> pipeline is stopped and restarted, the Kafka read will read from the last
> committed offset.
>

Ok, I'm definitely a bit confused by how this whole 'every five minutes'
thing works, I thought Luke C. was quite adamant in my correspondence that
there isn't a mechanism for backpressure in Dataflow, so there's no way to
'space out' windows on a timer.  From my understanding of this code, if
there is an entire queue being read from the beginning, all of the fixed
windows will trigger at the same time (or close to it), not actually every
five minutes.

This also seems like it could likely lead to data loss in the event of
upstream processing failing, as it is possible for the window to fire and
commit offsets before that work is completed further down the pipeline.
Hopefully i'm wrong and/or just confused, i'll move forward with trying to
make sure I can commit redis stream offsets from the bundle finalization.



>
>> 2. Another question I have for writing an unbounded Splittable DoFn is
>> how to write a test. With UnboundedSource, there was a way to wrap it to
>> make it bounded so it would stop after processing N number of elements.  I
>> would like to do the same, is there a uniform way of doing this currently?
>> If not I will just write a something custom for the loop in the
>> processElement to check if it should continue opposed to 'while (true)'.
>> Let me know if there is a uniform way, or you have a better idea on how to
>> write a test for my PTransform that uses a Splittable DoFn.
>>
>
>  What I do for Kafka test[2][3] is to start a streaming pipeline and wait
> for a certain time to cancel it.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java#L137
> [3]
> https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
>
> On Fri, Dec 4, 2020 at 11:57 AM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>> Thank you for your help Boyuan.
>>
>> 1. Just to be clear, for a streaming pipeline (say, on the dataflow
>> runner)it  will use the 'residual' result of the SplitRestriction
>> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
>> due to an error, then restarted with the same checkpoint it would resume
>> off from the last written Residual checkpoint position?
>>
>> 2. Another question I have for writing an unbounded Splittable DoFn is
>> how to write a test. With UnboundedSource, there was a way to wrap it to
>> make it bounded so it would stop after processing N number of elements.  I
>> would like to do the same, is there a uniform way of doing this currently?
>> If not I will just write a something custom for the loop in the
>> processElement to check if it should continue opposed to 'while (true)'.
>> Let me know if there is a uniform way, or you have a better idea on how to
>> write a test for my PTransform that uses a Splittable DoFn.
>>
>> Thanks again.
>>
>>
>> *~Vincent*
>>
>>
>> On Mon, Nov 30, 2020 at 10:25 AM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
>>> performing checkpointing, keeping primary as current restriction and
>>> returning residuals. In the DoFn, you can do Splittable DoFn initiated
>>> checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
>>> guide[3] also talks about Splittable DoFn initiated checkpoint and runner
>>> initiated checkpoint.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
>>> [3]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>
>>> On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez <
>>> vincent.marquez@gmail.com> wrote:
>>>
>>>> Regarding checkpointing:
>>>>
>>>> I'm confused how the Splittable DoFn can make use of checkpoints to
>>>> resume and not have data loss.  Unlike the old API that had a very easy to
>>>> understand method called 'getCheckpointMark' that allows me to return the
>>>> completed work, I don't see where that is done with the current API.
>>>>
>>>> I tried looking at the OffsetRangeTracker and how it is used by Kafka
>>>> but I'm failing to understand it.  The process method takes the
>>>> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
>>>> to represent half completed work (in the event of an exception/crash during
>>>> a previous 'process' method run.   Is there some documentation that could
>>>> help me understand this part?  Thanks in advance.
>>>>
>>>> *~Vincent*
>>>>
>>>>
>>>> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> Just want to mention that we have been working with Vincent in the
>>>>> ReadAll implementation for Cassandra based on normal DoFn, and we
>>>>> expect to get it merged for the next release of Beam. Vincent is
>>>>> familiarized now with DoFn based IO composition, a first step towards
>>>>> SDF understanding. Vincent you can think of our Cassandra RingRange as
>>>>> a Restriction in the context of SDF. Just for reference it would be
>>>>> good to read in advance these two:
>>>>>
>>>>> https://beam.apache.org/blog/splittable-do-fn/
>>>>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>>>>
>>>>> Thanks Boyuan for offering your help I think it is really needed
>>>>> considering that we don't have many Unbounded SDF connectors to use as
>>>>> reference.
>>>>>
>>>>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com>
>>>>> wrote:
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Hi Vincent,
>>>>> >>>
>>>>> >>> Thanks for your contribution! I'm happy to work with you on this
>>>>> when you contribute the code into Beam.
>>>>> >>
>>>>> >>
>>>>> >> Should I write up a JIRA to start?  I have access, I've already
>>>>> been in the process of contributing some big changes to the CassandraIO
>>>>> connector.
>>>>> >
>>>>> >
>>>>> > Yes, please create a JIRA and assign it to yourself.
>>>>> >
>>>>> >>
>>>>> >>
>>>>> >>>
>>>>> >>>
>>>>> >>> Another thing is that it would be preferable to use Splittable
>>>>> DoFn instead of using UnboundedSource to write a new IO.
>>>>> >>
>>>>> >>
>>>>> >> I would prefer to use the UnboundedSource connector, I've already
>>>>> written most of it, but also, I see some challenges using Splittable DoFn
>>>>> for Redis streams.
>>>>> >>
>>>>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>>>>> monotonically increasing counters, so there is not a way  to just claim a
>>>>> chunk of work and know that the chunk has any actual data in it.
>>>>> >>
>>>>> >> Since UnboundedSource is not yet deprecated, could I contribute
>>>>> that after finishing up some test aspects, and then perhaps we can
>>>>> implement a Splittable DoFn version?
>>>>> >
>>>>> >
>>>>> > It would be nice not to build new IOs on top of UnboundedSource.
>>>>> Currently we already have the wrapper class which translates the existing
>>>>> UnboundedSource into Unbounded Splittable DoFn and executes the
>>>>> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
>>>>> go through the UnboundedSource implementation together to figure out a
>>>>> design for using Splittable DoFn?
>>>>> >
>>>>> >
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Currently, Redis offers a streaming queue functionality similar
>>>>> to Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for
>>>>> it.
>>>>> >>>>
>>>>> >>>> I've written an UnboundedSource connector that makes use of Redis
>>>>> Streams as a POC and it seems to work well.
>>>>> >>>>
>>>>> >>>> If someone is willing to work with me, I could write up a JIRA
>>>>> and/or open up a WIP pull request if there is interest in getting this as
>>>>> an official connector.  I would mostly need guidance on naming/testing
>>>>> aspects.
>>>>> >>>>
>>>>> >>>> https://redis.io/topics/streams-intro
>>>>> >>>>
>>>>> >>>> ~Vincent
>>>>> >>
>>>>> >>
>>>>> >> ~Vincent
>>>>>
>>>>


*~Vincent*

Re: Proposal: Redis Stream Connector

Posted by Boyuan Zhang <bo...@google.com>.
Hi Vincent,

1. Just to be clear, for a streaming pipeline (say, on the dataflow
> runner)it  will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
> due to an error, then restarted with the same checkpoint it would resume
> off from the last written Residual checkpoint position?


Checkpoint is *only* persisted while the pipeline is running. So when the
pipeline is stopped and restarted, there is no way for the pipeline to
restart from the last checkpoint that is produced by the trySplit.
Alternatively, you can use bundle finalization to commit what you have
read, or create a CommitTransform. For example, in Kafka read, we create a
commit transform[1] which commits the read offset every 5 mins. When the
pipeline is stopped and restarted, the Kafka read will read from the last
committed offset.


> 2. Another question I have for writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N number of elements.  I would
> like to do the same, is there a uniform way of doing this currently?  If
> not I will just write a something custom for the loop in the processElement
> to check if it should continue opposed to 'while (true)'. Let me know if
> there is a uniform way, or you have a better idea on how to write a test
> for my PTransform that uses a Splittable DoFn.
>

 What I do for Kafka test[2][3] is to start a streaming pipeline and wait
for a certain time to cancel it.

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java#L137
[3]
https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy

On Fri, Dec 4, 2020 at 11:57 AM Vincent Marquez <vi...@gmail.com>
wrote:

> Thank you for your help Boyuan.
>
> 1. Just to be clear, for a streaming pipeline (say, on the dataflow
> runner)it  will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
> due to an error, then restarted with the same checkpoint it would resume
> off from the last written Residual checkpoint position?
>
> 2. Another question I have for writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N number of elements.  I would
> like to do the same, is there a uniform way of doing this currently?  If
> not I will just write a something custom for the loop in the processElement
> to check if it should continue opposed to 'while (true)'. Let me know if
> there is a uniform way, or you have a better idea on how to write a test
> for my PTransform that uses a Splittable DoFn.
>
> Thanks again.
>
>
> *~Vincent*
>
>
> On Mon, Nov 30, 2020 at 10:25 AM Boyuan Zhang <bo...@google.com> wrote:
>
>> In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
>> performing checkpointing, keeping primary as current restriction and
>> returning residuals. In the DoFn, you can do Splittable DoFn initiated
>> checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
>> guide[3] also talks about Splittable DoFn initiated checkpoint and runner
>> initiated checkpoint.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
>> [3]
>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>
>> On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>>
>>> Regarding checkpointing:
>>>
>>> I'm confused how the Splittable DoFn can make use of checkpoints to
>>> resume and not have data loss.  Unlike the old API that had a very easy to
>>> understand method called 'getCheckpointMark' that allows me to return the
>>> completed work, I don't see where that is done with the current API.
>>>
>>> I tried looking at the OffsetRangeTracker and how it is used by Kafka
>>> but I'm failing to understand it.  The process method takes the
>>> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
>>> to represent half completed work (in the event of an exception/crash during
>>> a previous 'process' method run.   Is there some documentation that could
>>> help me understand this part?  Thanks in advance.
>>>
>>> *~Vincent*
>>>
>>>
>>> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> Just want to mention that we have been working with Vincent in the
>>>> ReadAll implementation for Cassandra based on normal DoFn, and we
>>>> expect to get it merged for the next release of Beam. Vincent is
>>>> familiarized now with DoFn based IO composition, a first step towards
>>>> SDF understanding. Vincent you can think of our Cassandra RingRange as
>>>> a Restriction in the context of SDF. Just for reference it would be
>>>> good to read in advance these two:
>>>>
>>>> https://beam.apache.org/blog/splittable-do-fn/
>>>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>>>
>>>> Thanks Boyuan for offering your help I think it is really needed
>>>> considering that we don't have many Unbounded SDF connectors to use as
>>>> reference.
>>>>
>>>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com>
>>>> wrote:
>>>> >
>>>> >
>>>> >
>>>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com>
>>>> wrote:
>>>> >>>
>>>> >>> Hi Vincent,
>>>> >>>
>>>> >>> Thanks for your contribution! I'm happy to work with you on this
>>>> when you contribute the code into Beam.
>>>> >>
>>>> >>
>>>> >> Should I write up a JIRA to start?  I have access, I've already been
>>>> in the process of contributing some big changes to the CassandraIO
>>>> connector.
>>>> >
>>>> >
>>>> > Yes, please create a JIRA and assign it to yourself.
>>>> >
>>>> >>
>>>> >>
>>>> >>>
>>>> >>>
>>>> >>> Another thing is that it would be preferable to use Splittable DoFn
>>>> instead of using UnboundedSource to write a new IO.
>>>> >>
>>>> >>
>>>> >> I would prefer to use the UnboundedSource connector, I've already
>>>> written most of it, but also, I see some challenges using Splittable DoFn
>>>> for Redis streams.
>>>> >>
>>>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>>>> monotonically increasing counters, so there is not a way  to just claim a
>>>> chunk of work and know that the chunk has any actual data in it.
>>>> >>
>>>> >> Since UnboundedSource is not yet deprecated, could I contribute that
>>>> after finishing up some test aspects, and then perhaps we can implement a
>>>> Splittable DoFn version?
>>>> >
>>>> >
>>>> > It would be nice not to build new IOs on top of UnboundedSource.
>>>> Currently we already have the wrapper class which translates the existing
>>>> UnboundedSource into Unbounded Splittable DoFn and executes the
>>>> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
>>>> go through the UnboundedSource implementation together to figure out a
>>>> design for using Splittable DoFn?
>>>> >
>>>> >
>>>> >>
>>>> >>
>>>> >>
>>>> >>>
>>>> >>>
>>>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Currently, Redis offers a streaming queue functionality similar to
>>>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>>> >>>>
>>>> >>>> I've written an UnboundedSource connector that makes use of Redis
>>>> Streams as a POC and it seems to work well.
>>>> >>>>
>>>> >>>> If someone is willing to work with me, I could write up a JIRA
>>>> and/or open up a WIP pull request if there is interest in getting this as
>>>> an official connector.  I would mostly need guidance on naming/testing
>>>> aspects.
>>>> >>>>
>>>> >>>> https://redis.io/topics/streams-intro
>>>> >>>>
>>>> >>>> ~Vincent
>>>> >>
>>>> >>
>>>> >> ~Vincent
>>>>
>>>

Re: Proposal: Redis Stream Connector

Posted by Vincent Marquez <vi...@gmail.com>.
Thank you for your help Boyuan.

1. Just to be clear, for a streaming pipeline (say, on the dataflow
runner)it  will use the 'residual' result of the SplitRestriction
(retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
due to an error, then restarted with the same checkpoint it would resume
off from the last written Residual checkpoint position?

2. Another question I have for writing an unbounded Splittable DoFn is how
to write a test. With UnboundedSource, there was a way to wrap it to make
it bounded so it would stop after processing N number of elements.  I would
like to do the same, is there a uniform way of doing this currently?  If
not I will just write a something custom for the loop in the processElement
to check if it should continue opposed to 'while (true)'. Let me know if
there is a uniform way, or you have a better idea on how to write a test
for my PTransform that uses a Splittable DoFn.

Thanks again.


*~Vincent*


On Mon, Nov 30, 2020 at 10:25 AM Boyuan Zhang <bo...@google.com> wrote:

> In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
> performing checkpointing, keeping primary as current restriction and
> returning residuals. In the DoFn, you can do Splittable DoFn initiated
> checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
> guide[3] also talks about Splittable DoFn initiated checkpoint and runner
> initiated checkpoint.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
> [3]
> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>
> On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez <
> vincent.marquez@gmail.com> wrote:
>
>> Regarding checkpointing:
>>
>> I'm confused how the Splittable DoFn can make use of checkpoints to
>> resume and not have data loss.  Unlike the old API that had a very easy to
>> understand method called 'getCheckpointMark' that allows me to return the
>> completed work, I don't see where that is done with the current API.
>>
>> I tried looking at the OffsetRangeTracker and how it is used by Kafka but
>> I'm failing to understand it.  The process method takes the
>> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
>> to represent half completed work (in the event of an exception/crash during
>> a previous 'process' method run.   Is there some documentation that could
>> help me understand this part?  Thanks in advance.
>>
>> *~Vincent*
>>
>>
>> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> Just want to mention that we have been working with Vincent in the
>>> ReadAll implementation for Cassandra based on normal DoFn, and we
>>> expect to get it merged for the next release of Beam. Vincent is
>>> familiarized now with DoFn based IO composition, a first step towards
>>> SDF understanding. Vincent you can think of our Cassandra RingRange as
>>> a Restriction in the context of SDF. Just for reference it would be
>>> good to read in advance these two:
>>>
>>> https://beam.apache.org/blog/splittable-do-fn/
>>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>>
>>> Thanks Boyuan for offering your help I think it is really needed
>>> considering that we don't have many Unbounded SDF connectors to use as
>>> reference.
>>>
>>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com>
>>> wrote:
>>> >
>>> >
>>> >
>>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
>>> vincent.marquez@gmail.com> wrote:
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com>
>>> wrote:
>>> >>>
>>> >>> Hi Vincent,
>>> >>>
>>> >>> Thanks for your contribution! I'm happy to work with you on this
>>> when you contribute the code into Beam.
>>> >>
>>> >>
>>> >> Should I write up a JIRA to start?  I have access, I've already been
>>> in the process of contributing some big changes to the CassandraIO
>>> connector.
>>> >
>>> >
>>> > Yes, please create a JIRA and assign it to yourself.
>>> >
>>> >>
>>> >>
>>> >>>
>>> >>>
>>> >>> Another thing is that it would be preferable to use Splittable DoFn
>>> instead of using UnboundedSource to write a new IO.
>>> >>
>>> >>
>>> >> I would prefer to use the UnboundedSource connector, I've already
>>> written most of it, but also, I see some challenges using Splittable DoFn
>>> for Redis streams.
>>> >>
>>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>>> monotonically increasing counters, so there is not a way  to just claim a
>>> chunk of work and know that the chunk has any actual data in it.
>>> >>
>>> >> Since UnboundedSource is not yet deprecated, could I contribute that
>>> after finishing up some test aspects, and then perhaps we can implement a
>>> Splittable DoFn version?
>>> >
>>> >
>>> > It would be nice not to build new IOs on top of UnboundedSource.
>>> Currently we already have the wrapper class which translates the existing
>>> UnboundedSource into Unbounded Splittable DoFn and executes the
>>> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
>>> go through the UnboundedSource implementation together to figure out a
>>> design for using Splittable DoFn?
>>> >
>>> >
>>> >>
>>> >>
>>> >>
>>> >>>
>>> >>>
>>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>>> vincent.marquez@gmail.com> wrote:
>>> >>>>
>>> >>>> Currently, Redis offers a streaming queue functionality similar to
>>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>> >>>>
>>> >>>> I've written an UnboundedSource connector that makes use of Redis
>>> Streams as a POC and it seems to work well.
>>> >>>>
>>> >>>> If someone is willing to work with me, I could write up a JIRA
>>> and/or open up a WIP pull request if there is interest in getting this as
>>> an official connector.  I would mostly need guidance on naming/testing
>>> aspects.
>>> >>>>
>>> >>>> https://redis.io/topics/streams-intro
>>> >>>>
>>> >>>> ~Vincent
>>> >>
>>> >>
>>> >> ~Vincent
>>>
>>

Re: Proposal: Redis Stream Connector

Posted by Boyuan Zhang <bo...@google.com>.
In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
performing checkpointing, keeping primary as current restriction and
returning residuals. In the DoFn, you can do Splittable DoFn initiated
checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
guide[3] also talks about Splittable DoFn initiated checkpoint and runner
initiated checkpoint.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
[3]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez <vi...@gmail.com>
wrote:

> Regarding checkpointing:
>
> I'm confused how the Splittable DoFn can make use of checkpoints to resume
> and not have data loss.  Unlike the old API that had a very easy to
> understand method called 'getCheckpointMark' that allows me to return the
> completed work, I don't see where that is done with the current API.
>
> I tried looking at the OffsetRangeTracker and how it is used by Kafka but
> I'm failing to understand it.  The process method takes the
> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
> to represent half completed work (in the event of an exception/crash during
> a previous 'process' method run.   Is there some documentation that could
> help me understand this part?  Thanks in advance.
>
> *~Vincent*
>
>
> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Just want to mention that we have been working with Vincent in the
>> ReadAll implementation for Cassandra based on normal DoFn, and we
>> expect to get it merged for the next release of Beam. Vincent is
>> familiarized now with DoFn based IO composition, a first step towards
>> SDF understanding. Vincent you can think of our Cassandra RingRange as
>> a Restriction in the context of SDF. Just for reference it would be
>> good to read in advance these two:
>>
>> https://beam.apache.org/blog/splittable-do-fn/
>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>
>> Thanks Boyuan for offering your help I think it is really needed
>> considering that we don't have many Unbounded SDF connectors to use as
>> reference.
>>
>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com> wrote:
>> >
>> >
>> >
>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>> >>
>> >>
>> >>
>> >>
>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com>
>> wrote:
>> >>>
>> >>> Hi Vincent,
>> >>>
>> >>> Thanks for your contribution! I'm happy to work with you on this when
>> you contribute the code into Beam.
>> >>
>> >>
>> >> Should I write up a JIRA to start?  I have access, I've already been
>> in the process of contributing some big changes to the CassandraIO
>> connector.
>> >
>> >
>> > Yes, please create a JIRA and assign it to yourself.
>> >
>> >>
>> >>
>> >>>
>> >>>
>> >>> Another thing is that it would be preferable to use Splittable DoFn
>> instead of using UnboundedSource to write a new IO.
>> >>
>> >>
>> >> I would prefer to use the UnboundedSource connector, I've already
>> written most of it, but also, I see some challenges using Splittable DoFn
>> for Redis streams.
>> >>
>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>> monotonically increasing counters, so there is not a way  to just claim a
>> chunk of work and know that the chunk has any actual data in it.
>> >>
>> >> Since UnboundedSource is not yet deprecated, could I contribute that
>> after finishing up some test aspects, and then perhaps we can implement a
>> Splittable DoFn version?
>> >
>> >
>> > It would be nice not to build new IOs on top of UnboundedSource.
>> Currently we already have the wrapper class which translates the existing
>> UnboundedSource into Unbounded Splittable DoFn and executes the
>> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
>> go through the UnboundedSource implementation together to figure out a
>> design for using Splittable DoFn?
>> >
>> >
>> >>
>> >>
>> >>
>> >>>
>> >>>
>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>> >>>>
>> >>>> Currently, Redis offers a streaming queue functionality similar to
>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>> >>>>
>> >>>> I've written an UnboundedSource connector that makes use of Redis
>> Streams as a POC and it seems to work well.
>> >>>>
>> >>>> If someone is willing to work with me, I could write up a JIRA
>> and/or open up a WIP pull request if there is interest in getting this as
>> an official connector.  I would mostly need guidance on naming/testing
>> aspects.
>> >>>>
>> >>>> https://redis.io/topics/streams-intro
>> >>>>
>> >>>> ~Vincent
>> >>
>> >>
>> >> ~Vincent
>>
>

Re: Proposal: Redis Stream Connector

Posted by Vincent Marquez <vi...@gmail.com>.
Regarding checkpointing:

I'm confused how the Splittable DoFn can make use of checkpoints to resume
and not have data loss.  Unlike the old API that had a very easy to
understand method called 'getCheckpointMark' that allows me to return the
completed work, I don't see where that is done with the current API.

I tried looking at the OffsetRangeTracker and how it is used by Kafka but
I'm failing to understand it.  The process method takes the
RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
to represent half completed work (in the event of an exception/crash during
a previous 'process' method run.   Is there some documentation that could
help me understand this part?  Thanks in advance.

*~Vincent*


On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía <ie...@gmail.com> wrote:

> Just want to mention that we have been working with Vincent in the
> ReadAll implementation for Cassandra based on normal DoFn, and we
> expect to get it merged for the next release of Beam. Vincent is
> familiarized now with DoFn based IO composition, a first step towards
> SDF understanding. Vincent you can think of our Cassandra RingRange as
> a Restriction in the context of SDF. Just for reference it would be
> good to read in advance these two:
>
> https://beam.apache.org/blog/splittable-do-fn/
> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>
> Thanks Boyuan for offering your help I think it is really needed
> considering that we don't have many Unbounded SDF connectors to use as
> reference.
>
> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com> wrote:
> >
> >
> >
> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
> vincent.marquez@gmail.com> wrote:
> >>
> >>
> >>
> >>
> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com>
> wrote:
> >>>
> >>> Hi Vincent,
> >>>
> >>> Thanks for your contribution! I'm happy to work with you on this when
> you contribute the code into Beam.
> >>
> >>
> >> Should I write up a JIRA to start?  I have access, I've already been in
> the process of contributing some big changes to the CassandraIO connector.
> >
> >
> > Yes, please create a JIRA and assign it to yourself.
> >
> >>
> >>
> >>>
> >>>
> >>> Another thing is that it would be preferable to use Splittable DoFn
> instead of using UnboundedSource to write a new IO.
> >>
> >>
> >> I would prefer to use the UnboundedSource connector, I've already
> written most of it, but also, I see some challenges using Splittable DoFn
> for Redis streams.
> >>
> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
> monotonically increasing counters, so there is not a way  to just claim a
> chunk of work and know that the chunk has any actual data in it.
> >>
> >> Since UnboundedSource is not yet deprecated, could I contribute that
> after finishing up some test aspects, and then perhaps we can implement a
> Splittable DoFn version?
> >
> >
> > It would be nice not to build new IOs on top of UnboundedSource.
> Currently we already have the wrapper class which translates the existing
> UnboundedSource into Unbounded Splittable DoFn and executes the
> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
> go through the UnboundedSource implementation together to figure out a
> design for using Splittable DoFn?
> >
> >
> >>
> >>
> >>
> >>>
> >>>
> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
> vincent.marquez@gmail.com> wrote:
> >>>>
> >>>> Currently, Redis offers a streaming queue functionality similar to
> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
> >>>>
> >>>> I've written an UnboundedSource connector that makes use of Redis
> Streams as a POC and it seems to work well.
> >>>>
> >>>> If someone is willing to work with me, I could write up a JIRA and/or
> open up a WIP pull request if there is interest in getting this as an
> official connector.  I would mostly need guidance on naming/testing aspects.
> >>>>
> >>>> https://redis.io/topics/streams-intro
> >>>>
> >>>> ~Vincent
> >>
> >>
> >> ~Vincent
>

Re: Proposal: Redis Stream Connector

Posted by Ismaël Mejía <ie...@gmail.com>.
Just want to mention that we have been working with Vincent in the
ReadAll implementation for Cassandra based on normal DoFn, and we
expect to get it merged for the next release of Beam. Vincent is
familiarized now with DoFn based IO composition, a first step towards
SDF understanding. Vincent you can think of our Cassandra RingRange as
a Restriction in the context of SDF. Just for reference it would be
good to read in advance these two:

https://beam.apache.org/blog/splittable-do-fn/
https://beam.apache.org/documentation/programming-guide/#sdf-basics

Thanks Boyuan for offering your help I think it is really needed
considering that we don't have many Unbounded SDF connectors to use as
reference.

On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang <bo...@google.com> wrote:
>
>
>
> On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <vi...@gmail.com> wrote:
>>
>>
>>
>>
>> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>> Hi Vincent,
>>>
>>> Thanks for your contribution! I'm happy to work with you on this when you contribute the code into Beam.
>>
>>
>> Should I write up a JIRA to start?  I have access, I've already been in the process of contributing some big changes to the CassandraIO connector.
>
>
> Yes, please create a JIRA and assign it to yourself.
>
>>
>>
>>>
>>>
>>> Another thing is that it would be preferable to use Splittable DoFn instead of using UnboundedSource to write a new IO.
>>
>>
>> I would prefer to use the UnboundedSource connector, I've already written most of it, but also, I see some challenges using Splittable DoFn for Redis streams.
>>
>> Unlike Kafka and Kinesis, Redis Streams offsets are not simply monotonically increasing counters, so there is not a way  to just claim a chunk of work and know that the chunk has any actual data in it.
>>
>> Since UnboundedSource is not yet deprecated, could I contribute that after finishing up some test aspects, and then perhaps we can implement a Splittable DoFn version?
>
>
> It would be nice not to build new IOs on top of UnboundedSource. Currently we already have the wrapper class which translates the existing UnboundedSource into Unbounded Splittable DoFn and executes the UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we go through the UnboundedSource implementation together to figure out a design for using Splittable DoFn?
>
>
>>
>>
>>
>>>
>>>
>>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <vi...@gmail.com> wrote:
>>>>
>>>> Currently, Redis offers a streaming queue functionality similar to Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>>>
>>>> I've written an UnboundedSource connector that makes use of Redis Streams as a POC and it seems to work well.
>>>>
>>>> If someone is willing to work with me, I could write up a JIRA and/or open up a WIP pull request if there is interest in getting this as an official connector.  I would mostly need guidance on naming/testing aspects.
>>>>
>>>> https://redis.io/topics/streams-intro
>>>>
>>>> ~Vincent
>>
>>
>> ~Vincent

Re: Proposal: Redis Stream Connector

Posted by Boyuan Zhang <bo...@google.com>.
On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <vi...@gmail.com>
wrote:

>
>
>
> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com> wrote:
>
>> Hi Vincent,
>>
>> Thanks for your contribution! I'm happy to work with you on this when you
>> contribute the code into Beam.
>>
>
> Should I write up a JIRA to start?  I have access, I've already been in
> the process of contributing some big changes to the CassandraIO connector.
>

Yes, please create a JIRA and assign it to yourself.


>
>
>>
>> Another thing is that it would be preferable to use Splittable DoFn
>> <https://beam.apache.org/documentation/programming-guide/#splittable-dofns> instead
>> of using UnboundedSource to write a new IO.
>>
>
> I would prefer to use the UnboundedSource connector, I've already written
> most of it, but also, I see some challenges using Splittable DoFn for Redis
> streams.
>
> Unlike Kafka and Kinesis, Redis Streams offsets are *not* simply
> monotonically increasing counters, so there is not a way  to just claim a
> chunk of work and know that the chunk has any actual data in it.
>
> Since UnboundedSource is not yet deprecated, could I contribute that after
> finishing up some test aspects, and then perhaps we can implement a
> Splittable DoFn version?
>

It would be nice *not* to build new IOs on top of UnboundedSource.
Currently we already have the wrapper class which translates the existing
UnboundedSource into Unbounded Splittable DoFn and executes the
UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
go through the UnboundedSource implementation together to figure out a
design for using Splittable DoFn?



>
>
>
>>
>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>>
>>> Currently, Redis offers a streaming queue functionality similar to
>>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>>
>>> I've written an UnboundedSource connector that makes use of Redis
>>> Streams as a POC and it seems to work well.
>>>
>>> If someone is willing to work with me, I could write up a JIRA and/or
>>> open up a WIP pull request if there is interest in getting this as an
>>> official connector.  I would mostly need guidance on naming/testing aspects.
>>>
>>> https://redis.io/topics/streams-intro
>>>
>>> *~Vincent*
>>>
>>
> ~Vincent
>

Re: Proposal: Redis Stream Connector

Posted by Vincent Marquez <vi...@gmail.com>.
On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang <bo...@google.com> wrote:

> Hi Vincent,
>
> Thanks for your contribution! I'm happy to work with you on this when you
> contribute the code into Beam.
>

Should I write up a JIRA to start?  I have access, I've already been in the
process of contributing some big changes to the CassandraIO connector.


>
> Another thing is that it would be preferable to use Splittable DoFn
> <https://beam.apache.org/documentation/programming-guide/#splittable-dofns> instead
> of using UnboundedSource to write a new IO.
>

I would prefer to use the UnboundedSource connector, I've already written
most of it, but also, I see some challenges using Splittable DoFn for Redis
streams.

Unlike Kafka and Kinesis, Redis Streams offsets are *not* simply
monotonically increasing counters, so there is not a way  to just claim a
chunk of work and know that the chunk has any actual data in it.

Since UnboundedSource is not yet deprecated, could I contribute that after
finishing up some test aspects, and then perhaps we can implement a
Splittable DoFn version?



>
> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
> vincent.marquez@gmail.com> wrote:
>
>> Currently, Redis offers a streaming queue functionality similar to
>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>
>> I've written an UnboundedSource connector that makes use of Redis Streams
>> as a POC and it seems to work well.
>>
>> If someone is willing to work with me, I could write up a JIRA and/or
>> open up a WIP pull request if there is interest in getting this as an
>> official connector.  I would mostly need guidance on naming/testing aspects.
>>
>> https://redis.io/topics/streams-intro
>>
>> *~Vincent*
>>
>
~Vincent

Re: Proposal: Redis Stream Connector

Posted by Boyuan Zhang <bo...@google.com>.
Hi Vincent,

Thanks for your contribution! I'm happy to work with you on this when you
contribute the code into Beam.

Another thing is that it would be preferable to use Splittable DoFn
<https://beam.apache.org/documentation/programming-guide/#splittable-dofns>
instead
of using UnboundedSource to write a new IO.

On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <vi...@gmail.com>
wrote:

> Currently, Redis offers a streaming queue functionality similar to
> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>
> I've written an UnboundedSource connector that makes use of Redis Streams
> as a POC and it seems to work well.
>
> If someone is willing to work with me, I could write up a JIRA and/or open
> up a WIP pull request if there is interest in getting this as an official
> connector.  I would mostly need guidance on naming/testing aspects.
>
> https://redis.io/topics/streams-intro
>
> *~Vincent*
>