Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2022/04/10 22:10:45 UTC

How to process events with different JSON schemas in single Kinesis stream using PyFlink?

TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.

Details:

I want to process a stream of events coming from a Kinesis data stream; the events originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section whose shape depends on the event type.

For instance, an event on the bus from EC2 might look something like this:


{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

whereas the "detail" section for an event from CodeBuild might look like this (and the top-level "source" field would be "aws.codebuild"):


"detail": {
  "build-status": "SUCCEEDED",
  "project-name": "my-sample-project",
  "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
  "additional-information": {
    "artifact": {
      "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
      "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
      "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
    }
  }
}

I have the following constraints that I really want to abide by:


  1.  I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
  2.  I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by "detail-type", or "source") and then process the events according to their type.

I thought my options might be:


  1.  Use the SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for the EC2 example above the schema for "detail" is Map<VARCHAR, VARCHAR>, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go.
  2.  Use the DataStream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John


Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

Posted by John Tipper <jo...@hotmail.com>.
Hi Dian,

Thank you very much, that worked very nicely.

Kind regards,

John


Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

Posted by Dian Fu <di...@gmail.com>.
Hi John,

1) Regarding the Table API, you could declare the column `detail` as STRING
and then parse it as JSON in a Python user-defined function, as follows:

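```

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def get_id(detail):
    # `detail` arrives as a raw JSON string; parse it for each event.
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']  # CodeBuild event
    else:
        return detail_json['instance-id']  # EC2 event

```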
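
For context, the source table that such a UDF reads from could be declared
roughly as below. This is only a sketch: the helper name
`kinesis_source_ddl`, the table name, the stream name and the region are
placeholders, and the connector options should be checked against the
Kinesis SQL connector documentation for your Flink version. It also assumes
the JSON format hands a nested object to a STRING column as its serialized
text, which is what the suggestion above relies on.

```python
def kinesis_source_ddl(table: str, stream: str, region: str) -> str:
    """Build a CREATE TABLE statement that keeps `detail` as raw JSON text.

    Only the common top-level fields are declared; the varying `detail`
    payload is left as a STRING for a Python UDF to parse.
    """
    return f"""
CREATE TABLE {table} (
  `id` STRING,
  `source` STRING,
  `detail-type` STRING,
  `time` STRING,
  `detail` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = '{stream}',
  'aws.region' = '{region}',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json'
)"""


# Placeholder names, for illustration only.
ddl = kinesis_source_ddl("events", "my-event-stream", "us-west-1")
```

The resulting string would then be passed to `t_env.execute_sql(ddl)`, and
the UDF registered with `t_env.create_temporary_function("get_id", get_id)`
so that a query such as `SELECT get_id(detail) FROM events` can call it.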

2) Regarding the DataStream API, Kinesis is not yet supported in PyFlink.
However, adding it should be straightforward, as it is simply a wrapper
around the Java Kinesis connector. If you want to use the DataStream API,
you could wrap it yourself for now; see how the other connectors are
handled [1] for more details.
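
With the DataStream API, each record would typically arrive as a raw JSON
string, so the keying and per-type handling described in the question can
be written as plain Python functions. The following is a minimal sketch
under that assumption; `event_key`, `process`, `HANDLERS` and the two
handler functions are hypothetical names, and nothing here calls PyFlink.

```python
import json


def event_key(raw: str) -> str:
    """Key selector: route on the top-level "source" field."""
    return json.loads(raw).get("source", "unknown")


def handle_ec2(detail: dict) -> str:
    # Stripping whitespace is an assumption, made because the sample
    # instance-id in the question carries a leading space.
    return detail["instance-id"].strip()


def handle_codebuild(detail: dict) -> str:
    return detail["build-id"]


# Hypothetical registry: one handler per event source.
HANDLERS = {
    "aws.ec2": handle_ec2,
    "aws.codebuild": handle_codebuild,
}


def process(raw: str):
    """Parse one event and apply the handler for its source, if any."""
    event = json.loads(raw)
    handler = HANDLERS.get(event.get("source"))
    if handler is None:
        return None  # unknown event type: skip rather than fail
    return handler(event.get("detail", {}))
```

Once a Kinesis source has been wrapped, these could be plugged in with
something like `stream.key_by(event_key).map(process)`. Adding support for
a new event type then only means adding another entry to `HANDLERS`.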

Regards,
Dian

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235
