Posted to commits@beam.apache.org by "Raghu Angadi (JIRA)" <ji...@apache.org> on 2018/03/02 22:37:00 UTC
[jira] [Commented] (BEAM-3770) The problem of kafkaIO sdk for data latency
[ https://issues.apache.org/jira/browse/BEAM-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384259#comment-16384259 ]
Raghu Angadi commented on BEAM-3770:
------------------------------------
This is being discussed on the user mailing list: [https://lists.apache.org/thread.html/db923fbb55469287a68742bf8276e0eb18db923632dd94468235a177@%3Cuser.beam.apache.org%3E]
Please reopen this bug if that discussion leads us to believe this is a KafkaIO issue. Please include the entire pipeline code if feasible, since that makes it simpler to reproduce. Closing it now; there is no indication yet that this is a KafkaIO issue.
> The problem of kafkaIO sdk for data latency
> -------------------------------------------
>
> Key: BEAM-3770
> URL: https://issues.apache.org/jira/browse/BEAM-3770
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Affects Versions: 2.0.0
> Environment: To reproduce my situation, here is my running environment:
> OS: Ubuntu 14.04.3 LTS
> JAVA: JDK 1.7
> Beam 2.0.0 (with Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, with the following dependencies in pom.xml:
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-core</artifactId>
> <version>2.0.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-runners-direct-java</artifactId>
> <version>2.0.0</version>
> <scope>runtime</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-io-kafka</artifactId>
> <version>2.0.0</version>
> </dependency>
> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>0.10.0.1</version>
> </dependency>
> Reporter: Rick Lin
> Assignee: Raghu Angadi
> Priority: Major
> Fix For: 2.0.0
>
>
> Dear all,
> I am using KafkaIO in my project (Beam 2.0.0 with the Direct runner).
> With it, I am running into a *data latency* problem, described below.
> The data come from Kafka at a fixed rate of 100 records per second.
> I use 1-second fixed windows with no delay. The number of records per window varies: 70, 80, 104, or more.
> After running for one day, the latency appears, and each window contains only 10 records.
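For reference when reading the window counts, consider a 1-second fixed window with zero offset. It covers the event-time interval [n s, n s + 1 s), and its last timestamp ends in .999. This appears to match the window-time values printed further down. The stdlib-only sketch below reproduces that arithmetic. It also computes a per-window count that stands in for data-size. It is not Beam's FixedWindows implementation, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not Beam's FixedWindows): fixed-window arithmetic on
 * epoch-millisecond timestamps, assuming size 1 s and offset 0 as in the report.
 */
final class FixedWindowSketch {
    static final long SIZE_MS = 1_000L; // like FixedWindows.of(Duration.standardSeconds(1))
    static final long OFFSET_MS = 0L;   // like .withOffset(Duration.ZERO)

    // Inclusive start of the window containing timestampMs.
    static long windowStart(long timestampMs) {
        // floorMod keeps the remainder non-negative, so pre-epoch timestamps work too.
        return timestampMs - Math.floorMod(timestampMs - OFFSET_MS, SIZE_MS);
    }

    // Last millisecond that still belongs to the window (window end minus 1 ms).
    static long maxTimestamp(long timestampMs) {
        return windowStart(timestampMs) + SIZE_MS - 1;
    }

    // Number of elements per window, keyed by window start (a stand-in for "data-size").
    static Map<Long, Integer> countPerWindow(long[] timestampsMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: 100 records spaced 10 ms apart, starting at t = 5000 ms.
        long[] ts = new long[100];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = 5_000L + i * 10L;
        }
        System.out.println(countPerWindow(ts)); // {5000=100}
    }
}
```

At an even 100 records per second, every window would hold exactly 100 elements. Counts like 70, 80 or 104 therefore mean the records' timestamps were not evenly spread across window boundaries.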
> *To make this clearer, here is my code:*
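> import com.fasterxml.jackson.core.JsonGenerationException;
> import com.fasterxml.jackson.databind.JsonMappingException;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import java.io.IOException;
> import java.text.ParseException;
> import java.time.LocalDateTime;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TimestampedValue;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
>
> PipelineOptions readOptions = PipelineOptionsFactory.create();
> final Pipeline p = Pipeline.create(readOptions);
> PCollection<TimestampedValue<KV<String, String>>> readData =
>     p.apply(KafkaIO.<String, String>read()
>             .withBootstrapServers("127.0.0.1:9092")
>             .withTopic("kafkasink")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata())
>      .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
>          @ProcessElement
>          public void test(ProcessContext c) throws ParseException {
>              String element = c.element().getValue();
>              try {
>                  // Read v.Timestamp from the JSON value and convert it to a Joda Instant.
>                  JsonNode arrNode = new ObjectMapper().readTree(element);
>                  String t = arrNode.path("v").findValue("Timestamp").textValue();
>                  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
>                  LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
>                  java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
>                  Instant timestamp = new Instant(java_instant.toEpochMilli());
>                  // TimestampedValue.of(...) only wraps the element; the element's own
>                  // timestamp, which the windowing below uses, is left unchanged.
>                  c.output(TimestampedValue.of(c.element(), timestamp));
>              } catch (JsonGenerationException e) {
>                  e.printStackTrace();
>              } catch (JsonMappingException e) {
>                  e.printStackTrace();
>              } catch (IOException e) {
>                  e.printStackTrace();
>              }
>          }
>      }));
> PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
>     Window.<TimestampedValue<KV<String, String>>>into(
>             FixedWindows.of(Duration.standardSeconds(1)).withOffset(Duration.ZERO))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.ZERO)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
> // The steps that print the output below, and p.run(), are not included in the report.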
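The timestamp step of the DoFn above can be tried on its own. Below is a stdlib-only sketch that leaves out Beam and Jackson. The zone is passed in explicitly (the original uses ZoneId.systemDefault()), so the result does not depend on the machine running it. The sample string is hypothetical, because the report does not show a raw Kafka message. The class name EventTimeParser is also hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: the DoFn's timestamp conversion, without Beam or Jackson. */
final class EventTimeParser {
    // Same pattern as the original code; "SSSS" is a four-digit fraction-of-second field.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");

    // Parses the text as a local date-time, places it in the given zone,
    // and returns milliseconds since the epoch (what new Instant(...) receives).
    static long toEpochMillis(String text, ZoneId zone) {
        LocalDateTime local = LocalDateTime.parse(text, FORMAT);
        return local.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Hypothetical sample value in the pattern's layout, interpreted as UTC.
        long millis = toEpochMillis("02/27/2018 02:00:49.1170", ZoneId.of("UTC"));
        System.out.println(millis);
    }
}
```

Passing the zone as a parameter is a design choice for this sketch only. With ZoneId.systemDefault(), the same text maps to a different instant on machines configured with different zones.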
> *The output is as follows:*
> data-size=104
> coming-data-time=2018-02-27 02:00:49.117
> window-time=2018-02-27 02:00:49.999
> data-size=78
> coming-data-time=2018-02-27 02:00:50.318
> window-time=2018-02-27 02:00:50.999
> data-size=104
> coming-data-time=2018-02-27 02:00:51.102
> window-time=2018-02-27 02:00:51.999
> After one day:
> data-size=10
> coming-data-time=2018-02-28 02:05:48.217
> window-time=2018-03-01 10:35:16.999
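The size of the delay can be read directly off the output above. The small sketch below computes the gap between coming-data-time and window-time; the class name WindowLag is hypothetical. The gap is under one second for the first record. For the record printed after one day, it is about 32.5 hours. The sketch only quantifies the reported numbers and does not by itself locate the cause.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: gap between "coming-data-time" and "window-time" as printed. */
final class WindowLag {
    // Layout of the timestamps in the reported output.
    private static final DateTimeFormatter PRINTED =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    // Both values are read as local date-times; only their difference is used.
    static Duration lag(String comingDataTime, String windowTime) {
        return Duration.between(LocalDateTime.parse(comingDataTime, PRINTED),
                                LocalDateTime.parse(windowTime, PRINTED));
    }

    public static void main(String[] args) {
        System.out.println(lag("2018-02-27 02:00:49.117", "2018-02-27 02:00:49.999")); // PT0.882S
        System.out.println(lag("2018-02-28 02:05:48.217", "2018-03-01 10:35:16.999")); // PT32H29M28.782S
    }
}
```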
> If you have any ideas about this data latency problem, I look forward to hearing from you.
> Thanks
> Rick
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)