You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Sachin Mittal <sj...@gmail.com> on 2023/05/10 09:50:31 UTC

Can we write to and read from and then write to same kinesis stream using KinesisIO

Hi,
I am using aws beam sdk1 to read from and write to a kinesis stream.
*org.apache.beam.sdk.io.kinesis.KinesisIO*


My pipeline is something like this: (*note the kinesis stream used to write
to and then again read from is empty before starting the app*)
---------------------------------------------------------------------------------------------------------------------------------------
Pipeline pipeline = Pipeline.create(options);

PCollection<> input = pipeline.apply(/* read from some source */);

// populate an empty kinesis stream
input
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs ....
);

// within same application start another pipeline
// to read from some kinesis stream from start
PCollection<> output = pipeline
.apply(
KinesisIO.read()
.withStreamName(streamName)
.withMaxReadTime(duration) // wait for some duration before deciding to
close the pipeline
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) // from
start
// other IO configs
)
.apply(/* apply other transformations */);


// write transformed output to same kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs
);

// also write transformed output to some other kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(otherStreamName) // a different kinesis stream
// other IO configs
);


pipeline.run().waitUntilFinish();

---------------------------------------------------------------------------------------------------------------------------------------

Will something like this work in a single beam application ?
Is there a better way of designing this ?

I am right now trying to run this using a direct runner but I am facing
some issues in reading from the same kinesis stream again.
It is actually able to read the records but somehow read records are not
pushed downstream for further processing.

Before debugging it further and looking into any logic issues or bugs in my
code, I wanted to be sure if something like this is possible under beam
constructs.

Please let me know your thoughts.

Thanks
Sachin

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Sachin Mittal <sj...@gmail.com>.
Only direct runner.

I have right now disabled aggregation on kpl and it looks like to be
working.

On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin <p....@gmail.com> wrote:

> > 100,000's of data records are accumulated and they are tried to be
> pushed to Kinesis all at once
>
> Does that happen only in direct runner? Or Flink runner behaves similarly?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 12 May 2023 at 16:43, Sachin Mittal <sj...@gmail.com> wrote:
>
>> Hi,
>> So I have prepared the write pipeline something like this:
>>
>>
>> --------------------------------------------------------------------------------------------------------------
>> writePipeline
>> .apply(GenerateSequence.from(0).to(100))
>> .apply(ParDo.of(new DoFn<Long, byte[]>() {
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> long i = c.element();
>> // Fetching data for step=i
>> List<> data = fetchForInputStep(i);
>> // output all the data one by one
>> for (Data d : data) {
>> out.output(d.asBytes());
>> }
>> }
>> }))
>> .apply(KinesisIO.write()
>> .withStreamName(streamName)
>> // other configs
>> );
>>
>> writePipeline.run().waitUntilFinish()
>>
>> What I observe is that pipeline part to push data to kinesis is only
>> happening after the entire data is loaded by a second apply function.
>> So what happens is that 100,000's of data records are accumulated and
>> they are tried to be pushed to Kinesis all at once and we get following
>> error:
>> *KPL Expiration reached while waiting in limiter*
>>
>> The logs are generated like this:
>>
>> --------------------------------------------------------------------------------------------------------------
>> Extracting binaries to
>> /var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
>> .........
>> [main.cc:384] Starting up main producer
>> .........
>> [main.cc:395] Entering join
>> .........
>> Fetching data for step=1
>> .........
>> Fetching data for step=100
>> .........
>> [kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
>> [shard_map.cc:87] Updating shard map for stream "xxxxxx"
>> [shard_map.cc:148] Successfully updated shard map for stream "xxxxxx"
>> found 1 shards
>> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
>> 'xxxxxx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
>> UserRecords: 742018, KinesisRecords: 4698 }
>>
>>
>> I had assumed that as soon as step 1 data was fetched it would pass the
>> data downstream and
>> the kinesis pipeline would have been created much before and would have
>> started writing to Kinesis much earlier, but this is happening only after
>> all the data is collected.
>>
>> Is there a way to fix this ?
>>
>> Thanks
>> Sachin
>>
>>
>>
>> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p....@gmail.com>
>> wrote:
>>
>>> > two pipeline objects in my application
>>>
>>> I think this should work. I meant to have 2 separate artifacts and
>>> deploy them separately, but if your app runs batch processing with 2
>>> sequential steps, 2 pipelines should work too:
>>>
>>> - writePipeline.run().waitUntilFinish()
>>> - readAndWritePipeline.run().waitUntilFinish()
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sj...@gmail.com> wrote:
>>>
>>>> Use case is something like this:
>>>> A source writes source data to kinesis and same is used to compute
>>>> derived data which is again written back to same stream so next level of
>>>> derived data can be computed from previous derived data and so on.
>>>>
>>>> Would there be any issues from beam side to do the same within a single
>>>> pipeline?
>>>>
>>>> When you say I have to split my app into two do you mean that I have to
>>>> create two pipeline objects in my application?
>>>>
>>>> If so then how will application end?
>>>>
>>>> Note that source is of finite size which gets written into kinesis.
>>>>
>>>> Also we do plan to migrate to aws2 io, but later. If aws1 has some
>>>> limitations in achieving what we want then please let me know.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I've never seen use-cases where it would be necessary. What are you
>>>>> trying to achieve? Some context would be helpful.
>>>>> Your example looks like you can split your app into two - one writes
>>>>> into streamName and the others read from streamName.
>>>>>
>>>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and
>>>>> is not maintained anymore. Better to use this instead:
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>>>
>>>>> Best Regards,
>>>>> Pavel Solomin
>>>>>
>>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>>>
>>>>>>
>>>>>> My pipeline is something like this: (*note the kinesis stream used
>>>>>> to write to and then again read from is empty before starting the app*
>>>>>> )
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>>>>
>>>>>> // populate an empty kinesis stream
>>>>>> input
>>>>>> .apply(
>>>>>> KinesisIO.write()
>>>>>> .withStreamName(streamName)
>>>>>> // other IO configs ....
>>>>>> );
>>>>>>
>>>>>> // within same application start another pipeline
>>>>>> // to read from some kinesis stream from start
>>>>>> PCollection<> output = pipeline
>>>>>> .apply(
>>>>>> KinesisIO.read()
>>>>>> .withStreamName(streamName)
>>>>>> .withMaxReadTime(duration) // wait for some duration before deciding
>>>>>> to close the pipeline
>>>>>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>>>>>> from start
>>>>>> // other IO configs
>>>>>> )
>>>>>> .apply(/* apply other transformations */);
>>>>>>
>>>>>>
>>>>>> // write transformed output to same kinesis stream
>>>>>> output
>>>>>> .apply(
>>>>>> KinesisIO.write()
>>>>>> .withStreamName(streamName)
>>>>>> // other IO configs
>>>>>> );
>>>>>>
>>>>>> // also write transformed output to some other kinesis stream
>>>>>> output
>>>>>> .apply(
>>>>>> KinesisIO.write()
>>>>>> .withStreamName(otherStreamName) // a different kinesis stream
>>>>>> // other IO configs
>>>>>> );
>>>>>>
>>>>>>
>>>>>> pipeline.run().waitUntilFinish();
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> Will something like this work in a single beam application ?
>>>>>> Is there a better way of designing this ?
>>>>>>
>>>>>> I am right now trying to run this using a direct runner but I am
>>>>>> facing some issues in reading from the same kinesis stream again.
>>>>>> It is actually able to read the records but somehow read records are
>>>>>> not pushed downstream for further processing.
>>>>>>
>>>>>> Before debugging it further and looking into any logic issues or bugs
>>>>>> in my code, I wanted to be sure if something like this is possible under
>>>>>> beam constructs.
>>>>>>
>>>>>> Please let me know your thoughts.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
>>>>>>

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Pavel Solomin <p....@gmail.com>.
> 100,000's of data records are accumulated and they are tried to be pushed
to Kinesis all at once

Does that happen only in direct runner? Or Flink runner behaves similarly?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Fri, 12 May 2023 at 16:43, Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> So I have prepared the write pipeline something like this:
>
>
> --------------------------------------------------------------------------------------------------------------
> writePipeline
> .apply(GenerateSequence.from(0).to(100))
> .apply(ParDo.of(new DoFn<Long, byte[]>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> long i = c.element();
> // Fetching data for step=i
> List<> data = fetchForInputStep(i);
> // output all the data one by one
> for (Data d : data) {
> out.output(d.asBytes());
> }
> }
> }))
> .apply(KinesisIO.write()
> .withStreamName(streamName)
> // other configs
> );
>
> writePipeline.run().waitUntilFinish()
>
> What I observe is that pipeline part to push data to kinesis is only
> happening after the entire data is loaded by a second apply function.
> So what happens is that 100,000's of data records are accumulated and they
> are tried to be pushed to Kinesis all at once and we get following error:
> *KPL Expiration reached while waiting in limiter*
>
> The logs are generated like this:
>
> --------------------------------------------------------------------------------------------------------------
> Extracting binaries to
> /var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
> .........
> [main.cc:384] Starting up main producer
> .........
> [main.cc:395] Entering join
> .........
> Fetching data for step=1
> .........
> Fetching data for step=100
> .........
> [kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
> [shard_map.cc:87] Updating shard map for stream "xxxxxx"
> [shard_map.cc:148] Successfully updated shard map for stream "xxxxxx"
> found 1 shards
> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
> 'xxxxxx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
> UserRecords: 742018, KinesisRecords: 4698 }
>
>
> I had assumed that as soon as step 1 data was fetched it would pass the
> data downstream and
> the kinesis pipeline would have been created much before and would have
> started writing to Kinesis much earlier, but this is happening only after
> all the data is collected.
>
> Is there a way to fix this ?
>
> Thanks
> Sachin
>
>
>
> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p....@gmail.com>
> wrote:
>
>> > two pipeline objects in my application
>>
>> I think this should work. I meant to have 2 separate artifacts and deploy
>> them separately, but if your app runs batch processing with 2 sequential
>> steps, 2 pipelines should work too:
>>
>> - writePipeline.run().waitUntilFinish()
>> - readAndWritePipeline.run().waitUntilFinish()
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sj...@gmail.com> wrote:
>>
>>> Use case is something like this:
>>> A source writes source data to kinesis and same is used to compute
>>> derived data which is again written back to same stream so next level of
>>> derived data can be computed from previous derived data and so on.
>>>
>>> Would there be any issues from beam side to do the same within a single
>>> pipeline?
>>>
>>> When you say I have to split my app into two do you mean that I have to
>>> create two pipeline objects in my application?
>>>
>>> If so then how will application end?
>>>
>>> Note that source is of finite size which gets written into kinesis.
>>>
>>> Also we do plan to migrate to aws2 io, but later. If aws1 has some
>>> limitations in achieving what we want then please let me know.
>>>
>>> Thanks
>>>
>>>
>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p....@gmail.com>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I've never seen use-cases where it would be necessary. What are you
>>>> trying to achieve? Some context would be helpful.
>>>> Your example looks like you can split your app into two - one writes
>>>> into streamName and the others read from streamName.
>>>>
>>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and
>>>> is not maintained anymore. Better to use this instead:
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>>
>>>>>
>>>>> My pipeline is something like this: (*note the kinesis stream used to
>>>>> write to and then again read from is empty before starting the app*)
>>>>>
>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>>>
>>>>> // populate an empty kinesis stream
>>>>> input
>>>>> .apply(
>>>>> KinesisIO.write()
>>>>> .withStreamName(streamName)
>>>>> // other IO configs ....
>>>>> );
>>>>>
>>>>> // within same application start another pipeline
>>>>> // to read from some kinesis stream from start
>>>>> PCollection<> output = pipeline
>>>>> .apply(
>>>>> KinesisIO.read()
>>>>> .withStreamName(streamName)
>>>>> .withMaxReadTime(duration) // wait for some duration before deciding
>>>>> to close the pipeline
>>>>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>>>>> from start
>>>>> // other IO configs
>>>>> )
>>>>> .apply(/* apply other transformations */);
>>>>>
>>>>>
>>>>> // write transformed output to same kinesis stream
>>>>> output
>>>>> .apply(
>>>>> KinesisIO.write()
>>>>> .withStreamName(streamName)
>>>>> // other IO configs
>>>>> );
>>>>>
>>>>> // also write transformed output to some other kinesis stream
>>>>> output
>>>>> .apply(
>>>>> KinesisIO.write()
>>>>> .withStreamName(otherStreamName) // a different kinesis stream
>>>>> // other IO configs
>>>>> );
>>>>>
>>>>>
>>>>> pipeline.run().waitUntilFinish();
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>
>>>>> Will something like this work in a single beam application ?
>>>>> Is there a better way of designing this ?
>>>>>
>>>>> I am right now trying to run this using a direct runner but I am
>>>>> facing some issues in reading from the same kinesis stream again.
>>>>> It is actually able to read the records but somehow read records are
>>>>> not pushed downstream for further processing.
>>>>>
>>>>> Before debugging it further and looking into any logic issues or bugs
>>>>> in my code, I wanted to be sure if something like this is possible under
>>>>> beam constructs.
>>>>>
>>>>> Please let me know your thoughts.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
So I have prepared the write pipeline something like this:

--------------------------------------------------------------------------------------------------------------
writePipeline
.apply(GenerateSequence.from(0).to(100))
.apply(ParDo.of(new DoFn<Long, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
long i = c.element();
// Fetching data for step=i
List<> data = fetchForInputStep(i);
// output all the data one by one
for (Data d : data) {
out.output(d.asBytes());
}
}
}))
.apply(KinesisIO.write()
.withStreamName(streamName)
// other configs
);

writePipeline.run().waitUntilFinish()

What I observe is that pipeline part to push data to kinesis is only
happening after the entire data is loaded by a second apply function.
So what happens is that 100,000's of data records are accumulated and they
are tried to be pushed to Kinesis all at once and we get following error:
*KPL Expiration reached while waiting in limiter*

The logs are generated like this:
--------------------------------------------------------------------------------------------------------------
Extracting binaries to
/var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
.........
[main.cc:384] Starting up main producer
.........
[main.cc:395] Entering join
.........
Fetching data for step=1
.........
Fetching data for step=100
.........
[kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
[shard_map.cc:87] Updating shard map for stream "xxxxxx"
[shard_map.cc:148] Successfully updated shard map for stream "xxxxxx" found
1 shards
[processing_statistics_logger.cc:111] Stage 1 Triggers: { stream: 'xxxxxx',
manual: 10, count: 0, size: 4688, matches: 0, timed: 0, UserRecords:
742018, KinesisRecords: 4698 }


I had assumed that as soon as step 1 data was fetched it would pass the
data downstream and
the kinesis pipeline would have been created much before and would have
started writing to Kinesis much earlier, but this is happening only after
all the data is collected.

Is there a way to fix this ?

Thanks
Sachin



On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p....@gmail.com> wrote:

> > two pipeline objects in my application
>
> I think this should work. I meant to have 2 separate artifacts and deploy
> them separately, but if your app runs batch processing with 2 sequential
> steps, 2 pipelines should work too:
>
> - writePipeline.run().waitUntilFinish()
> - readAndWritePipeline.run().waitUntilFinish()
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sj...@gmail.com> wrote:
>
>> Use case is something like this:
>> A source writes source data to kinesis and same is used to compute
>> derived data which is again written back to same stream so next level of
>> derived data can be computed from previous derived data and so on.
>>
>> Would there be any issues from beam side to do the same within a single
>> pipeline?
>>
>> When you say I have to split my app into two do you mean that I have to
>> create two pipeline objects in my application?
>>
>> If so then how will application end?
>>
>> Note that source is of finite size which gets written into kinesis.
>>
>> Also we do plan to migrate to aws2 io, but later. If aws1 has some
>> limitations in achieving what we want then please let me know.
>>
>> Thanks
>>
>>
>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p....@gmail.com>
>> wrote:
>>
>>> Hello!
>>>
>>> I've never seen use-cases where it would be necessary. What are you
>>> trying to achieve? Some context would be helpful.
>>> Your example looks like you can split your app into two - one writes
>>> into streamName and the others read from streamName.
>>>
>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and
>>> is not maintained anymore. Better to use this instead:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>
>>>>
>>>> My pipeline is something like this: (*note the kinesis stream used to
>>>> write to and then again read from is empty before starting the app*)
>>>>
>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>> Pipeline pipeline = Pipeline.create(options);
>>>>
>>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>>
>>>> // populate an empty kinesis stream
>>>> input
>>>> .apply(
>>>> KinesisIO.write()
>>>> .withStreamName(streamName)
>>>> // other IO configs ....
>>>> );
>>>>
>>>> // within same application start another pipeline
>>>> // to read from some kinesis stream from start
>>>> PCollection<> output = pipeline
>>>> .apply(
>>>> KinesisIO.read()
>>>> .withStreamName(streamName)
>>>> .withMaxReadTime(duration) // wait for some duration before deciding
>>>> to close the pipeline
>>>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>>>> from start
>>>> // other IO configs
>>>> )
>>>> .apply(/* apply other transformations */);
>>>>
>>>>
>>>> // write transformed output to same kinesis stream
>>>> output
>>>> .apply(
>>>> KinesisIO.write()
>>>> .withStreamName(streamName)
>>>> // other IO configs
>>>> );
>>>>
>>>> // also write transformed output to some other kinesis stream
>>>> output
>>>> .apply(
>>>> KinesisIO.write()
>>>> .withStreamName(otherStreamName) // a different kinesis stream
>>>> // other IO configs
>>>> );
>>>>
>>>>
>>>> pipeline.run().waitUntilFinish();
>>>>
>>>>
>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> Will something like this work in a single beam application ?
>>>> Is there a better way of designing this ?
>>>>
>>>> I am right now trying to run this using a direct runner but I am facing
>>>> some issues in reading from the same kinesis stream again.
>>>> It is actually able to read the records but somehow read records are
>>>> not pushed downstream for further processing.
>>>>
>>>> Before debugging it further and looking into any logic issues or bugs
>>>> in my code, I wanted to be sure if something like this is possible under
>>>> beam constructs.
>>>>
>>>> Please let me know your thoughts.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>>

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Pavel Solomin <p....@gmail.com>.
> two pipeline objects in my application

I think this should work. I meant to have 2 separate artifacts and deploy
them separately, but if your app runs batch processing with 2 sequential
steps, 2 pipelines should work too:

- writePipeline.run().waitUntilFinish()
- readAndWritePipeline.run().waitUntilFinish()

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Wed, 10 May 2023 at 11:49, Sachin Mittal <sj...@gmail.com> wrote:

> Use case is something like this:
> A source writes source data to kinesis and same is used to compute derived
> data which is again written back to same stream so next level of derived
> data can be computed from previous derived data and so on.
>
> Would there be any issues from beam side to do the same within a single
> pipeline?
>
> When you say I have to split my app into two do you mean that I have to
> create two pipeline objects in my application?
>
> If so then how will application end?
>
> Note that source is of finite size which gets written into kinesis.
>
> Also we do plan to migrate to aws2 io, but later. If aws1 has some
> limitations in achieving what we want then please let me know.
>
> Thanks
>
>
> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p....@gmail.com>
> wrote:
>
>> Hello!
>>
>> I've never seen use-cases where it would be necessary. What are you
>> trying to achieve? Some context would be helpful.
>> Your example looks like you can split your app into two - one writes into
>> streamName and the others read from streamName.
>>
>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is
>> not maintained anymore. Better to use this instead:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>
>>>
>>> My pipeline is something like this: (*note the kinesis stream used to
>>> write to and then again read from is empty before starting the app*)
>>>
>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>
>>> // populate an empty kinesis stream
>>> input
>>> .apply(
>>> KinesisIO.write()
>>> .withStreamName(streamName)
>>> // other IO configs ....
>>> );
>>>
>>> // within same application start another pipeline
>>> // to read from some kinesis stream from start
>>> PCollection<> output = pipeline
>>> .apply(
>>> KinesisIO.read()
>>> .withStreamName(streamName)
>>> .withMaxReadTime(duration) // wait for some duration before deciding to
>>> close the pipeline
>>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>>> from start
>>> // other IO configs
>>> )
>>> .apply(/* apply other transformations */);
>>>
>>>
>>> // write transformed output to same kinesis stream
>>> output
>>> .apply(
>>> KinesisIO.write()
>>> .withStreamName(streamName)
>>> // other IO configs
>>> );
>>>
>>> // also write transformed output to some other kinesis stream
>>> output
>>> .apply(
>>> KinesisIO.write()
>>> .withStreamName(otherStreamName) // a different kinesis stream
>>> // other IO configs
>>> );
>>>
>>>
>>> pipeline.run().waitUntilFinish();
>>>
>>>
>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>
>>> Will something like this work in a single beam application ?
>>> Is there a better way of designing this ?
>>>
>>> I am right now trying to run this using a direct runner but I am facing
>>> some issues in reading from the same kinesis stream again.
>>> It is actually able to read the records but somehow read records are not
>>> pushed downstream for further processing.
>>>
>>> Before debugging it further and looking into any logic issues or bugs in
>>> my code, I wanted to be sure if something like this is possible under beam
>>> constructs.
>>>
>>> Please let me know your thoughts.
>>>
>>> Thanks
>>> Sachin
>>>
>>>

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Sachin Mittal <sj...@gmail.com>.
Use case is something like this:
A source writes source data to kinesis and same is used to compute derived
data which is again written back to same stream so next level of derived
data can be computed from previous derived data and so on.

Would there be any issues from beam side to do the same within a single
pipeline?

When you say I have to split my app into two do you mean that I have to
create two pipeline objects in my application?

If so then how will application end?

Note that source is of finite size which gets written into kinesis.

Also we do plan to migrate to aws2 io, but later. If aws1 has some
limitations in achieving what we want then please let me know.

Thanks


On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p....@gmail.com> wrote:

> Hello!
>
> I've never seen use-cases where it would be necessary. What are you trying
> to achieve? Some context would be helpful.
> Your example looks like you can split your app into two - one writes into
> streamName and the others read from streamName.
>
> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is
> not maintained anymore. Better to use this instead:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com> wrote:
>
>> Hi,
>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>
>>
>> My pipeline is something like this: (*note the kinesis stream used to
>> write to and then again read from is empty before starting the app*)
>>
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<> input = pipeline.apply(/* read from some source */);
>>
>> // populate an empty kinesis stream
>> input
>> .apply(
>> KinesisIO.write()
>> .withStreamName(streamName)
>> // other IO configs ....
>> );
>>
>> // within same application start another pipeline
>> // to read from some kinesis stream from start
>> PCollection<> output = pipeline
>> .apply(
>> KinesisIO.read()
>> .withStreamName(streamName)
>> .withMaxReadTime(duration) // wait for some duration before deciding to
>> close the pipeline
>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>> from start
>> // other IO configs
>> )
>> .apply(/* apply other transformations */);
>>
>>
>> // write transformed output to same kinesis stream
>> output
>> .apply(
>> KinesisIO.write()
>> .withStreamName(streamName)
>> // other IO configs
>> );
>>
>> // also write transformed output to some other kinesis stream
>> output
>> .apply(
>> KinesisIO.write()
>> .withStreamName(otherStreamName) // a different kinesis stream
>> // other IO configs
>> );
>>
>>
>> pipeline.run().waitUntilFinish();
>>
>>
>> ---------------------------------------------------------------------------------------------------------------------------------------
>>
>> Will something like this work in a single beam application ?
>> Is there a better way of designing this ?
>>
>> I am right now trying to run this using a direct runner but I am facing
>> some issues in reading from the same kinesis stream again.
>> It is actually able to read the records but somehow read records are not
>> pushed downstream for further processing.
>>
>> Before debugging it further and looking into any logic issues or bugs in
>> my code, I wanted to be sure if something like this is possible under beam
>> constructs.
>>
>> Please let me know your thoughts.
>>
>> Thanks
>> Sachin
>>
>>

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

Posted by Pavel Solomin <p....@gmail.com>.
Hello!

I've never seen use-cases where it would be necessary. What are you trying
to achieve? Some context would be helpful.
Your example looks like you can split your app into two - one writes into
streamName and the others read from streamName.

P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is
not maintained anymore. Better to use this instead:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Wed, 10 May 2023 at 10:50, Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> I am using aws beam sdk1 to read from and write to a kinesis stream.
> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>
>
> My pipeline is something like this: (*note the kinesis stream used to
> write to and then again read from is empty before starting the app*)
>
> ---------------------------------------------------------------------------------------------------------------------------------------
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<> input = pipeline.apply(/* read from some source */);
>
> // populate an empty kinesis stream
> input
> .apply(
> KinesisIO.write()
> .withStreamName(streamName)
> // other IO configs ....
> );
>
> // within same application start another pipeline
> // to read from some kinesis stream from start
> PCollection<> output = pipeline
> .apply(
> KinesisIO.read()
> .withStreamName(streamName)
> .withMaxReadTime(duration) // wait for some duration before deciding to
> close the pipeline
> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
> from start
> // other IO configs
> )
> .apply(/* apply other transformations */);
>
>
> // write transformed output to same kinesis stream
> output
> .apply(
> KinesisIO.write()
> .withStreamName(streamName)
> // other IO configs
> );
>
> // also write transformed output to some other kinesis stream
> output
> .apply(
> KinesisIO.write()
> .withStreamName(otherStreamName) // a different kinesis stream
> // other IO configs
> );
>
>
> pipeline.run().waitUntilFinish();
>
>
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Will something like this work in a single beam application ?
> Is there a better way of designing this ?
>
> I am right now trying to run this using a direct runner but I am facing
> some issues in reading from the same kinesis stream again.
> It is actually able to read the records but somehow read records are not
> pushed downstream for further processing.
>
> Before debugging it further and looking into any logic issues or bugs in
> my code, I wanted to be sure if something like this is possible under beam
> constructs.
>
> Please let me know your thoughts.
>
> Thanks
> Sachin
>
>