Posted to dev@iceberg.apache.org by liuyehan <ly...@163.com> on 2022/09/07 23:46:59 UTC
In streaming scenarios, only one piece of data is written to a file
Hi everyone,
When I use the Iceberg native Java API to write data to an Iceberg table, only one record ends up in the file; the next record never makes it in. In my streaming scenario I need to write records into a file one at a time, and I could not find a corresponding test class in the source code. Thank you.
The code is below:
Configuration conf = new Configuration();
String warehousePath = "hdfs://192.168.88.110:8020/warehouse_path_iceberg";
HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
// catalog.createNamespace(Namespace.of("seatunnel"));
catalog.setConf(conf);

// table schema: level (string), message (int)
Schema schema = new Schema(
        Types.NestedField.required(1, "level", Types.StringType.get()),
        Types.NestedField.required(2, "message", Types.IntegerType.get())
);

/* PartitionSpec spec = PartitionSpec.builderFor(schema)
        .hour("event_time")
        .build(); */

// load the existing table
TableIdentifier name = TableIdentifier.of("seatunnel5", "firsttable5");
Table table = catalog.loadTable(name);
// Transaction t = table.newTransaction();
// commit operations to the transaction
// t.newDelete().deleteFromRowFilter(filter).commit();

// target ORC file; the format is derived from the file name
String location = "hdfs://192.168.88.110:8020/warehouse_path_iceberg/lyhoutput";
String filename = "9_8_lyh.orc";
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);

// appender factory for the schema, and two sample records
FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(schema);
ArrayList<GenericRecord> genericRecords = new ArrayList<>();
GenericRecord genericRecord = GenericRecord.create(schema);
GenericRecord genericRecord2 = GenericRecord.create(schema);
genericRecord.set(0, "ddddff");
genericRecord.set(1, 5555);
genericRecord2.set(0, "hhhhh");
genericRecord2.set(1, 4444);
genericRecords.add(genericRecord2);
genericRecords.add(genericRecord);

// write the records one by one
FileAppender<Record> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
for (GenericRecord record : genericRecords) {
    try (FileAppender<Record> closeableAppender = appender) {
        closeableAppender.add(record);
        // closeableAppender.addAll((Iterable<Record>) genericRecord);
    }
}

// build a DataFile from the written file and commit it to the table
DataFile data = DataFiles.builder(PartitionSpec.unpartitioned())
        .withInputFile(HadoopInputFile.fromPath(path, conf))
        .withMetrics(appender.metrics())
        .build();
table.newAppend().appendFile(data).commit();
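For context, the pattern I am trying to get to is roughly the sketch below: a single appender that stays open while records arrive one at a time and is closed exactly once after the last record, before the DataFile is built from its metrics. This is only an unverified sketch under my own assumptions; the class name AppendBatchSketch and the method writeAndCommit are made up, and it reuses the same schema, path, format, and table objects as above.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileAppender;

public class AppendBatchSketch {
    // Sketch (hypothetical helper): write all records through one appender,
    // close it once, then register the finished file with the table.
    static void writeAndCommit(Table table, Schema schema, Path path, Configuration conf,
                               FileFormat format, List<Record> records) throws IOException {
        FileAppender<Record> appender = new GenericAppenderFactory(schema)
                .newAppender(HadoopOutputFile.fromPath(path, conf), format);
        try {
            for (Record record : records) {
                appender.add(record); // records can be added one at a time while the file is open
            }
        } finally {
            appender.close(); // close once, after the last record, so the file footer is written
        }

        DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
                .withInputFile(HadoopInputFile.fromPath(path, conf))
                .withFormat(format)
                .withMetrics(appender.metrics()) // metrics are complete only after close()
                .build();
        table.newAppend().appendFile(dataFile).commit();
    }
}

My understanding, which may be wrong, is that an appender cannot accept more records after close(), so close() should happen only once, after all the records have been added.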
Best,
YeHAN Liu (2013650523)
liuyehan
lyh1067341434@163.com
Re: In streaming scenarios, only one piece of data is written to a file
Posted by Ryan Blue <bl...@tabular.io>.
Hi YeHAN, I'm not sure that I understand your question. Maybe you could
join the Slack channel and ask there so we can get some quick follow-up
questions answered?
--
Ryan Blue
Tabular