Posted to issues@flink.apache.org by "humengyu (Jira)" <ji...@apache.org> on 2020/07/08 13:20:00 UTC

[jira] [Updated] (FLINK-18530) ParquetAvroWriters can not write data to hdfs

     [ https://issues.apache.org/jira/browse/FLINK-18530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

humengyu updated FLINK-18530:
-----------------------------
    Description: 
I read data from Kafka and write it to HDFS with StreamingFileSink (a condensed sketch of the sink setup follows the list):
 # in version 1.11.0, ParquetAvroWriters does not write any data, but it works well in version 1.10.1;
 # AvroWriters works well in 1.11.0.

{code:java}
 {code}
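For quick reference, here is a condensed sketch of just the sink setup, taken from the reproduction code under "was:" below; the HDFS path, record name, namespace and field names are the same placeholders used there.

{code:java}
// Condensed from the full reproduction test below; the path and names are placeholders.
Schema schema = SchemaBuilder
    .record("xxx")
    .namespace("xxxx")
    .fields()
    .optionalString("id")
    .optionalString("name")
    .optionalString("sex")
    .endRecord();

StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://host:port/xxx/xxx/xxx"),
        ParquetAvroWriters.forGenericRecord(schema))  // per this report: writes no data on 1.11.0
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
    .build();

// Replacing ParquetAvroWriters.forGenericRecord(schema) with
// AvroWriters.forGenericRecord(schema) in the same pipeline writes data as expected.
{code}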
 

  was:
I read data from Kafka and write it to HDFS with StreamingFileSink:
 # in version 1.11.0, ParquetAvroWriters does not write any data, but it works well in version 1.10.1;
 # AvroWriters works well in 1.11.0.

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.junit.Test;

public class TestParquetAvroSink {

  @Test
  public void testParquet() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    // Parquet bulk writer: with 1.11.0 no data shows up on HDFS, with 1.10.1 it works
    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  @Test
  public void testAvro() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    // Same pipeline with the Avro writer factory: data is written as expected in 1.11.0
    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            AvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  public static class RecordMapFunction implements MapFunction<Row, GenericRecord> {

    private transient Schema schema;

    @Override
    public GenericRecord map(Row row) throws Exception {
      if (schema == null) {
        schema = SchemaBuilder
            .record("xxx")
            .namespace("xxx")
            .fields()
            .optionalString("id")
            .optionalString("name")
            .optionalString("sex")
            .endRecord();
      }
      Record record = new Record(schema);
      record.put("id", row.getField(0));
      record.put("name", row.getField(1));
      record.put("sex", row.getField(2));
      return record;
    }
  }
}
{code}


> ParquetAvroWriters can not write data to hdfs
> ---------------------------------------------
>
>                 Key: FLINK-18530
>                 URL: https://issues.apache.org/jira/browse/FLINK-18530
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.0
>            Reporter: humengyu
>            Priority: Major
>
> I read data from Kafka and write it to HDFS with StreamingFileSink:
>  # in version 1.11.0, ParquetAvroWriters does not write any data, but it works well in version 1.10.1;
>  # AvroWriters works well in 1.11.0.
> {code:java}
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)