Posted to issues@beam.apache.org by "Varun (Jira)" <ji...@apache.org> on 2020/10/25 21:19:00 UTC

[jira] [Comment Edited] (BEAM-9873) Removing Invalid JSON messages from PCollection before starting BigQueryIO Operations

    [ https://issues.apache.org/jira/browse/BEAM-9873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220396#comment-17220396 ] 

Varun edited comment on BEAM-9873 at 10/25/20, 9:18 PM:
--------------------------------------------------------

{code:java}

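// Read messages from the subscription with some windowing strategy
final PCollection<String> readMessages = pipeline.apply("Read from Subscription", PubsubIO.readStrings().apply()....)

// Apply a filter transform to remove invalid JSON messages
PCollection<String> validMessages = readMessages.apply("Remove invalid messages", Filter.by(Validate.IS_VALID));

// Enum definition: a serializable predicate that accepts only parseable JSON
enum Validate implements SerializableFunction<String, Boolean> {
    IS_VALID {
        @Override
        public Boolean apply(String input) {
            try {
                jsonMapper.readTree(input);
                return true;
            } catch (IOException ex) {
                // Log the rejected payload and drop it instead of failing the worker
                LOG.warn(input);
                return false;
            }
        }
    };

    // Assumed declarations: the snippet uses jsonMapper and LOG without defining them
    // (a Jackson ObjectMapper and an SLF4J Logger are presumed here)
    static final ObjectMapper jsonMapper = new ObjectMapper();
    static final Logger LOG = LoggerFactory.getLogger(Validate.class);
}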
{code}

This enum could be defined in the Filter class (org.apache.beam.sdk.transforms.Filter).
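
To make the drop-on-parse-failure behaviour of the predicate easier to check outside a pipeline, here is a minimal sketch that uses only the JDK. Everything in it is an assumption for illustration: ValidMessageFilterSketch, Parser, parsesCleanly, stubParse and keepValid are hypothetical names, a java.util.stream filter stands in for Filter.by, and stubParse is a crude placeholder for jsonMapper.readTree, not a real JSON parser.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ValidMessageFilterSketch {

    /** Hypothetical stand-in for a parser such as jsonMapper.readTree: throws on bad input. */
    @FunctionalInterface
    interface Parser {
        void parse(String input) throws IOException;
    }

    /** Turns a throwing parser into a boolean predicate, mirroring Validate.IS_VALID. */
    static Predicate<String> parsesCleanly(Parser parser) {
        return input -> {
            try {
                parser.parse(input);
                return true;
            } catch (IOException ex) {
                // Invalid message: report false so the filter drops it instead of failing
                return false;
            }
        };
    }

    /** Crude placeholder, NOT a real JSON parser: accepts only text shaped like "{...}". */
    static void stubParse(String input) throws IOException {
        String trimmed = input.trim();
        if (!(trimmed.startsWith("{") && trimmed.endsWith("}"))) {
            throw new IOException("not a JSON object: " + input);
        }
    }

    /** Stand-in for Filter.by(...): keeps only the elements the predicate accepts. */
    static List<String> keepValid(List<String> messages) {
        return messages.stream()
                .filter(parsesCleanly(ValidMessageFilterSketch::stubParse))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> messages = List.of("{\"id\": 1}", "not json", "{}");
        System.out.println(keepValid(messages)); // prints [{"id": 1}, {}]
    }
}
```

In the actual pipeline the parser would be the Jackson call from the snippet above and the filtering would be done by the Filter transform; the sketch only shows that a parse failure becomes a false result, so the element is dropped rather than propagated as an exception.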


> Removing Invalid JSON messages from PCollection before starting BigQueryIO Operations
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-9873
>                 URL: https://issues.apache.org/jira/browse/BEAM-9873
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Varun
>            Assignee: Varun
>            Priority: P4
>              Labels: Clarified, features
>
> In a typical setup of Pub/Sub and Cloud Dataflow, a Pub/Sub subscriber might receive some messages that do not follow a valid JSON structure. The BigQuery insert operation fails to process these messages, and the worker may be terminated if the exception is not handled correctly.
> The likelihood of receiving invalid JSON messages is very low, and the upstream component pushing messages to the topic should validate them on its end. This is not always the case, however, and the application should be robust enough to survive even when upstream components push malformed messages.
> I have created an enum that acts as a predicate in the Filter transform. This is very standard logic for validating JSON, and I would like to add it to the Java SDK (and Python) in the Filter transform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)