Posted to user@flink.apache.org by Dhruv Kumar <ga...@gmail.com> on 2018/05/07 11:18:47 UTC

Signal for End of Stream

Hi

Is there a way I can capture the end-of-stream signal for streams that are replayed from historical data? I need the end-of-stream signal to tell the Flink program to finish its execution.

Below is the use case in detail:
1. An independent log replayer program sends the records to a socket (identified by IP address and port).
2. The Flink program reads the incoming records from the above-mentioned socket via socketTextStream, applies a keyBy operator to them, does some processing, and finally writes the results to another socket.

How do I tell the Flink program to finish its execution? Is there any information I can add to the records when the replayer program sends them, which the Flink program can then parse when the records arrive?

Let me know if anything is not clear.

Thanks

--------------------------------------------------
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
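One answer to the question above, and the one this thread ends up using, is an in-band sentinel: a line containing only `END` after the last record. Below is a minimal sketch of the replayer side under stated assumptions: the class `LogReplayer`, the method `serveOnce`, newline-delimited UTF-8 records, and the `END` marker are all hypothetical choices. Because Flink's socket source connects to the given host and port as a client (see `socket.connect(...)` in the source code later in this thread), the replayer has to be the side that listens and accepts the connection.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical replayer sketch: accepts one connection (e.g. from Flink's socket source),
// writes each record as a newline-terminated line, then writes the END sentinel line.
final class LogReplayer {
    // Assumed sentinel; it must match the string the Flink-side source checks for.
    static final String END_MARKER = "END";

    static void serveOnce(ServerSocket server, List<String> records) throws IOException {
        // Validate first so a bad record never reaches the wire.
        for (String record : records) {
            // A record equal to the marker would end the stream early; an embedded
            // newline would split one record into two on the reading side.
            if (record.equals(END_MARKER) || record.indexOf('\n') >= 0) {
                throw new IllegalArgumentException("record collides with framing: " + record);
            }
        }
        try (Socket client = server.accept();
             BufferedWriter out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String record : records) {
                out.write(record);
                out.write('\n');
            }
            out.write(END_MARKER);
            out.write('\n');
            out.flush();
        } // closing the socket after END also gives the reader an EOF
    }
}
```

For a local run, the replayer would be started first on some arbitrary port (for example `new ServerSocket(9999)`) and the Flink job pointed at the same host and port. With the stock socket source, the END line would arrive as an ordinary record; the job only stops on it if the source checks for it, as the custom source later in this thread does.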


Re: Signal for End of Stream

Posted by Dhruv Kumar <ga...@gmail.com>.
Fabian, thanks a lot for your continued help! I really appreciate it.

> On May 8, 2018, at 03:06, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Dhruv,
> 
> The changes look good to me.
> 
> Best, Fabian

Re: Signal for End of Stream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dhruv,

The changes look good to me.

Best, Fabian

2018-05-08 5:37 GMT+02:00 Dhruv Kumar <ga...@gmail.com>:

> Thanks a lot for your response, Fabian.
>
> What I understand is that if I write my own SourceFunction that handles the
> "end of stream" record and makes the source exit from its run() method, the
> Flink program will terminate.
>
> I have been using *SocketTextStreamFunction* until now.
> So, I duplicated the *SocketTextStreamFunction* class into another class
> named *CustomSocketTextStreamFunction*, which is exactly the same as
> *SocketTextStreamFunction* except for one change in the *run()* method.
> Can you take a look and let me know whether this will work and whether it
> will have much of a performance impact? I tested it locally on my machine
> and it seems to work fine, but I want to make sure that it won't have any
> side effects, race conditions, etc.

Re: Signal for End of Stream

Posted by Dhruv Kumar <ga...@gmail.com>.
Thanks a lot for your response, Fabian.

What I understand is that if I write my own SourceFunction that handles the "end of stream" record and makes the source exit from its run() method, the Flink program will terminate.

I have been using SocketTextStreamFunction until now.
So, I duplicated the SocketTextStreamFunction class into another class named CustomSocketTextStreamFunction, which is exactly the same as SocketTextStreamFunction except for one change in the run() method: a check for an "END" record that ends the stream. Can you take a look and let me know whether this will work and whether it will have much of a performance impact? I tested it locally on my machine and it seems to work fine, but I want to make sure that it won't have any side effects, race conditions, etc.

```
@Override
public void run(SourceContext<String> ctx) throws Exception {
    // hostname, port, delimiter, maxNumRetries, delayBetweenRetries, currentSocket,
    // isRunning, CONNECTION_TIMEOUT_TIME and LOG are fields/constants of the class,
    // copied unchanged from SocketTextStreamFunction.
    final StringBuilder buffer = new StringBuilder();
    long attempt = 0;

    while (isRunning) {

        try (Socket socket = new Socket()) {
            currentSocket = socket;

            LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
            socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            char[] cbuf = new char[8192];
            int bytesRead;
            while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
                buffer.append(cbuf, 0, bytesRead);
                int delimPos;
                while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
                    String record = buffer.substring(0, delimPos);
                    buffer.delete(0, delimPos + delimiter.length());
                    // truncate trailing carriage return (before the END check, so "END\r\n" also matches)
                    if (delimiter.equals("\n") && record.endsWith("\r")) {
                        record = record.substring(0, record.length() - 1);
                    }
                    // The change compared to SocketTextStreamFunction: an "END" record signals the
                    // end of the replayed stream. Returning from run() (which also closes the socket)
                    // lets Flink finish the job; the marker and anything sent after it are not emitted.
                    if (record.equals("END")) {
                        LOG.info("End of stream encountered");
                        return;
                    }
                    ctx.collect(record);
                }
            }
        }

        // if we dropped out of this loop due to an EOF, sleep and retry
        if (isRunning) {
            attempt++;
            if (maxNumRetries == -1 || attempt < maxNumRetries) {
                LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
                Thread.sleep(delayBetweenRetries);
            }
            else {
                // this should probably be here, but some examples expect simple exits of the stream source
                // throw new EOFException("Reached end of stream and reconnects are not enabled.");
                break;
            }
        }
    }

    // collect trailing data (not reached after an END record)
    if (buffer.length() > 0) {
        ctx.collect(buffer.toString());
    }
}
```
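To check the splitting and END handling without starting a Flink cluster, the inner loop of the code above can be pulled out into a plain helper. This is a hypothetical refactoring sketch: the class `RecordSplitter` and the method `drainRecords` are not part of Flink, and the same newline-style delimiter handling is assumed. In this version the trailing carriage return is stripped before the comparison, so `END\r\n` from a Windows-style replayer also terminates the stream, and anything after the marker is discarded instead of being collected as trailing data.

```java
import java.util.function.Consumer;

// Hypothetical helper, not part of Flink: the record-splitting loop from run() pulled out
// so the END handling can be unit-tested without a cluster.
final class RecordSplitter {
    // Assumed sentinel, matching the replayer's END line.
    static final String END_MARKER = "END";

    /**
     * Emits every complete record in {@code buffer} to {@code out} and removes it from the buffer.
     * Returns true if the END marker was found; the marker is not emitted and everything after
     * it is discarded, so the caller can return from run(). Incomplete data stays buffered.
     */
    static boolean drainRecords(StringBuilder buffer, String delimiter, Consumer<String> out) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        int delimPos;
        while ((delimPos = buffer.indexOf(delimiter)) != -1) {
            String record = buffer.substring(0, delimPos);
            buffer.delete(0, delimPos + delimiter.length());
            // Strip a trailing carriage return before the END check so "END\r\n" matches too.
            if (delimiter.equals("\n") && record.endsWith("\r")) {
                record = record.substring(0, record.length() - 1);
            }
            if (record.equals(END_MARKER)) {
                buffer.setLength(0); // drop anything that followed the marker
                return true;
            }
            out.accept(record);
        }
        return false;
    }
}
```

Inside run(), the innermost `while` loop could then be replaced by something like `if (RecordSplitter.drainRecords(buffer, delimiter, ctx::collect)) { return; }`; returning from run() is what lets Flink finish the job, as Fabian explains in his reply.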


> On May 7, 2018, at 11:04, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> Flink will automatically stop the execution of a DataStream program once all sources have finished providing data, i.e., when all SourceFunctions have returned from their run() methods.
> The DeserializationSchema.isEndOfStream() method can be used to tell a built-in SourceFunction such as a KafkaConsumer that it should leave the run() method.
> If you implement your own SourceFunction, you can leave run() after you have ingested all data.
> 
> Note that Flink won't wait for pending processing-time timers but will immediately shut down the program after the last in-flight record has been processed.
> Event-time timers will be handled because each source emits a Long.MAX_VALUE watermark after it has emitted its last record.
> 
> Best, Fabian


Re: Signal for End of Stream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink will automatically stop the execution of a DataStream program once
all sources have finished providing data, i.e., when all SourceFunctions
have returned from their run() methods.
The DeserializationSchema.isEndOfStream() method can be used to tell a
built-in SourceFunction such as a KafkaConsumer that it should leave the
run() method.
If you implement your own SourceFunction, you can leave run() after you
have ingested all data.
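The isEndOfStream() mechanism mentioned above can be modeled in plain Java to show the control flow: the source deserializes each element, asks the schema whether it marks the end of the stream, and if so stops without emitting it and leaves its run loop. The interface `EndAwareSchema` and the class `ModelSource` are hypothetical stand-ins, not Flink's DeserializationSchema or SourceFunction; not emitting the signalling element mirrors what the DeserializationSchema Javadoc describes for isEndOfStream().

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical stand-in for a deserialization schema with an end-of-stream check.
interface EndAwareSchema<T> {
    T deserialize(byte[] message);

    boolean isEndOfStream(T nextElement);
}

// Hypothetical model of a source's run loop; not Flink's SourceFunction.
final class ModelSource {
    /** Emits deserialized elements until input runs out or the schema signals end of stream. */
    static <T> int run(Iterator<byte[]> input, EndAwareSchema<T> schema, Consumer<T> collector) {
        int emitted = 0;
        while (input.hasNext()) {
            T value = schema.deserialize(input.next());
            if (schema.isEndOfStream(value)) {
                // The marker is not emitted; returning here plays the role of leaving run().
                return emitted;
            }
            collector.accept(value);
            emitted++;
        }
        return emitted;
    }

    /** Example schema: UTF-8 strings, with an element equal to "END" marking the end (an assumption). */
    static final EndAwareSchema<String> STRING_WITH_END_MARKER = new EndAwareSchema<String>() {
        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return "END".equals(nextElement);
        }
    };
}
```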

Note that Flink won't wait for pending processing-time timers but will
immediately shut down the program after the last in-flight record has been
processed.
Event-time timers will be handled because each source emits a
Long.MAX_VALUE watermark after it has emitted its last record.
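The difference between the two kinds of timers can be illustrated with a toy model. It is a simplification with hypothetical names, not Flink's timer service: event-time timers fire once the watermark reaches their timestamp, so the final Long.MAX_VALUE watermark flushes every remaining one, while processing-time timers that are still pending when the job shuts down are simply dropped, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of end-of-input timer behavior; hypothetical names, not Flink's implementation.
final class ToyTimerService {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private final List<Long> processingTimeTimers = new ArrayList<>();
    private final List<Long> fired = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    void registerProcessingTimeTimer(long timestamp) {
        processingTimeTimers.add(timestamp);
    }

    /** Fires, in timestamp order, every event-time timer with timestamp <= the watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
    }

    /** End of input: a final Long.MAX_VALUE watermark, then pending processing-time timers are dropped. */
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
        processingTimeTimers.clear();
    }

    List<Long> firedTimers() {
        return fired;
    }

    int pendingProcessingTimeTimers() {
        return processingTimeTimers.size();
    }
}
```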

Best, Fabian

2018-05-07 17:18 GMT+02:00 Dhruv Kumar <ga...@gmail.com>:

> I noticed that there is a *DeserializationSchema* in
> *org.apache.flink.api.common.serialization* which has an
> *isEndOfStream* method, but I am not sure if I can use it for my use case.

Re: Signal for End of Stream

Posted by Dhruv Kumar <ga...@gmail.com>.
I noticed that there is a DeserializationSchema in org.apache.flink.api.common.serialization which has an isEndOfStream method, but I am not sure if I can use it for my use case.

> On May 7, 2018, at 06:18, Dhruv Kumar <ga...@gmail.com> wrote:
> 
> Hi
> 
> Is there a way I can capture the end-of-stream signal for streams that are replayed from historical data? I need the end-of-stream signal to tell the Flink program to finish its execution.
> 
> Below is the use case in detail:
> 1. An independent log replayer program sends the records to a socket (identified by IP address and port).
> 2. The Flink program reads the incoming records from the above-mentioned socket via socketTextStream, applies a keyBy operator to them, does some processing, and finally writes the results to another socket.
> 
> How do I tell the Flink program to finish its execution? Is there any information I can add to the records when the replayer program sends them, which the Flink program can then parse when the records arrive?
> 
> Let me know if anything is not clear.
> 
> Thanks