Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2021/12/06 16:42:07 UTC

Re: Table DataStream Conversion Lost Watermark

Hi Yunfeng,

It seems this is a deeper issue with the fromValues implementation. 
Under the hood, it still uses the deprecated InputFormat stack, and as 
far as I can see, that stack does not emit a final MAX_WATERMARK. I will 
definitely forward this.

But toDataStream forwards watermarks correctly.

I hope this helps. Or do you think we should also rediscuss the 
fromDataStream watermark behavior?

Regards,
Timo
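
A minimal toy model in plain Java of the mechanism described above. It has no Flink dependency, and the class and method names are hypothetical. The `EndOfStreamWindows` assigner quoted below puts every element into `TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE)` and uses `EventTimeTrigger`, which fires once the watermark reaches `window.maxTimestamp()`, i.e. `end - 1 = Long.MAX_VALUE - 1`. Only the final watermark (`Long.MAX_VALUE`, the value carried by `Watermark.MAX_WATERMARK`) gets that far. A source that never emits it therefore leaves the window unfired, and the collected result stays empty. The sketch mirrors only that trigger comparison and is not Flink's implementation.

```java
/** Toy model (not Flink code) of when an event-time trigger fires a TimeWindow. */
final class EndOfStreamFiringModel {

    // Mirrors TimeWindow#maxTimestamp(): the last timestamp inside [start, end).
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Mirrors the event-time check: the window fires once watermark >= maxTimestamp().
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= maxTimestamp(windowEnd);
    }

    // True if any watermark in the sequence would fire the window.
    static boolean firesForSequence(long windowEnd, long... watermarks) {
        for (long wm : watermarks) {
            if (fires(windowEnd, wm)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // EndOfStreamWindows uses a window ending at Long.MAX_VALUE.
        long end = Long.MAX_VALUE;
        // Input that ends without the final watermark: the window never fires.
        System.out.println(firesForSequence(end, 1_000L, 2_000L)); // prints false
        // Input that ends with the final watermark (Long.MAX_VALUE): the window fires.
        System.out.println(firesForSequence(end, 1_000L, 2_000L, Long.MAX_VALUE)); // prints true
    }
}
```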


On 06.12.21 10:26, Yunfeng Zhou wrote:
> Hi Timo,
> 
> Thanks for your response. I encountered another problem that might be 
> related to the watermark issue we discussed above.
> 
> In the test cases shown below, I create a table from some data, convert 
> it to a DataStream, and apply windowAll().reduce() to it. If we need to 
> explicitly specify a `rowtime` metadata column in order to make the 
> table pass timestamps to the converted DataStream, then both test cases 
> should print empty lists. In fact, one of them prints a list with some 
> data. The only difference between them is that I changed some of the 
> input values. This behavior can be reproduced with Flink ML's latest 
> Java environment and configurations.
> 
> Is this the expected behavior of `toDataStream`, or have I accidentally 
> encountered a bug?
> 
> Best regards,
> Yunfeng
> 
> ```java
> import org.apache.commons.collections.IteratorUtils;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.junit.Test;
> 
> public class SimpleTest {
>     @Test
>     public void testSimple1() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> 
>         Table inputTable = tEnv.fromValues(
>                 DataTypes.ROW(
>                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f0", DataTypes.STRING()),
>                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f4", DataTypes.INT()),
>                         DataTypes.FIELD("label", DataTypes.STRING())),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
>                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
>                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
>                 Row.of(1., "a", 1., 1., 0., 1, "l0"),
>                 Row.of(2., "d", 1., 1., 0., 1, "l0"));
> 
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
> 
>         // One window over the whole bounded input; it only fires on the final watermark.
>         System.out.println(IteratorUtils.toList(input
>                 .windowAll(EndOfStreamWindows.get())
>                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
>                 .executeAndCollect()));
>     }
> 
>     @Test
>     public void testSimple2() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> 
>         // Same as testSimple1 except for the second and sixth rows.
>         Table inputTable = tEnv.fromValues(
>                 DataTypes.ROW(
>                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f0", DataTypes.STRING()),
>                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f4", DataTypes.INT()),
>                         DataTypes.FIELD("label", DataTypes.STRING())),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "a", 1., 0., 1., 2, "l1"),
>                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
>                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
>                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
>                 Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
>                 Row.of(2., "d", 1., 1., 0., 1, "l0"));
> 
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
> 
>         System.out.println(IteratorUtils.toList(input
>                 .windowAll(EndOfStreamWindows.get())
>                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
>                 .executeAndCollect()));
>     }
> }
> ```
> 
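> ```java
> import java.util.Collection;
> import java.util.Collections;
> 
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> 
> /**
>  * A {@link WindowAssigner} that assigns all elements of a bounded input stream into one window
>  * pane. The results are emitted once the input stream has ended.
>  */
> public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
> 
>     private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
> 
>     private EndOfStreamWindows() {}
> 
>     public static EndOfStreamWindows get() {
>         return INSTANCE;
>     }
> 
>     @Override
>     public Collection<TimeWindow> assignWindows(
>             Object element, long timestamp, WindowAssignerContext context) {
>         // A single window covering all timestamps. Its maxTimestamp() is Long.MAX_VALUE - 1,
>         // so only the final watermark emitted at the end of a bounded input fires it.
>         return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
>     }
> 
>     @Override
>     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
>         return EventTimeTrigger.create();
>     }
> 
>     @Override
>     public String toString() {
>         return "EndOfStreamWindows()";
>     }
> 
>     @Override
>     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
>         return new TimeWindow.Serializer();
>     }
> 
>     @Override
>     public boolean isEventTime() {
>         return true;
>     }
> }
> ```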
> 
> On Fri, Nov 5, 2021 at 4:29 PM Timo Walther <twalthr@apache.org> wrote:
> 
>     Hi Yunfeng,
> 
>     By default, fromDataStream does not propagate watermarks into the Table
>     API, because the Table API needs a time attribute in the schema that
>     corresponds to the watermarks. A time attribute is also put back into
>     the stream record (as its timestamp) during toDataStream.
> 
>     Please take a look at:
> 
>     https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream
> 
>     In particular, example 4 should solve your use case:
> 
>     // === EXAMPLE 4 ===
> 
>     // derive all physical columns automatically
>     // but access the stream record's timestamp for creating a rowtime attribute column
>     // also rely on the watermarks generated in the DataStream API
> 
>     // we assume that a watermark strategy has been defined for `dataStream` before
>     Table table =
>           tableEnv.fromDataStream(
>               dataStream,
>               Schema.newBuilder()
>                   .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
>                   .watermark("rowtime", "SOURCE_WATERMARK()")
>                   .build());
> 
>     I hope this helps.
> 
>     Regards,
>     Timo
> 
> 
>     On 04.11.21 12:00, Yunfeng Zhou wrote:
>      > Hi,
>      >
>      > I found that if I convert a DataStream into a Table and back into a
>      > DataStream, the watermarks of the stream are lost. As shown in the
>      > program below, the TestOperator before the conversion has its
>      > processWatermark() method triggered and the watermark value printed,
>      > but the one after the conversion does not.
>      >
>      > Is my observation correct? If so, is this the expected behavior of
>      > the conversion API? My current work requires converting a table into
>      > a DataStream and applying a window operation to it, but this problem
>      > prevents me from creating a window.
>      >
>      > Regards,
>      > Yunfeng
>      >
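>      > ```java
>      > import org.apache.commons.collections.IteratorUtils;
>      > import org.apache.flink.api.common.typeinfo.TypeInformation;
>      > import org.apache.flink.api.java.typeutils.RowTypeInfo;
>      > import org.apache.flink.streaming.api.datastream.DataStream;
>      > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>      > import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>      > import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>      > import org.apache.flink.streaming.api.watermark.Watermark;
>      > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>      > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>      > import org.apache.flink.types.Row;
>      >
>      > public class SimpleTest {
>      >     public static void main(String[] args) throws Exception {
>      >         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>      >         env.setParallelism(1);
>      >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>      >
>      >         DataStream<Row> input = env.fromElements(Row.of(1));
>      >
>      >         // Operator before the conversion: prints every element and watermark it receives.
>      >         input = input.transform(
>      >                 "TestOperator",
>      >                 new RowTypeInfo(
>      >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
>      >                         new String[] {"f0"}),
>      >                 new TestOperator("0"));
>      >
>      >         // DataStream -> Table -> DataStream round trip.
>      >         input = tEnv.toDataStream(tEnv.fromDataStream(input));
>      >
>      >         // Operator after the conversion.
>      >         input = input.transform(
>      >                 "TestOperator",
>      >                 new RowTypeInfo(
>      >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
>      >                         new String[] {"f0"}),
>      >                 new TestOperator("1"));
>      >
>      >         System.out.println(IteratorUtils.toList(input.executeAndCollect()));
>      >     }
>      >
>      >     private static class TestOperator extends AbstractStreamOperator<Row>
>      >             implements OneInputStreamOperator<Row, Row> {
>      >         private final String prefix;
>      >
>      >         private TestOperator(String prefix) {
>      >             this.prefix = prefix;
>      >         }
>      >
>      >         @Override
>      >         public void processElement(StreamRecord<Row> streamRecord) throws Exception {
>      >             System.out.println(prefix + streamRecord.getValue());
>      >             output.collect(streamRecord);
>      >         }
>      >
>      >         @Override
>      >         public void processWatermark(Watermark mark) throws Exception {
>      >             // Forward the watermark downstream, then log it.
>      >             super.processWatermark(mark);
>      >             System.out.println(prefix + mark.toString());
>      >         }
>      >     }
>      > }
>      > ```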
> 
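
Timo's 5 November explanation (fromDataStream forwards DataStream watermarks into the Table API only when the schema declares a time attribute, e.g. via `.watermark("rowtime", "SOURCE_WATERMARK()")`) can be pictured with the toy model below. It is plain Java with no Flink dependency; `ConversionWatermarkModel`, `Element`, and `roundTrip` are hypothetical names, and `roundTrip` is only a stand-in for `toDataStream(fromDataStream(input, schema))`, not its real implementation. The boolean flag models whether such a watermark declaration exists; without it, the model drops every watermark, which matches what Yunfeng's second TestOperator observed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of watermark propagation through a Table round trip. */
final class ConversionWatermarkModel {

    /** A stream element: either a data record (with its timestamp) or a watermark. */
    static final class Element {
        final boolean watermark;
        final long time;

        Element(boolean watermark, long time) {
            this.watermark = watermark;
            this.time = time;
        }
    }

    /**
     * Hypothetical stand-in for toDataStream(fromDataStream(input, schema)).
     * declaresSourceWatermark models a schema with a rowtime column plus
     * .watermark("rowtime", "SOURCE_WATERMARK()"); without it, watermarks are dropped.
     */
    static List<Element> roundTrip(List<Element> input, boolean declaresSourceWatermark) {
        List<Element> out = new ArrayList<>();
        for (Element e : input) {
            if (!e.watermark || declaresSourceWatermark) {
                out.add(e);
            }
        }
        return out;
    }

    static long countWatermarks(List<Element> elements) {
        return elements.stream().filter(e -> e.watermark).count();
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
                new Element(false, 1L),             // a record with timestamp 1
                new Element(true, Long.MAX_VALUE)); // final watermark of a bounded input
        System.out.println(countWatermarks(roundTrip(input, false))); // prints 0
        System.out.println(countWatermarks(roundTrip(input, true)));  // prints 1
    }
}
```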


Re: Table DataStream Conversion Lost Watermark

Posted by Yunfeng Zhou <fl...@gmail.com>.
Hi Timo,

Thanks for this information. Since it is confirmed that toDataStream is
functioning correctly and that I can avoid this problem by not using
fromValues in my implementation, I think I have got enough information for
my current work and don't need to rediscuss fromDatastream's behavior.

Best regards,
Yunfeng
