Posted to issues@flink.apache.org by "jiangxin369 (via GitHub)" <gi...@apache.org> on 2023/03/07 08:37:34 UTC

[GitHub] [flink] jiangxin369 commented on a diff in pull request #22034: [FLINK-31240][table] Reduce the overhead of conversion between DataStream and Table

jiangxin369 commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1127523251


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

Review Comment:
   `env.fromCollection(...)` produces its rows through `RowSerializer`, which constructs each row with field names, so the rows in the DataStream could be accessed by their non-default names regardless of this change. What we want to test here is that the code change also works when the DataStream consists of purely positional rows that carry no field names.
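
   To make the distinction concrete, here is a minimal sketch of the two row modes described above. It deliberately does not use Flink classes: `SketchRow`, `positional`, `withNamedPositions`, and `supportsNameAccess` are hypothetical names invented for illustration, and the behavior is a simplified model of what the comment describes, not Flink's actual `Row` or `RowSerializer` implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a row; this is NOT org.apache.flink.types.Row. */
final class SketchRow {
    private final Object[] fields;
    // null means the row is purely positional and carries no field names.
    private final Map<String, Integer> positionByName;

    private SketchRow(Object[] fields, Map<String, Integer> positionByName) {
        this.fields = fields;
        this.positionByName = positionByName;
    }

    /** Models a row built like `new Row(2)` in the test source: positions only. */
    static SketchRow positional(Object... values) {
        return new SketchRow(values.clone(), null);
    }

    /** Models a row that a serializer rebuilt together with a name-to-position mapping. */
    static SketchRow withNamedPositions(String[] names, Object... values) {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            mapping.put(names[i], i);
        }
        return new SketchRow(values.clone(), mapping);
    }

    boolean supportsNameAccess() {
        return positionByName != null;
    }

    Object getField(int position) {
        return fields[position];
    }

    Object getField(String name) {
        if (positionByName == null) {
            // A purely positional row cannot resolve names at all.
            throw new IllegalStateException("Row has no field names: " + name);
        }
        Integer position = positionByName.get(name);
        if (position == null) {
            throw new IllegalArgumentException("Unknown field name: " + name);
        }
        return fields[position];
    }
}
```

   In this model, a row from a custom source corresponds to `SketchRow.positional(1L, "a")`, where only `getField(0)` and `getField(1)` work, while a row that has passed through a serializer corresponds to `SketchRow.withNamedPositions(...)`, where name lookups succeed as well. The test in the PR targets the first case.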


