Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2019/11/25 22:44:51 UTC

Pre-process data before it hits the Source

Hi,
I need to pre-process data (transform incoming data into a different format)
before it hits the Source I have defined. How can I do that?

I tried to use a .map on the DataStream, but that is too late because the
data has already hit the Source I defined.
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
    getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
        region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
        socketTimeout);
DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
    env.addSource(kinesisConsumer);

// too late here: the records have already passed through the Source
DataStream<Map<String, Object>> kinesisStream1 =
    monitoringDataStreamSource.map(new TransformFunction(...));

TIA,

Re: Pre-process data before it hits the Source

Posted by Felipe Gutierrez <fe...@gmail.com>.
OK, I am sorry. I thought it was you who said this.

Maybe it was just the wording that made the question confusing.
But yes, something can be done in the source function, not before it.

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Tue, Nov 26, 2019 at 12:35 PM vino yang <ya...@gmail.com> wrote:

> Hi Felipe,
>
> >> But you said, "before it hits the Source".
>
> I did not say this; Vijay did. When he asked the question, he may not have
> thought about customizing the source connector.
>
> If he were not looking for a solution within Flink, why would he ask a
> Flink question and paste a Flink program?
>
> IMO, it's just a matter of wording. WDYT?
>
> Best,
> Vino

Re: Pre-process data before it hits the Source

Posted by vino yang <ya...@gmail.com>.
Hi Felipe,

>> But you said, "before it hits the Source".

I did not say this; Vijay did. When he asked the question, he may not have
thought about customizing the source connector.

If he were not looking for a solution within Flink, why would he ask a
Flink question and paste a Flink program?

IMO, it's just a matter of wording. WDYT?

Best,
Vino

On Tue, Nov 26, 2019 at 5:16 PM Felipe Gutierrez <fe...@gmail.com> wrote:

> Hi Vino,
>
> Yes, it is possible in the source function. But you said, "before it hits
> the Source", so IMO that is outside of the Flink workflow.
> Best,
> Felipe

Re: Pre-process data before it hits the Source

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Vino,

Yes, it is possible in the source function. But you said, "before it hits
the Source", so IMO that is outside of the Flink workflow.
Best,
Felipe

On Tue, Nov 26, 2019 at 10:09 AM vino yang <ya...@gmail.com> wrote:

> Hi Felipe,
>
> Why do you think it's not possible?
>
> My thought is that we can do the data pre-processing in the source
> function. In that case, the source function would consume upstream events,
> pre-process them, and then emit the results downstream.
>
> Best,
> Vino

Re: Pre-process data before it hits the Source

Posted by vino yang <ya...@gmail.com>.
Hi Felipe,

Why do you think it's not possible?

My thought is that we can do the data pre-processing in the source
function. In that case, the source function would consume upstream events,
pre-process them, and then emit the results downstream.
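
As a rough illustration of that idea (consume, pre-process, emit), here is a
minimal plain-Java sketch. It deliberately does not use Flink's API:
PreProcessingSource, its Iterator-based upstream, and the Consumer-based
downstream are hypothetical stand-ins for a source function, the external
system, and the downstream operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical model of a source that pre-processes records before emitting them. */
final class PreProcessingSource<IN, OUT> {
    // Stands in for the external system the source reads from.
    private final Iterator<IN> upstream;
    // The transformation applied inside the source, before anything is emitted.
    private final Function<IN, OUT> preProcess;

    PreProcessingSource(Iterator<IN> upstream, Function<IN, OUT> preProcess) {
        this.upstream = upstream;
        this.preProcess = preProcess;
    }

    /** Consumes upstream events, pre-processes each one, then emits it downstream. */
    void run(Consumer<OUT> downstream) {
        while (upstream.hasNext()) {
            IN raw = upstream.next();
            downstream.accept(preProcess.apply(raw));
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        Iterator<String> events = Arrays.asList("a=1", "b=2").iterator();
        new PreProcessingSource<String, String>(events, s -> s.toUpperCase(Locale.ROOT))
                .run(emitted::add);
        System.out.println(emitted); // prints [A=1, B=2]
    }
}
```

Downstream code only ever sees the already transformed records, which is the
effect the original question asks for.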

Best,
Vino


On Tue, Nov 26, 2019 at 4:56 PM Felipe Gutierrez <fe...@gmail.com> wrote:

> I am afraid that this is not possible in Flink, since the entry point of
> all transformations is the source function. Any pre-processing has to
> happen either in the source function or in the downstream operators.
> If you want to pre-process something before the data hits the source, you
> will have to do it in the broker/storage/queue that the source consumes
> your data from, not in Flink.
>
> Best,
> Felipe

Re: Pre-process data before it hits the Source

Posted by Felipe Gutierrez <fe...@gmail.com>.
I am afraid that this is not possible in Flink, since the entry point of
all transformations is the source function. Any pre-processing has to
happen either in the source function or in the downstream operators.
If you want to pre-process something before the data hits the source, you
will have to do it in the broker/storage/queue that the source consumes
your data from, not in Flink.

Best,
Felipe

On Tue, Nov 26, 2019 at 2:57 AM vino yang <ya...@gmail.com> wrote:

> Hi Vijay,
>
> IMO, the semantics of the source are not fixed. It can integrate with
> third-party systems and consume events. However, it can also contain
> additional business logic that pre-processes your data after the events
> are consumed.
>
> Maybe it needs some customization. WDYT?
>
> Best,
> Vino

Re: Pre-process data before it hits the Source

Posted by vino yang <ya...@gmail.com>.
Hi Vijay,

IMO, the semantics of the source are not fixed. It can integrate with
third-party systems and consume events. However, it can also contain
additional business logic that pre-processes your data after the events
are consumed.

Maybe it needs some customization. WDYT?
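
One way such a customization could look is to do the format conversion at
the point where raw records are turned into stream elements. With
FlinkKinesisConsumer, the DeserializationSchema passed to its constructor is
a likely place for this, though that is an assumption about the setup in
this thread. The sketch below is plain Java, not Flink code:
ReformattingDeserializer and the "key=value;key=value" input format are
hypothetical and only show the shape of a conversion to Map<String, Object>.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical converter from raw record bytes to the format the job wants. */
final class ReformattingDeserializer {

    /** Parses bytes such as "Host=web1;CPU=0.75" into a map with lower-case keys. */
    static Map<String, Object> deserialize(byte[] raw) {
        Map<String, Object> record = new LinkedHashMap<>();
        String text = new String(raw, StandardCharsets.UTF_8);
        for (String pair : text.split(";")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                continue; // skip fragments that are not key=value
            }
            String key = pair.substring(0, eq).trim().toLowerCase(Locale.ROOT);
            String value = pair.substring(eq + 1).trim();
            record.put(key, parseValue(value));
        }
        return record;
    }

    /** Returns a Double when the value is numeric, otherwise the original string. */
    private static Object parseValue(String value) {
        try {
            return Double.valueOf(value);
        } catch (NumberFormatException e) {
            return value;
        }
    }

    public static void main(String[] args) {
        byte[] raw = "Host=web1;CPU=0.75".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(raw)); // prints {host=web1, cpu=0.75}
    }
}
```

In a real job, the same conversion would sit inside whatever deserializer
the source is constructed with, so no separate .map step is needed.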

Best,
Vino

On Tue, Nov 26, 2019 at 6:45 AM Vijay Balakrishnan <bv...@gmail.com> wrote:

> Hi,
> I need to pre-process data (transform incoming data into a different
> format) before it hits the Source I have defined. How can I do that?
>
> I tried to use a .map on the DataStream, but that is too late because the
> data has already hit the Source I defined.
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
>     getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>         region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
>         socketTimeout);
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>     env.addSource(kinesisConsumer);
>
> // too late here: the records have already passed through the Source
> DataStream<Map<String, Object>> kinesisStream1 =
>     monitoringDataStreamSource.map(new TransformFunction(...));
>
> TIA,
>