Posted to issues@flink.apache.org by "humengyu (Jira)" <ji...@apache.org> on 2020/07/08 13:20:00 UTC
[jira] [Updated] (FLINK-18530) ParquetAvroWriters can not write data to hdfs
[ https://issues.apache.org/jira/browse/FLINK-18530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
humengyu updated FLINK-18530:
-----------------------------
Description:
I read data from Kafka and write it to HDFS with StreamingFileSink:
# in version 1.11.0, ParquetAvroWriters writes no data to HDFS, but it works well in version 1.10.1;
# AvroWriters works well in 1.11.0.
was:
I read data from Kafka and write it to HDFS with StreamingFileSink:
# in version 1.11.0, ParquetAvroWriters writes no data to HDFS, but it works well in version 1.10.1;
# AvroWriters works well in 1.11.0.
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.junit.Test;

public class TestParquetAvroSink {

  // Writes no data to HDFS in 1.11.0; works in 1.10.1.
  @Test
  public void testParquet() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source ("xxxx" is a placeholder in the original report)
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  // Identical pipeline, but with AvroWriters instead of ParquetAvroWriters;
  // works in 1.11.0.
  @Test
  public void testAvro() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source ("xxxx" is a placeholder in the original report)
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            AvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  public static class RecordMapFunction implements MapFunction<Row, GenericRecord> {

    // rebuilt lazily on each task instead of being serialized with the function
    private transient Schema schema;

    @Override
    public GenericRecord map(Row row) throws Exception {
      if (schema == null) {
        schema = SchemaBuilder
            .record("xxx")
            .namespace("xxx")
            .fields()
            .optionalString("id")
            .optionalString("name")
            .optionalString("sex")
            .endRecord();
      }
      Record record = new Record(schema);
      record.put("id", row.getField(0));
      record.put("name", row.getField(1));
      record.put("sex", row.getField(2));
      return record;
    }
  }
}
{code}
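The two tests above differ only in the {{BulkWriter.Factory}} passed to {{forBulkFormat}}; the source, bucket assigner, output file config, and checkpoint interval are identical. A minimal sketch isolating that difference, reusing the placeholder path and schema from the report ({{SinkFactories}}, {{buildSink}}, {{parquetSink}}, and {{avroSink}} are helper names invented here for illustration):
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class SinkFactories {

  // Same sink configuration as in the tests above; only the writer factory varies.
  static StreamingFileSink<GenericRecord> buildSink(BulkWriter.Factory<GenericRecord> writerFactory) {
    return StreamingFileSink
        .forBulkFormat(new Path("hdfs://host:port/xxx/xxx/xxx"), writerFactory)
        .withOutputFileConfig(OutputFileConfig.builder()
            .withPartPrefix("prefix")
            .withPartSuffix(".ext")
            .build())
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();
  }

  // Writes no data in 1.11.0; works in 1.10.1.
  static StreamingFileSink<GenericRecord> parquetSink(Schema schema) {
    return buildSink(ParquetAvroWriters.forGenericRecord(schema));
  }

  // Works in 1.11.0.
  static StreamingFileSink<GenericRecord> avroSink(Schema schema) {
    return buildSink(AvroWriters.forGenericRecord(schema));
  }
}
{code}
Note that both writers are bulk formats, and {{forBulkFormat}} rolls part files only on checkpoints, so neither test should produce finished files before the first successful checkpoint (every 20 s here); the reported difference is that the Parquet variant never produces data at all.

For completeness, a hypothetical stand-in for the elided {{xxxx}} Kafka source is sketched below. The topic name, bootstrap address, group id, and comma-separated record layout are all invented for illustration and are not part of the original report:
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

public class HypotheticalKafkaSource {

  // Builds a DataStream<Row> of (id, name, sex) from an invented Kafka topic.
  static DataStream<Row> buildRowSource(StreamExecutionEnvironment env) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092"); // invented address
    props.setProperty("group.id", "flink-18530-repro");   // invented group id

    return env
        .addSource(new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props))
        .map(line -> {
          String[] fields = line.split(",", -1); // assumes "id,name,sex" per message
          return Row.of(fields[0], fields[1], fields[2]);
        })
        .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING));
  }
}
{code}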
> ParquetAvroWriters can not write data to hdfs
> ---------------------------------------------
>
> Key: FLINK-18530
> URL: https://issues.apache.org/jira/browse/FLINK-18530
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.11.0
> Reporter: humengyu
> Priority: Major
>
> I read data from Kafka and write it to HDFS with StreamingFileSink:
> # in version 1.11.0, ParquetAvroWriters writes no data to HDFS, but it works well in version 1.10.1;
> # AvroWriters works well in 1.11.0.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)