Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2021/12/06 16:42:07 UTC
Re: Table DataStream Conversion Lost Watermark
Hi Yunfeng,
It seems this is a deeper issue with the fromValues implementation.
Under the hood, it still uses the deprecated InputFormat stack, and as
far as I can see, that stack does not emit a final MAX_WATERMARK. I will
definitely forward this.
But toDataStream forwards watermarks correctly.
I hope this helps. Or do you think we should also rediscuss the
fromDataStream watermark behavior?
Regards,
Timo
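The quoted tests below depend on exactly this final watermark. As a rough illustration only, here is a minimal, dependency-free Java model (not Flink code) of `windowAll(EndOfStreamWindows.get()).reduce(...)`. It assumes Flink's semantics that `TimeWindow.maxTimestamp()` is `end - 1` and that `EventTimeTrigger` fires once the watermark reaches that value. `EndOfStreamWindowModel` and `EndOfStreamWindowDemo` are hypothetical names, and lateness, parallelism and state cleanup are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free model of windowAll(EndOfStreamWindows.get()).reduce(...).
 * NOT Flink code. Assumed semantics (mirroring Flink's TimeWindow and EventTimeTrigger):
 * the single window is [Long.MIN_VALUE, Long.MAX_VALUE), its max timestamp is end - 1,
 * and the window fires once the watermark reaches that max timestamp.
 * Lateness, parallelism and state cleanup are deliberately ignored.
 */
final class EndOfStreamWindowModel<T> {
    /** Value carried by Flink's Watermark.MAX_WATERMARK. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Max timestamp of the window [Long.MIN_VALUE, Long.MAX_VALUE), i.e. end - 1. */
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE - 1;

    private final List<T> output = new ArrayList<>();
    private T accumulator; // reduce state of the one global window pane
    private boolean fired;

    void processElement(T element) {
        // Same reduce logic as (row, t1) -> row: keep the first element seen.
        if (accumulator == null) {
            accumulator = element;
        }
    }

    void processWatermark(long watermark) {
        // Event-time trigger: fire once the watermark reaches the window's max timestamp.
        if (!fired && accumulator != null && watermark >= WINDOW_MAX_TIMESTAMP) {
            output.add(accumulator);
            fired = true;
        }
    }

    List<T> output() {
        return output;
    }
}

final class EndOfStreamWindowDemo {
    /** Feeds bounded input through the model, optionally ending with MAX_WATERMARK. */
    static List<String> run(List<String> rows, boolean emitFinalWatermark) {
        EndOfStreamWindowModel<String> window = new EndOfStreamWindowModel<>();
        for (String row : rows) {
            window.processElement(row);
        }
        if (emitFinalWatermark) {
            window.processWatermark(EndOfStreamWindowModel.MAX_WATERMARK);
        }
        return window.output();
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "d");
        System.out.println(run(rows, true));  // prints [a]
        System.out.println(run(rows, false)); // prints []
    }
}
```

Because the single window ends at Long.MAX_VALUE, its max timestamp is Long.MAX_VALUE - 1, and in practice only the final MAX_WATERMARK reaches it. Under this model, a source that never emits that watermark leaves the reduce result empty.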
On 06.12.21 10:26, Yunfeng Zhou wrote:
> Hi Timo,
>
> Thanks for your response. I encountered another problem that might be
> related to the watermark issue we discussed above.
>
> In the test cases shown below, I create a table from some data,
> convert it to a DataStream, and apply windowAll().reduce() to it. If we
> need to explicitly specify a `rowtime` metadata column in order to make
> the table pass timestamps to the converted DataStream, then both test
> cases should print empty lists. In fact, one of them can print a list
> with some data. The only difference between them is that I changed some
> of the input values. This behavior can be reproduced under Flink ML's
> latest Java environment and configuration.
>
> Is this the expected behavior of `toDataStream`, or have I accidentally
> encountered a bug?
>
> Best regards,
> Yunfeng
>
> ```java
> import org.apache.commons.collections.IteratorUtils;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> public class SimpleTest {
>     @Test
>     public void testSimple1() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>         Table inputTable = tEnv.fromValues(
>                 DataTypes.ROW(
>                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f0", DataTypes.STRING()),
>                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f4", DataTypes.INT()),
>                         DataTypes.FIELD("label", DataTypes.STRING())),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
>                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
>                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
>                 Row.of(1., "a", 1., 1., 0., 1, "l0"),
>                 Row.of(2., "d", 1., 1., 0., 1, "l0"));
>
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
>
>         // Collect the single result of the end-of-stream window (if it ever fires).
>         System.out.println(IteratorUtils.toList(input
>                 .windowAll(EndOfStreamWindows.get())
>                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
>                 .executeAndCollect()));
>     }
>
>     // Same as testSimple1, except for the f2 values of the second and sixth rows.
>     @Test
>     public void testSimple2() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>         Table inputTable = tEnv.fromValues(
>                 DataTypes.ROW(
>                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f0", DataTypes.STRING()),
>                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                         DataTypes.FIELD("f4", DataTypes.INT()),
>                         DataTypes.FIELD("label", DataTypes.STRING())),
>                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                 Row.of(1., "a", 1., 0., 1., 2, "l1"),
>                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
>                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
>                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
>                 Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
>                 Row.of(2., "d", 1., 1., 0., 1, "l0"));
>
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
>
>         System.out.println(IteratorUtils.toList(input
>                 .windowAll(EndOfStreamWindows.get())
>                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
>                 .executeAndCollect()));
>     }
> }
> ```
>
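> ```java
> import java.util.Collection;
> import java.util.Collections;
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> /**
>  * A {@link WindowAssigner} that assigns all elements of a bounded input stream into one window
>  * pane. The results are emitted once the input stream has ended.
>  */
> public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
>
>     private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
>
>     private EndOfStreamWindows() {}
>
>     public static EndOfStreamWindows get() {
>         return INSTANCE;
>     }
>
>     @Override
>     public Collection<TimeWindow> assignWindows(
>             Object element, long timestamp, WindowAssignerContext context) {
>         // A single window covering all of event time.
>         return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
>     }
>
>     @Override
>     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
>         // Fires when the watermark reaches the window's max timestamp (Long.MAX_VALUE - 1),
>         // which in practice only the final MAX_WATERMARK does.
>         return EventTimeTrigger.create();
>     }
>
>     @Override
>     public String toString() {
>         return "EndOfStreamWindows()";
>     }
>
>     @Override
>     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
>         return new TimeWindow.Serializer();
>     }
>
>     @Override
>     public boolean isEventTime() {
>         return true;
>     }
> }
> ```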
>
> On Fri, Nov 5, 2021 at 4:29 PM Timo Walther <twalthr@apache.org> wrote:
>
> Hi Yunfeng,
>
> By default, fromDataStream does not propagate watermarks into the Table
> API, because the Table API needs a time attribute in the schema that
> corresponds to the watermarks. A time attribute will also be put back
> into the stream record during toDataStream.
>
> Please take a look at:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream
>
> In particular, example 4 should solve your use case:
>
> // === EXAMPLE 4 ===
>
> // derive all physical columns automatically
> // but access the stream record's timestamp for creating a rowtime attribute column
> // also rely on the watermarks generated in the DataStream API
>
> // we assume that a watermark strategy has been defined for `dataStream` before
> // (not part of this example)
> Table table =
>     tableEnv.fromDataStream(
>         dataStream,
>         Schema.newBuilder()
>             .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
>             .watermark("rowtime", "SOURCE_WATERMARK()")
>             .build());
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 04.11.21 12:00, Yunfeng Zhou wrote:
> > Hi,
> >
> > I found that if I convert a DataStream into a Table and back into a
> > DataStream, the watermark of the stream is lost. As shown in the
> > program below, the TestOperator before the conversion has its
> > processWatermark() method triggered and the watermark value printed,
> > but the one after the conversion does not.
> >
> > Is my observation correct? If so, is this the expected behavior of the
> > conversion API? My current work requires me to convert a table into a
> > DataStream and apply a window operation to it, but this problem
> > prevents me from creating a window.
> >
> > Regards,
> > Yunfeng
> >
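> > ```java
> > import org.apache.commons.collections.IteratorUtils;
> > import org.apache.flink.api.common.typeinfo.TypeInformation;
> > import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> > import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> > import org.apache.flink.streaming.api.watermark.Watermark;
> > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > public class SimpleTest {
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.createLocalEnvironment();
> >         env.setParallelism(1);
> >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> >
> >         DataStream<Row> input = env.fromElements(Row.of(1));
> >
> >         // Operator "0" sits before the DataStream -> Table -> DataStream round trip.
> >         input = input.transform(
> >                 "TestOperator",
> >                 new RowTypeInfo(
> >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
> >                         new String[] {"f0"}),
> >                 new TestOperator("0"));
> >
> >         input = tEnv.toDataStream(tEnv.fromDataStream(input));
> >
> >         // Operator "1" sits after the round trip.
> >         input = input.transform(
> >                 "TestOperator",
> >                 new RowTypeInfo(
> >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
> >                         new String[] {"f0"}),
> >                 new TestOperator("1"));
> >
> >         System.out.println(IteratorUtils.toList(input.executeAndCollect()));
> >     }
> >
> >     /** Prints every record and watermark it receives, prefixed with an operator id. */
> >     private static class TestOperator extends AbstractStreamOperator<Row>
> >             implements OneInputStreamOperator<Row, Row> {
> >         private final String prefix;
> >
> >         private TestOperator(String prefix) {
> >             this.prefix = prefix;
> >         }
> >
> >         @Override
> >         public void processElement(StreamRecord<Row> streamRecord) throws Exception {
> >             System.out.println(prefix + streamRecord.getValue());
> >             output.collect(streamRecord);
> >         }
> >
> >         @Override
> >         public void processWatermark(Watermark mark) throws Exception {
> >             super.processWatermark(mark); // forwards the watermark downstream
> >             System.out.println(prefix + mark.toString());
> >         }
> >     }
> > }
> > ```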
>
Re: Table DataStream Conversion Lost Watermark
Posted by Yunfeng Zhou <fl...@gmail.com>.
Hi Timo,
Thanks for this information. Since it is confirmed that toDataStream is
functioning correctly and that I can avoid this problem by not using
fromValues in my implementation, I think I have enough information for
my current work, and we don't need to rediscuss fromDataStream's behavior.
Best regards,
Yunfeng
On Tue, Dec 7, 2021 at 12:42 AM Timo Walther <tw...@apache.org> wrote: