You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Ricardo Ortega <ta...@gmail.com> on 2022/11/28 20:04:36 UTC

Fwd: Apache Beam - Windows and triggers

---------- Forwarded message ---------
De: Ricardo Ortega <ta...@gmail.com>
Date: lun, 28 nov 2022 a la(s) 15:01
Subject: Apache Beam - Windows and triggers
To: <us...@beam.apache.org>


Greetings, I have a problem in a dataflow pipeline with apache beam.

<https://stackoverflow.com/posts/74604736/timeline>

I have a streaming data stream that reads a kafka topical, which is a json
that is converted to class object and then grouped by id and send an output
json to another kafka topical.

The problem is that when sending 50 json at the same time to kafka, it does
not produce the same number of outputs and several of these data are
repeated in the output.

Windows and triggers were applied but I am new to Apache Beam and I don't
know what is wrong with my code. I have tried to follow the official
documentation but I have not achieved what I want.

I am using the DirectRunner.

Here is the json I sent to kafka:

{
    "transactionId": "1k1",
    "products":[
    {
        "negotiationId": 2002,
        "productGtin": 329794528329,
        "state": 1,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 9087120
    },
    {
        "negotiationId": 2004,
        "productGtin": 219059901669,
        "state": 2,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 1312691
    },
    {
        "negotiationId": 2008,
        "productGtin": 111579181771,
        "state": 2,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 4494522
    },
    {
        "negotiationId": 2009,
        "productGtin": 622670393720,
        "state": 2,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 5742150
    },
    {
        "negotiationId": 2005,
        "productGtin": 397999498452,
        "state": 1,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 5075384
    },
    {
        "negotiationId": 2004,
        "productGtin": 161158769517,
        "state": 2,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 4256911
    },
    {
        "negotiationId": 2009,
        "productGtin": 352265510055,
        "state": 1,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 2751445
    },
    {
        "negotiationId": 2003,
        "productGtin": 681014619525,
        "state": 2,
        "observation":
"0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023-0056-0055-0012-0034-0032-2056-0021-0058-0073-0020-0028-0023",
        "retailerCode": 5359730
    }
     ]
}

Here is the code:

public PipelineResult run() {

    Pipeline pipeline = Pipeline.create(options);
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coderPipeline.getCoder().getEncodedTypeDescriptor(),
coderPipeline.getCoder());

    options.setProject("low-code-exito");
    options.setRegion("us-central1");
    options.setUsePublicIps(false);
    options.setStagingLocation("gs://low-code-exito/binaries/");
    options.setGcpTempLocation("gs://low-code-exito/temp/");
    options.setNetwork("default");
    options.setSubnetwork("regions/us-central1/subnetworks/default");
    options.isStreaming();
    options.setKafkaReadTopics("test-topic");
    options.setReadBootstrapServers("localhost:9092");

    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<String, String>read()
                            .withTopic(options.getKafkaReadTopics())

.withBootstrapServers(options.getReadBootstrapServers())
                            .withKeyDeserializer(StringDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withoutMetadata())
            .apply(Values.create())
            .apply(Window.<String>into(FixedWindows.of(Duration.millis(10)))

.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
            .apply(ParDo.of(new DoFn<String, NegotiationProduct>() {
                @ProcessElement
                public void processElement(@Element String input,
ProcessContext context) {
                    GsonBuilder builder = new GsonBuilder();
                    builder.setPrettyPrinting();
                    Gson gson = builder.create();
                    NegotiationProduct negotiationProducts =
gson.fromJson(input, NegotiationProduct.class);
                    fileName = negotiationProducts.getTransactionId();
                    context.output(negotiationProducts);
                }
            }))
            .apply(ParDo.of(new DoFn<NegotiationProduct, KV<String,
Product>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Arrays.stream(c.element().getProducts())
                            .map(product1 -> {

c.output(KV.of(product1.getNegotiationId(), product1));
                                return product1;
                            })
                            .forEach(System.out::println);
                }
            }))
            .apply(GroupByKey.create())
            .apply(WithKeys.of(0))
            .apply(GroupIntoBatches.ofSize(8))
            .apply(ParDo.of(new DoFn<KV<Integer, Iterable<KV<String,
Iterable<Product>>>>, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    GsonBuilder builder = new GsonBuilder();
                    builder.setPrettyPrinting();
                    Gson gson = builder.create();

                    List<Negotiation> negotiations = new ArrayList<>();

Objects.requireNonNull(c.element()).getValue().forEach(stringIterableKV
-> {
                        Negotiation negotiation1 = new
Negotiation(stringIterableKV.getKey(),
IterableUtils.toList(stringIterableKV.getValue()));
                        negotiations.add(negotiation1);
                    });

                    Payload payload = new Payload(negotiations);
                    String products = gson.toJson(payload);
                    Output output = new Output(
                            new Header(
                                    "a30631d2-56a0-4bdf-b311-355676495af3",

"sinco-notificacion-aprobacion-segmentacion-mq",

LocalDateTime.now().atOffset(ZoneOffset.UTC).toString(),
                                    new FlexField(fileName)),
                            new DataInfo(

"sinco-notificacion-aprobacion-segmentacion-mq",
                                    "application/json",
                                    products.replaceAll("\\n|\\t|\\s", ""),

LocalDateTime.now().atOffset(ZoneOffset.UTC).toString(),
                                    "a30631d2-56a0-4bdf-b311-355676495af3",

"negotiation-load-approved-2096-1664469006272-a30631d2-56a0-4bdf-b311-355676495af3.json",
                                    "UTF-8"
                            ));
                    String results = gson.toJson(output);
                    c.output(results);
                }
            }))
            .apply("write to kafka", KafkaIO.<Void, String>write()
                    .withBootstrapServers("localhost:9092")
                    .withTopic("exito")
                    .withValueSerializer(StringSerializer.class)

.withProducerConfigUpdates(ImmutableMap.of(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
52428800))
                    .values());

    return pipeline.run();
}

I welcome any corrections and suggestions