Posted to commits@beam.apache.org by "Raghu Angadi (JIRA)" <ji...@apache.org> on 2018/03/02 22:37:00 UTC

[jira] [Commented] (BEAM-3770) The problem of kafkaIO sdk for data latency

    [ https://issues.apache.org/jira/browse/BEAM-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384259#comment-16384259 ] 

Raghu Angadi commented on BEAM-3770:
------------------------------------

This is being discussed on the user mailing list: [https://lists.apache.org/thread.html/db923fbb55469287a68742bf8276e0eb18db923632dd94468235a177@%3Cuser.beam.apache.org%3E]

 

Please reopen this bug if that discussion leads us to believe this is a KafkaIO issue. Please include the entire pipeline code if feasible, since that makes it simpler to reproduce. Closing it now; there is no indication yet that this is a KafkaIO issue.

> The problem of kafkaIO sdk for data latency
> -------------------------------------------
>
>                 Key: BEAM-3770
>                 URL: https://issues.apache.org/jira/browse/BEAM-3770
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>    Affects Versions: 2.0.0
>         Environment: To reproduce my situation, my running environment is:
> OS: Ubuntu 14.04.3 LTS
> JAVA: JDK 1.7
> Beam 2.0.0 (with Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, with the following dependencies in pom.xml:
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>2.0.0</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>            Reporter: Rick Lin
>            Assignee: Raghu Angadi
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Dear all,
>  I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).
>  While using this SDK, I have run into a *data* *latency* problem, described below.
>  Data arrives from Kafka at a fixed rate of 100 records per second.
>  I apply 1-second fixed windows with no delay. The number of records per window is 70, 80, 104, or greater than or equal to 104.
>  After the pipeline has run for one day, data latency appears and each window contains only 10 records.
>  *To explain this clearly, my code is shown below.*
> "PipelineOptions readOptions = PipelineOptionsFactory.create();
> final Pipeline p = Pipeline.create(readOptions);
> PCollection<TimestampedValue<KV<String, String>>> readData =
>     p.apply(KafkaIO.<String, String>read()
>             .withBootstrapServers("127.0.0.1:9092")
>             .withTopic("kafkasink")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata())
>      .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
>          @ProcessElement
>          public void test(ProcessContext c) throws ParseException {
>              String element = c.element().getValue();
>              try {
>                  // Read the "Timestamp" field nested under "v" in the JSON value.
>                  JsonNode arrNode = new ObjectMapper().readTree(element);
>                  String t = arrNode.path("v").findValue("Timestamp").textValue();
>                  // Parse it in the JVM's default time zone and convert to a Joda Instant.
>                  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
>                  LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
>                  java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
>                  Instant timestamp = new Instant(java_instant.toEpochMilli());
>                  // Note: this pairs the record with the parsed time in a TimestampedValue, but
>                  // c.output(...) keeps the element's existing timestamp, which is the one the
>                  // windowing below uses. Re-stamping the element would need c.outputWithTimestamp(...).
>                  c.output(TimestampedValue.of(c.element(), timestamp));
>              } catch (JsonGenerationException e) {
>                  e.printStackTrace();
>              } catch (JsonMappingException e) {
>                  e.printStackTrace();
>              } catch (IOException e) {
>                  e.printStackTrace();
>              }
>          }
>      }));
> PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
>     Window.<TimestampedValue<KV<String, String>>>into(
>             FixedWindows.of(Duration.standardSeconds(1)).withOffset(Duration.ZERO))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.ZERO)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());"
>  *The output of a run is shown below.*
> "data-size=104
> coming-data-time=2018-02-27 02:00:49.117
> window-time=2018-02-27 02:00:49.999
> data-size=78
> coming-data-time=2018-02-27 02:00:50.318
> window-time=2018-02-27 02:00:50.999
> data-size=104
> coming-data-time=2018-02-27 02:00:51.102
> window-time=2018-02-27 02:00:51.999
> After one day:
> data-size=10
> coming-data-time=2018-02-28 02:05:48.217
> window-time=2018-03-01 10:35:16.999"
> If you have any ideas about this data-latency problem, I look forward to hearing from you.
> Thanks
> Rick
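The timestamps in the output above can be checked with plain java.time arithmetic. The sketch below is only an illustration, not Beam code, and it rests on several assumptions:

* A 1-second fixed window with zero offset covers [start, start + 1000 ms), where start is the element timestamp floored to a whole second.
* window-time is printed as the window's last millisecond (end minus 1 ms). This is the value Beam's IntervalWindow.maxTimestamp() returns.
* Both printed fields use the same time zone.
* UTC is used in place of the ZoneId.systemDefault() that the DoFn uses.

The class name WindowLagSketch and the sample Timestamp value are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class: plain java.time only, no Beam dependency.
public class WindowLagSketch {
    // Same pattern the DoFn uses for the JSON "Timestamp" field (four fractional digits).
    static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
    // Pattern of the coming-data-time / window-time values printed in the output.
    static final DateTimeFormatter PRINTED = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    // Parse an input timestamp to epoch millis. Assumption: UTC, not the system default zone.
    static long parseEpochMillis(String text) {
        return LocalDateTime.parse(text, INPUT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Last millisecond of the zero-offset fixed window containing ts: start + size - 1.
    static long windowMaxTimestamp(long ts, long sizeMillis) {
        long start = ts - Math.floorMod(ts, sizeMillis);
        return start + sizeMillis - 1;
    }

    // Convert epoch millis back to a UTC LocalDateTime for display.
    static LocalDateTime toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toLocalDateTime();
    }

    // Gap from a printed coming-data-time to the printed window-time.
    static Duration lag(String comingDataTime, String windowTime) {
        return Duration.between(LocalDateTime.parse(comingDataTime, PRINTED),
                                LocalDateTime.parse(windowTime, PRINTED));
    }

    public static void main(String[] args) {
        long ts = parseEpochMillis("02/27/2018 02:00:49.1170"); // hypothetical sample value
        System.out.println(toUtc(windowMaxTimestamp(ts, 1000)));  // 2018-02-27T02:00:49.999
        System.out.println(lag("2018-02-27 02:00:49.117", "2018-02-27 02:00:49.999")); // PT0.882S
        System.out.println(lag("2018-02-28 02:05:48.217", "2018-03-01 10:35:16.999")); // PT32H29M28.782S
    }
}
```

Under these assumptions, 02:00:49.117 falls in the window ending at 02:00:49.999, which is a gap of 882 ms. In the sample taken after one day, window-time is about 32.5 hours ahead of coming-data-time.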



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)