Posted to dev@iceberg.apache.org by asha jyothi <as...@gmail.com> on 2021/02/04 18:35:28 UTC

IcebergTableSink to write data into multiple iceberg tables dynamically

Hi Team,

I have a Flink job that reads transaction data from a single source and writes it to an Iceberg table. I need to create a separate dataset for each account that appears in the stream of transactions. IcebergTableSink takes a TableLoader in its constructor, but my table name depends on the account ID of each transaction, and the account IDs are not a predefined list. As written, I have to choose which table to load data into at build time, but I want the routing to be dynamic based on the account ID. Is there a better way to handle this? Below is the job I have today, which writes a single Iceberg table. Appreciate any help here.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Transaction> transactions = env
        .addSource(new TransactionSource())
        .name("transactions");

    DataStream<RowData> rows = transactions
        .keyBy(Transaction::getAccountId)
        .process(new FraudDetector())
        .name("fraud-detector");

    TableSchema ts = TableSchema.builder()
        .field("AccountId", DataTypes.BIGINT())
        .field("Timestamp", DataTypes.BIGINT())
        .field("Amount", DataTypes.DOUBLE())
        .build();

    // TODO: need to write to multiple tables here based on Transaction::getAccountId
    String tableLocation = "./data/flinklocal/transactions5";
    // hadoopConf is a Hadoop Configuration created elsewhere in my setup
    TableLoader tl = TableLoader.fromHadoopTable(tableLocation, hadoopConf);
    IcebergTableSink sink = new IcebergTableSink(false, tl, ts);
    sink.consumeDataStream(rows);
    env.execute("Multiple transactional datasets");
}
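
To make the intent concrete, here is a plain-Java sketch (no Flink or Iceberg involved) of the routing I am after: derive a table location from each transaction's account ID and group records by the table they should land in. The path pattern `transactions_<accountId>` is just an illustration I made up, not something I have working.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TableRouting {

    // Hypothetical convention: one Hadoop table directory per account ID.
    static String tableLocationFor(long accountId) {
        return "./data/flinklocal/transactions_" + accountId;
    }

    // Group transactions (represented here as {accountId, amount} pairs)
    // by the table location each record should be written to.
    static Map<String, List<long[]>> routeByTable(List<long[]> transactions) {
        return transactions.stream()
            .collect(Collectors.groupingBy(t -> tableLocationFor(t[0])));
    }

    public static void main(String[] args) {
        List<long[]> txns = List.of(
            new long[] {1, 100},
            new long[] {2, 50},
            new long[] {1, 75});

        // Two distinct account IDs, so two target tables.
        Map<String, List<long[]>> routed = routeByTable(txns);
        System.out.println(routed.size());
    }
}
```

With a fixed list of account IDs I could build one IcebergTableSink per table this way, but since the IDs are not known ahead of time I am looking for a way to do this dynamically inside the job.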
Thanks,
Asha Desu