Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/07 11:10:09 UTC

[GitHub] [iceberg] pan3793 opened a new issue #1885: Flink Iceberg Usage

pan3793 opened a new issue #1885:
URL: https://github.com/apache/iceberg/issues/1885


   We use Avro schemas as our unified ETL schema management solution. While trying to write data into Iceberg using Flink, I found that Flink has many abstractions for representing data types, such as `TypeInformation`, `LogicalType`, `RowType`, `TableSchema`, and `DataType`. I can't figure out how they relate to one another or how to convert between them.
   
   Specifically, my question is: *How can I write a `DataStream<GenericRecord>` to an Iceberg table using the Flink Iceberg API?* I think the Avro `Schema` should carry enough information to describe the record's structure.
   
   Should I use the APIs below? If so, how do I adapt them to a `DataStream<GenericRecord>`?
   ```
   public static <T> Builder builderFor(DataStream<T> input,
                                        MapFunction<T, RowData> mapper,
                                        TypeInformation<RowData> outputType)
   ```
   
   ```
   public static Builder forRow(DataStream<Row> input, TableSchema tableSchema)
   ```


[GitHub] [iceberg] pan3793 edited a comment on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-741473849


   @openinx Thanks for following up on this issue. I haven't tested nested fields yet, but I found another issue with logical types. Our ETL is built on CDH 6.3.1 with Avro 1.8.2, which cannot generate Java 8 time classes because of AVRO-2079, so without a `converter`, Flink can't handle the Joda-Time values properly.
   
   > I think we may need a `converter`, much like Flink's `DataFormatConverters.RowConverter`, to convert the Avro `GenericRecord` to `RowData`.
   
   We really need that `converter`. All of our ETL jobs' input and output data structures are represented as Avro `GenericRecord`, because Avro lets us define schemas in JSON (`.avsc` files) and provides the `avro-maven-plugin` to generate entity classes automatically.
   
   PS: We only use Avro to manage schemas; the data itself is stored as ORC. We are trying to migrate the storage format to Iceberg, and if Iceberg offered schema management tooling like Avro's, maybe we could manage our schemas with Iceberg as well.
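
   For what it's worth, here is a minimal, untested sketch of such a `converter`, assuming Flink 1.12's flink-avro module (which ships `AvroToRowDataConverters`, including logical-type handling) is on the classpath. The class name `GenericRecordToRowData` is mine, not an existing API, and whether it copes with Avro 1.8.2's Joda-Time values would still need verification:
   ```
   import org.apache.avro.generic.GenericRecord;
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.formats.avro.AvroToRowDataConverters;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.types.logical.RowType;

   // Hypothetical GenericRecord -> RowData converter built on flink-avro.
   public class GenericRecordToRowData implements MapFunction<GenericRecord, RowData> {

       // The converter tree is Serializable, so it ships with the function.
       private final AvroToRowDataConverters.AvroToRowDataConverter converter;

       public GenericRecordToRowData(RowType rowType) {
           this.converter = AvroToRowDataConverters.createRowConverter(rowType);
       }

       @Override
       public RowData map(GenericRecord record) {
           return (RowData) converter.convert(record);
       }
   }
   ```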


[GitHub] [iceberg] pan3793 closed issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
pan3793 closed issue #1885:
URL: https://github.com/apache/iceberg/issues/1885


[GitHub] [iceberg] openinx commented on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-741442756


   I will try to provide a unit test to show how it works if I have time. Thanks.


[GitHub] [iceberg] pan3793 commented on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
pan3793 commented on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-739955832


   @openinx Thanks for your response.
   
   > So you might need to provide a function to convert `GenericRecord` to `RowData`. The `TypeInformation<RowData>` specifies the data type of your input stream.
   
   That's exactly what I'm trying to do. My question is: given that I already have the Avro `GenericRecord` and its `Schema`, how do I write a generic `MapFunction` that handles any `GenericRecord` properly, i.e. converts `GenericRecord` to `RowData` and `Schema` to `TypeInformation<RowData>`?
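
   For the `Schema` half, one route that looks plausible to me (untested; it assumes Flink 1.12's `InternalTypeInfo` plus Iceberg's schema utilities, and with the iceberg-flink-runtime jar the schema may first need re-parsing through the shaded Avro parser) is Avro `Schema` -> Iceberg `Schema` -> Flink `RowType` -> `TypeInformation<RowData>`:
   ```
   // Sketch only: derive TypeInformation<RowData> from an Avro schema.
   Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
   RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
   TypeInformation<RowData> outputType = InternalTypeInfo.of(rowType);
   ```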


[GitHub] [iceberg] pan3793 edited a comment on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-739987048


   I think I found a solution:
   ```
   @Override
   protected void output(DataStream<GenericRecord> outputStream, org.apache.avro.Schema avroSchema) {
       // Flatten each GenericRecord into a Flink Row by positional field access.
       // Note: this copies top-level fields as-is, so nested records and logical
       // types are not converted.
       DataStream<Row> rowDataStream = outputStream.map(genericRecord -> {
           int columnNum = genericRecord.getSchema().getFields().size();
           Object[] rowData = new Object[columnNum];
           for (int i = 0; i < columnNum; i++) {
               rowData[i] = genericRecord.get(i);
           }
           // Row.of(Object...) receives the array as its varargs parameter.
           return Row.of(rowData);
       });
       // The iceberg-flink-runtime jar shades Avro, so re-parse the schema with
       // the shaded parser, then convert: Avro -> Iceberg schema -> Flink RowType.
       org.apache.iceberg.shaded.org.apache.avro.Schema shadedAvroSchema =
               new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(avroSchema.toString());
       Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);
       RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
       TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType);
       FlinkSink.forRow(rowDataStream, tableSchema)
           .table(table)
           .tableLoader(tableLoader)
           .tableSchema(tableSchema)
           .writeParallelism(parallelism)
           .build();
   }
   ```


[GitHub] [iceberg] openinx commented on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-741442050


   Will it work if the schema has a nested field? I think we may need a `converter`, much like Flink's `DataFormatConverters.RowConverter`, to convert the Avro `GenericRecord` to `RowData`.


[GitHub] [iceberg] openinx commented on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-739941275


   @pan3793, under the hood the Iceberg Flink sink writes `RowData` into Avro/Parquet/ORC files, no matter which data type you are writing. That is why we provide the `MapFunction<T, RowData> mapper`: to convert your own data type into `RowData`. So you might need to provide a function to convert `GenericRecord` to `RowData`. The `TypeInformation<RowData>` specifies the data type of your input stream.
   
   We have provided a unit test that writes a `Row` stream to the Iceberg sink; you may want to take a look: https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java#L153
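
   As an untested sketch of how those two pieces could fit together, reusing the hypothetical `GenericRecordToRowData` mapper sketched earlier in this thread and assuming Flink 1.12's `InternalTypeInfo` (the builder calls mirror the ones used elsewhere in this thread):
   ```
   // Derive the Flink RowType from the Avro schema via Iceberg's utilities
   // (with the iceberg-flink-runtime jar, re-parse through the shaded Avro
   // parser first, as shown above in this thread).
   Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
   RowType rowType = FlinkSchemaUtil.convert(icebergSchema);

   FlinkSink.builderFor(
           genericRecordStream,                  // DataStream<GenericRecord>
           new GenericRecordToRowData(rowType),  // MapFunction<GenericRecord, RowData>
           InternalTypeInfo.of(rowType))         // TypeInformation<RowData>
       .table(table)
       .tableLoader(tableLoader)
       .writeParallelism(parallelism)
       .build();
   ```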


[GitHub] [iceberg] pan3793 commented on issue #1885: Flink Iceberg Usage

Posted by GitBox <gi...@apache.org>.
pan3793 commented on issue #1885:
URL: https://github.com/apache/iceberg/issues/1885#issuecomment-739855324


   @JingsongLi @openinx Could you please have a look at this question? Thanks.

