You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by lan tran <in...@gmail.com> on 2022/06/29 10:04:30 UTC

The methodology behind the join in Table API and DataStream

import org.apache.flink.connector.file.src.FileSourceSplit
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
import org.apache.flink.table.types.logical.RowType
import models.EnrichElementMapperFunction.MemberDescriptor
import models.{AccidentClaim, EnrichElement, EnrichElementMapperFunction, Members}
import org.apache.flink.api.common.serialization.{BulkWriter, Encoder, SimpleStringEncoder}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.file.sink.FileSink.RowFormatBuilder
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog
import org.apache.flink.core.fs
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetBulkWriter, ParquetColumnarRowInputFormat, ParquetFileFormatFactory, ParquetWriterFactory}
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.functions.co.{BroadcastProcessFunction, CoProcessFunction}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.{Row, RowKind}
import org.apache.flink.util.Collector
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.table.types.logical.RowType
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetWriter}
import org.apache.parquet.io.OutputFile
import org.apache.flink.formats.parquet.ParquetBulkWriter


/**
 * Flink job that reads the Debezium changelog of `accident_claims` and `members` from Kafka,
 * enriches every claim with its member via broadcast state and writes the result to S3.
 *
 * The table schemas are taken from a PostgreSQL catalog (`CREATE TABLE ... LIKE`), the tables
 * are converted to changelog streams, and the member stream is broadcast to the claim stream.
 */
object WindowWordCount2 {

  /**
   * Registers a PostgreSQL [[JdbcCatalog]] under the name `postgresql`.
   *
   * NOTE(review): host, user and password are hard-coded; consider reading them from
   * configuration before running this anywhere but a local demo.
   *
   * @param st_env table environment the catalog is registered on
   */
  def register_table_source(st_env: StreamTableEnvironment): Unit = {
    // Make sure the JDBC driver is loaded before the catalog tries to connect.
    Class.forName("org.postgresql.Driver")

    val name = "postgres"
    val defaultDatabase = "postgres"
    val username = "postgres"
    val password = "postgres"
    val baseUrl = "jdbc:postgresql://postgres:5432/"

    val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
    st_env.registerCatalog("postgresql", catalog)
  }

  /**
   * Parses `s` as an `Int`.
   *
   * @param s text to parse
   * @return `Some(value)` when `s` is a valid integer, `None` otherwise (including `null`)
   */
  def toInt(s: String): Option[Int] =
    scala.util.Try(s.toInt).toOption

  /** Reads a nullable column as text; a SQL NULL becomes `None` instead of throwing a NullPointerException. */
  private def optString(row: Row, field: String): Option[String] =
    Option(row.getField(field)).map(_.toString)

  /** Builds a [[Members]] from a changelog row of the `members` table. */
  private def toMember(row: Row): Members =
    new Members(
      row.getField("id").asInstanceOf[Int],
      optString(row, "first_name"),
      optString(row, "last_name"),
      optString(row, "address"),
      optString(row, "address_city"),
      optString(row, "address_country"),
      optString(row, "insurance_company"),
      optString(row, "insurance_number"),
      optString(row, "ts_created"),
      optString(row, "ts_updated")
    )

  /** Builds an [[AccidentClaim]] from a changelog row of the `accident_claims` table. */
  private def toAccidentClaim(row: Row): AccidentClaim =
    new AccidentClaim(
      row.getField("claim_id").asInstanceOf[Int],
      Option(row.getField("claim_total")).map(_.asInstanceOf[Double]),
      Option(row.getField("claim_total_receipt")).map(_.asInstanceOf[String]),
      optString(row, "claim_currency"),
      Option(row.getField("member_id")).map(_.asInstanceOf[Int]),
      optString(row, "accident_date"),
      optString(row, "accident_type"),
      optString(row, "accident_detail"),
      optString(row, "claim_date"),
      optString(row, "claim_status")
    )

  /**
   * Job entry point: builds the pipeline described on the enclosing object and executes it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    implicit val typeInfo: TypeInformation[String] = TypeInformation.of(classOf[String])

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStateBackend(new HashMapStateBackend)
    env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("s3://quynh-demo-flink/check_points/"))
    env.enableCheckpointing(1000)
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build())

    val configuration = tableEnv.getConfig()
    configuration.setNullCheck(false)

    println("Creating catalog")
    register_table_source(tableEnv)

    // Kafka-backed tables whose columns are copied from the PostgreSQL catalog tables.
    tableEnv.executeSql("""
                     CREATE TABLE accident_claims
                        WITH (
                        'connector'='kafka',
                        'topic'='pg_claims.claims.accident_claims',
                        'properties.bootstrap.servers'='kafka:9092',
                        'properties.group.id'='test-consumer-group',
                        'format'='debezium-json',
                        'scan.startup.mode'='earliest-offset'
                        )
                        LIKE postgresql.postgres.`claims.accident_claims` (
                        EXCLUDING OPTIONS
    )"""
    )

    tableEnv.executeSql("""
                     CREATE TABLE members
                        WITH (
                        'connector'='kafka',
                        'topic'='pg_claims.claims.members',
                        'properties.bootstrap.servers'='kafka:9092',
                        'properties.group.id'='test-consumer-group',
                        'format'='debezium-json',
                        'scan.startup.mode'='earliest-offset'
                        )
                        LIKE postgresql.postgres.`claims.members` (
                        EXCLUDING OPTIONS
    )"""
    )

    // SQL equivalent of the join implemented below with broadcast state:
    //   SELECT t1.member_id, t1.claim_total_receipt, t2.id, t2.first_name, t2.last_name
    //   FROM accident_claims t1 LEFT JOIN members t2 ON t1.member_id = t2.id
    val accidentClaimsTable = tableEnv.from("accident_claims")
    val membersTable = tableEnv.from("members")

    // Both tables are Debezium changelogs (not insert-only), so convert them to changelog
    // streams; every Row carries its RowKind (INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE).
    val accidentClaimRows = tableEnv.toChangelogStream(accidentClaimsTable)
    val memberRows = tableEnv.toChangelogStream(membersTable)

    // UPDATE_BEFORE rows only carry the old image of an update, so they are dropped.
    // The uid strings are kept unchanged so existing checkpoints/savepoints stay restorable.
    val memberStream: DataStream[(RowKind, Members)] = memberRows
      .map(row => (row.getKind, toMember(row)))
      .uid("memberdatastream")
      .filter(_._1 != RowKind.UPDATE_BEFORE)

    val memberBroadcast = memberStream.broadcast(EnrichElementMapperFunction.MemberDescriptor)

    val accidentClaimStream: DataStream[(RowKind, AccidentClaim)] = accidentClaimRows
      .map(row => (row.getKind, toAccidentClaim(row)))
      .uid("accidentcliamdatastream")
      .filter(_._1 != RowKind.UPDATE_BEFORE)

    val resultStream = accidentClaimStream.connect(memberBroadcast)
      .process(new EnrichEventLocationMapperFunction).uid("connectandprocess")

    // Row-encoded sink: each element is written with its toString representation.
    val outputPath = new fs.Path("s3://quynh-demo-flink/quynh_test_5/")
    val fileSink: StreamingFileSink[(RowKind, EnrichElement)] = StreamingFileSink
      .forRowFormat(outputPath, new SimpleStringEncoder[(RowKind, EnrichElement)]("UTF-8"))
      .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("pre").withPartSuffix(".csv").build())
      .build()

    resultStream.addSink(fileSink).uid("addsink")

    env.execute("Window Stream WordCount")
  }
}

/**
 * Enriches accident claims with the member stored in broadcast state.
 *
 * The non-broadcast input carries `(RowKind, AccidentClaim)`, the broadcast input carries
 * `(RowKind, Members)`. Member changes only update the broadcast state; an output record is
 * produced for every claim change, using whatever member is in state at that moment.
 */
case class EnrichEventLocationMapperFunction()
  extends BroadcastProcessFunction[(RowKind, AccidentClaim), (RowKind, Members), (RowKind, EnrichElement)]() {

  /**
   * Looks up the claim's member in broadcast state and emits the enriched claim.
   *
   * @param value the claim together with the kind of change that produced it
   * @param ctx   read-only access to the broadcast state
   * @param out   receives one `(RowKind, EnrichElement)` per claim
   */
  override def processElement(value: (RowKind, AccidentClaim), ctx: BroadcastProcessFunction[(RowKind,AccidentClaim), (RowKind,Members), (RowKind, EnrichElement)]#ReadOnlyContext, out: Collector[(RowKind,EnrichElement)]): Unit = {
    val (rowKind, accidentClaim) = value
    print("Get value from ssh")
    print(value)
    val memberState = ctx.getBroadcastState(MemberDescriptor)
    // The member is null when the claim has no member id or the member has not been seen yet
    // (left-join semantics); a missing member id no longer throws NoSuchElementException.
    val member: Members = accidentClaim.memberId.map(memberPk => memberState.get(memberPk)).orNull
    val enrichElement = EnrichElement(accidentClaim, member)
    // Pass the tuple explicitly instead of relying on argument auto-tupling.
    out.collect((rowKind, enrichElement))
  }

  /**
   * Applies a member change to the broadcast state: DELETE removes the member, every other
   * kind inserts or overwrites it.
   *
   * Nothing is emitted here. The previous body tried to build an `EnrichElement` from
   * `accidentClaim`, which does not exist in this method. To re-emit already-seen claims when
   * their member changes, the claims themselves must be kept in state (for example a keyed
   * two-input function keyed by member id) or the join must be expressed in SQL.
   *
   * @param value     the member together with the kind of change that produced it
   * @param ctx       read-write access to the broadcast state
   * @param collector output collector (unused)
   */
  override def processBroadcastElement(value: (RowKind, Members), ctx: BroadcastProcessFunction[(RowKind,AccidentClaim), (RowKind,Members), (RowKind,EnrichElement)]#Context, collector: Collector[(RowKind,EnrichElement)]): Unit = {
    val memberState = ctx.getBroadcastState(MemberDescriptor)
    value match {
      case (RowKind.DELETE, member) =>
        memberState.remove(member.id)
      case (_, member) =>
        memberState.put(member.id, member)
    }
  }
}












Re: The methodology behind the join in Table API and DataStream

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
> any way I can both receive the message of both update. 
I think you may need an outer join [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#outer-equi-join 

Best regards, 
Yuxia 


发件人: "lan tran" <in...@gmail.com> 
收件人: "User" <us...@flink.apache.org> 
发送时间: 星期三, 2022年 6 月 29日 下午 6:04:30 
主题: The methodology behind the join in Table API and DataStream



Hi team, 

I have a question about how joins work in the SQL Client compared with the DataStream API.

My scenario is this: I have two tables, t1 and t2. I consume the WAL log of both and send it to Kafka. Next, I join the two tables and convert the joined table into a changelog stream, so that whenever either table is updated, a message is emitted.

This is how it works when I use the SQL Client to join the two tables. However, since according to the docs the Table API runs on top of the DataStream API, I wonder what the equivalent would look like if I used the DataStream API instead of the Table API.

In the DataStream API, I am currently using connect to join the two streams: t2 is converted into a broadcast stream and t1 is the main stream. When I update t1, the updated record is emitted; but when I update t2, nothing is emitted (even though the broadcast state is updated). Is there any way to receive a message for updates on both sides? Do I have to keep state for t1 (the main stream), or do I have to change the way I join?

Best, 
Quynh 





Sent from [ https://go.microsoft.com/fwlink/?LinkId=550986 | Mail ] for Windows