Posted to user@beam.apache.org by Aniruddh Sharma <as...@gmail.com> on 2020/04/14 01:49:03 UTC

read metadata for a generic pipeline and use metadata in transformations

Hi

I am writing a generic pipeline that will be executed for thousands of files and tables.

Many of the transformations are based on complex rules, so I moved the complex rules out of the Dataflow job, computed them externally, and wrote the resulting SQL queries into a BQ table.

Now I want to read that BQ table, extract the metadata from its columns, and use it in the Beam pipeline.

// 1. Read metadata
        // Note: databaseName and tableName are included in the selected fields
        // because the SqlTransform below filters on them.
        PCollection<TableRow> metaData =
                p.apply(format("Read: %s", metadataTable), BigQueryIO.readTableRowsWithSchema()
                        .from(metadataTable)
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .withSelectedFields(Lists.newArrayList(
                                "databaseName", "tableName", "stdBQTable", "dlpSelectQuery",
                                "dlpJoinQuery", "dlpHeaderMapStr", "deidTemplateURI")));

        // 2. Extract variables: pull the single metadata row for this table
        // and turn it into a Map<String, String> side input.
        String select_metadata_qry = format(
                "select * from PCOLLECTION where databaseName = '%s' and tableName = '%s' limit 1",
                dataBaseName, tableName);

        PCollectionView<Map<String, String>> metaDataRow =
                metaData.apply("Metadata View", SqlTransform.query(select_metadata_qry))
                        .apply(ParDo.of(new GetFieldfromBQRow()))
                        .apply(View.<String, String>asMap());
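
For reference, here is a minimal sketch of what GetFieldfromBQRow could look like (my actual class may differ): it assumes the DoFn emits one KV per selected metadata column so that View.asMap() can build the map:

import java.util.Arrays;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

// Hypothetical sketch: emits one KV pair per metadata column from the single
// matching Row, so View.asMap() can build the Map<String, String> side input.
static class GetFieldfromBQRow extends DoFn<Row, KV<String, String>> {
    @ProcessElement
    public void processElement(@Element Row row, OutputReceiver<KV<String, String>> out) {
        for (String field : Arrays.asList(
                "stdBQTable", "dlpSelectQuery", "dlpJoinQuery",
                "dlpHeaderMapStr", "deidTemplateURI")) {
            out.output(KV.of(field, row.getString(field)));
        }
    }
}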

Now I want to use the SQL queries I read from BQ inside SqlTransforms downstream, something like:
PCollection<Row> piiData = allData.apply("Select PII", SqlTransform.query(<READFROMBQTABLE>));
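
One approach I am considering (a rough, untested sketch; fetchMetadataField is a hypothetical helper of mine, and it assumes metadataTable is a standard-SQL table reference like project.dataset.metadata): since SqlTransform.query() seems to need the query string at pipeline-construction time, fetch the text in the launcher program with the BigQuery client library before building the graph:

import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;

// Sketch only: runs in the launcher program, not inside the Beam graph.
static String fetchMetadataField(
        String metadataTable, String dataBaseName, String tableName, String column)
        throws InterruptedException {
    String lookup = String.format(
            "SELECT %s FROM `%s` WHERE databaseName = '%s' AND tableName = '%s' LIMIT 1",
            column, metadataTable, dataBaseName, tableName);
    FieldValueList row = BigQueryOptions.getDefaultInstance().getService()
            .query(QueryJobConfiguration.newBuilder(lookup).build())
            .iterateAll().iterator().next();
    return row.get(column).getStringValue();
}

// The value is then a plain String and can go straight into SqlTransform:
// String dlpSelectQuery = fetchMetadataField(metadataTable, dataBaseName, tableName, "dlpSelectQuery");
// PCollection<Row> piiData = allData.apply("Select PII", SqlTransform.query(dlpSelectQuery));

Is this the recommended pattern, or is there a way to feed the side-input map (metaDataRow above) into a SqlTransform directly?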

I am new to Beam; these kinds of operations are very easy to do in Spark.
Can you please guide me on how to read the value of a SQL query from a BQ row and use it in downstream operations in my pipeline?