Posted to issues@flink.apache.org by "SmallWong (Jira)" <ji...@apache.org> on 2020/04/16 13:33:00 UTC
[jira] [Commented] (FLINK-8663) Execution of DataStreams result in non functionality of size of Window for countWindow
[ https://issues.apache.org/jira/browse/FLINK-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084870#comment-17084870 ]
SmallWong commented on FLINK-8663:
----------------------------------
With window(10, 5), the computation is first triggered as soon as the first 5 values arrive. At that moment the window holds fewer than 10 elements.
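To illustrate this, here is a minimal single-threaded model of countWindowAll(size, slide). It does not use the Flink API. It assumes that countWindowAll(size, slide) behaves like a global window with two parts: a count trigger that fires every `slide` elements, and a count evictor that keeps only the most recent `size` elements when the trigger fires. To my understanding this mirrors how Flink builds it, but the class and method names (SlidingCountWindowModel, firings) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified model of countWindowAll(size, slide):
 * fire every `slide` elements, keep at most the last `size` elements.
 * An illustration only, not Flink's implementation.
 */
final class SlidingCountWindowModel {

    /** Returns the window contents seen at each firing, in order. */
    static List<List<Integer>> firings(List<Integer> input, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> result = new ArrayList<>();
        int sinceLastFire = 0;
        for (int value : input) {
            buffer.addLast(value);
            sinceLastFire++;
            if (sinceLastFire == slide) {      // count trigger: fires every `slide` elements
                sinceLastFire = 0;
                while (buffer.size() > size) { // count evictor: drop the oldest beyond `size`
                    buffer.removeFirst();
                }
                result.add(new ArrayList<>(buffer)); // snapshot passed to the window function
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            input.add(i);
        }
        for (List<Integer> window : firings(input, 10, 5)) {
            System.out.println(window.size() + " elements: " + window);
        }
    }
}
```

For the input 1..20 the model fires four times. The windows are [1..5], [1..10], [6..15] and [11..20]. Only the first firing sees a partial window of 5 elements.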
> Execution of DataStreams result in non functionality of size of Window for countWindow
> --------------------------------------------------------------------------------------
>
> Key: FLINK-8663
> URL: https://issues.apache.org/jira/browse/FLINK-8663
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.4.0
> Environment:
> package com.vnl.stocks;
>
> import java.util.concurrent.TimeUnit;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.AllWindowedStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.WindowedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> // StockPrice and StockSource are the reporter's own classes; they are not included in the report.
> public class StocksProcessing {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Read from a socket stream and map it to StockPrice objects
>         DataStream<StockPrice> socketStockStream = env
>                 .socketTextStream("localhost", 9999)
>                 .map(new MapFunction<String, StockPrice>() {
>                     private String[] tokens;
>
>                     @Override
>                     public StockPrice map(String value) throws Exception {
>                         tokens = value.split(",");
>                         return new StockPrice(tokens[0],
>                                 Double.parseDouble(tokens[1]));
>                     }
>                 });
>
>         socketStockStream.print();
>         // Generate other stock streams
>         DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
>         // DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
>         // DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
>         // DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
>
>         // Merge all stock streams together
>         DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream/*, FTSE_stream, DJI_stream, BUX_stream*/);
>
>         // stockStream.print();
>         Thread.sleep(100);
>
>         // Sliding count window over all elements: size 10, slide 5
>         AllWindowedStream<StockPrice, GlobalWindow> windowedStream = stockStream
>                 .countWindowAll(10, 5);
>
>         //.keyBy("symbol")
>         //.timeWindowAll(Time.of(10, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
>
>         //stockStream.keyBy("symbol");
>         // Compute some simple statistics on a rolling window.
>         // Note: despite its name, `lowest` holds the maximum price per window (maxBy).
>         DataStream<StockPrice> lowest = windowedStream.maxBy("price");
>         //DataStream<StockPrice> highest = windowedStream.;
>         /*DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol")
>                 .maxBy("price").flatten();
>         DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol")
>                 .mapWindow(new WindowMean()).flatten();*/
>         lowest.print();
>
>         Thread.sleep(100);
>         /*
>         AllWindowedStream<StockPrice, GlobalWindow> windowedStream1 = lowest
>                 .countWindowAll(5,2);
>         //windowedStream1.print();
>         DataStream<StockPrice> highest = windowedStream1.minBy("price");*/
>         //highest.print();
>
>         env.execute("Stock stream");
>     }
> }
> Reporter: Subham
> Priority: Major
>
> I used AllWindowedStream<?, GlobalWindow> with countWindowAll to process a stream and compute the maximum of each window. In the output, the window size and slide work incorrectly.
> See the example below:
> Initial stream: 1, 2, 3, 4, 5, 6, ...
> Output 1 (min for window 10, 5): 1, 6, 11, ... (this is correct)
> However, if I calculate the maximum, I get:
> Output 2 (max for window 10, 5): 5, 10, 15, ... (which is wrong)
> Expected: 10, 15, 20, ...
>
> Please resolve this error.
>
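The reporter's max output can be reproduced with a small single-threaded model. It uses the same assumed semantics for countWindowAll(10, 5): fire every 5 elements and keep at most the last 10. Under that model, the first value 5 comes from the partial first window rather than from a wrong window size.

The `onlyFull` flag is a hypothetical workaround that skips firings whose window is not yet full. With it set, the output matches the expected 10, 15, 20. In a real Flink job, a comparable check would presumably go in a window function that counts the elements before emitting (for example a ProcessAllWindowFunction). That part is not shown here, and the class and method names below are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model of countWindowAll(size, slide) followed by a max,
 * with an optional filter that ignores partial windows.
 */
final class CountWindowMaxDemo {

    /** Max of each firing; when onlyFull is true, windows with fewer than `size` elements are skipped. */
    static List<Integer> maxPerFiring(List<Integer> input, int size, int slide, boolean onlyFull) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        int count = 0;
        for (int value : input) {
            buffer.addLast(value);
            if (++count % slide != 0) {
                continue;                      // trigger fires only every `slide` elements
            }
            while (buffer.size() > size) {
                buffer.removeFirst();          // evict the oldest elements beyond `size`
            }
            if (onlyFull && buffer.size() < size) {
                continue;                      // hypothetical workaround: ignore partial windows
            }
            out.add(Collections.max(buffer));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            input.add(i);
        }
        System.out.println("all firings:  " + maxPerFiring(input, 10, 5, false));
        System.out.println("full windows: " + maxPerFiring(input, 10, 5, true));
    }
}
```

For the input 1..20 this prints [5, 10, 15, 20] for all firings and [10, 15, 20] when partial windows are skipped.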
--
This message was sent by Atlassian Jira
(v8.3.4#803005)