Posted to dev@beam.apache.org by "Chawla,Sumit " <su...@gmail.com> on 2016/08/24 21:10:47 UTC

KafkaIO Windowing Fn

Hi All


I am trying to do some simple batch processing on KafkaIO records. My Beam
pipeline looks like the following:

pipeline.apply(KafkaIO.read()
        .withTopics(ImmutableList.of("mytopic"))
        .withBootstrapServers("localhost:9200"))
    .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, String>
    .apply("WindowBy10Sec", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(1)))
    .apply("GroupByKey", GroupByKey.create())
    .apply("Sink", ParDo.of(new MySink()));


My Kafka source already has 1000+ messages, and new messages arrive
every few minutes.

When I start my pipeline, I can see that it reads all 1000+ messages
from Kafka. However, the window does not fire until a new message arrives in
Kafka, and the sink does not receive any messages until that point. Do I need
to override the watermarkFn here? Since I am not providing any timestampFn,
I am assuming that timestamps will be assigned as messages arrive,
i.e. ingestion time. What is the default watermarkFn implementation? Is
the window not supposed to fire based on ingestion time?




Regards
Sumit Chawla
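The stall described above can be reproduced with a small model. The sketch below is plain Java, not the Beam API: it assigns timestamps to epoch-aligned 10-second fixed windows and uses a watermark equal to the timestamp of the last record read, which is how the default KafkaIO watermark is described later in this thread. It treats a window as complete once the watermark reaches the window's end. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Beam code): epoch-aligned fixed windows plus a
// watermark that equals the timestamp of the most recent record.
final class StalledWindowModel {

    // Start of the fixed window containing tsMillis (non-negative timestamps assumed).
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    // Returns the start of every window whose (exclusive) end the watermark has reached.
    static List<Long> firedWindows(long[] recordTimestamps, long sizeMillis) {
        // Ingestion-time timestamps only increase, so the max is also the last record's.
        long watermark = Long.MIN_VALUE;
        for (long ts : recordTimestamps) {
            watermark = Math.max(watermark, ts);
        }
        List<Long> fired = new ArrayList<>();
        for (long ts : recordTimestamps) {
            long start = windowStart(ts, sizeMillis);
            if (watermark >= start + sizeMillis && !fired.contains(start)) {
                fired.add(start);
            }
        }
        return fired;
    }
}
```

With records at 1, 5 and 12 seconds, only the [0 s, 10 s) window completes; the [10 s, 20 s) window waits until a record with a timestamp of at least 20 seconds arrives, which matches the window firing only when a new Kafka message shows up.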

Re: KafkaIO Windowing Fn

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

>
> We should still have a JIRA to improve the KafkaIO watermark tracking in
> the absence of new records.
>

Filed https://issues.apache.org/jira/browse/BEAM-591

I don't want to hijack this thread from Sumit's primary issue, but I want to
mention related concerns here, which could be discussed on a new thread or
on the JIRA. From the JIRA description:

A user can provide functions to calculate the watermark and record timestamps.
There are a few concerns with the current design:

   - What should happen when a Kafka topic is idle?
      - In the default case, I think the watermark should advance to the current time.
      - What should happen when the user has provided a function to calculate
        record timestamps?
         - Should the watermark stay the same as the record timestamp?
         - Same question when the user has provided their own watermark function.
   - Are the current semantics of the user-provided watermark function correct?
      - It is run once for each record read.
      - Should it instead be run inside getWatermark(), called by the runner
        (we could still provide the last user record and its timestamp)?

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Most of the KafkaIO tests on the DirectRunner use withMaxNumRecords, which
creates a BoundedReadFromUnboundedSource.

Regards
Sumit Chawla


On Fri, Sep 2, 2016 at 11:07 AM, Raghu Angadi <ra...@google.com.invalid>
wrote:

> On Tue, Aug 30, 2016 at 12:01 AM, Chawla,Sumit <su...@gmail.com>
> wrote:
>
> > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > Following is the snippet I am working on, and I will post more details
> > once I get it working (as of now I am unable to read messages from Kafka
> > using the DirectRunner).
> >
>
> I would like to know more about the issues you are running into. KafkaIO
> should work with the DirectRunner; that is how it is often tested (unit
> tests also run on the DirectRunner).
>

Re: KafkaIO Windowing Fn

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Tue, Aug 30, 2016 at 12:01 AM, Chawla,Sumit <su...@gmail.com>
wrote:

> Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> Following is the snippet I am working on, and I will post more details
> once I get it working (as of now I am unable to read messages from Kafka
> using the DirectRunner).
>

I would like to know more about the issues you are running into. KafkaIO
should work with the DirectRunner; that is how it is often tested (unit
tests also run on the DirectRunner).

Re: KafkaIO Windowing Fn

Posted by Gaurav Gupta <ga...@gmail.com>.
Sumit,

I tried running the code you shared. I noticed that if withMaxNumRecords is
set to a number N, KafkaIO doesn't return until it has read N messages. So
either try setting a low value for withMaxNumRecords or don't set it at all.
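The blocking behaviour described above can be modelled without Beam. In this hypothetical sketch, an unbounded source (a BlockingQueue standing in for the Kafka topic) is read as if bounded, stopping after maxNumRecords records. The maxWaitMillis deadline is an addition for illustration and not a claim about KafkaIO's options; without such a deadline, a topic holding fewer records than the cap keeps the read waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not KafkaIO internals: read an unbounded source as
// bounded by stopping after maxNumRecords records or at a deadline.
final class CappedRead {
    static List<String> read(BlockingQueue<String> source, int maxNumRecords, long maxWaitMillis) {
        List<String> out = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (out.size() < maxNumRecords) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                break; // deadline reached before the cap
            }
            String record;
            try {
                record = source.poll(remainingNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
            if (record == null) {
                break; // no new record arrived before the deadline
            }
            out.add(record);
        }
        return out;
    }
}
```

With 540 queued records and a cap of 500, a first read returns 500 immediately; a second read returns the remaining 40 only because the deadline expires.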

Another thing I observed was that while using anonymous DoFns I got
the following exception:

java.lang.IllegalArgumentException: unable to serialize
org.apache.beam.sdk.io.kafka.Test$1@384ad17b
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:56)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:671)
at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:629)
at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:558)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:525)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:520)

but when I defined named classes for these anonymous DoFns, the above
exception went away.

I wrote a small test to reproduce the above issue:

@Test
public void testSerialization() throws Exception {
  SerializableUtils.serializeToByteArray(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
    }
  });
}
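A likely explanation for the exception, offered as a general Java point rather than a confirmed diagnosis of this test: an anonymous class created inside an instance method can hold a hidden reference to the enclosing object (the javac versions of that time always added one), so serializing the DoFn also tries to serialize the enclosing test class, which is not Serializable. A static nested or top-level class has no such reference. The plain-Java sketch below reproduces the effect with java.io only; SerializableFn stands in for DoFn, and the class names are hypothetical. The anonymous class deliberately touches an outer field so that it captures the enclosing instance on any compiler version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable function type such as DoFn.
interface SerializableFn extends Serializable {
    String apply(String input);
}

// Plays the role of the JUnit test class: it is NOT Serializable.
final class EnclosingTest {
    private int calls; // outer instance state

    SerializableFn anonymousFn() {
        return new SerializableFn() {
            @Override
            public String apply(String input) {
                calls++; // uses EnclosingTest.this, so the anonymous class captures it
                return input.toUpperCase();
            }
        };
    }

    // Static nested class: no hidden reference to an EnclosingTest instance.
    static final class UpperCaseFn implements SerializableFn {
        @Override
        public String apply(String input) {
            return input.toUpperCase();
        }
    }

    // Returns true if Java serialization of o succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // e.getMessage() names the non-serializable class (EnclosingTest)
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Declaring such functions as static nested or top-level classes corresponds to the change that made the exception go away in the report above.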


Thanks

Gaurav


On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> could the reason for the second part of the trigger never firing be that
> there are never at least 100 elements per key? The trigger would only fire
> if it saw 100 elements, and with only 540 elements that seems unlikely if
> you have more than 6 keys.
>
> Cheers,
> Aljoscha
>
> On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > KafkaIO is implemented using the UnboundedRead API, which is supported by
> > the DirectRunner. You should be able to run without withMaxNumRecords;
> > if you can't, I'd be very interested to see the stack trace that you get
> > when you try to run the pipeline.
> >
> > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Yes. I added it only for the DirectRunner, as it cannot translate
> > > Read(UnboundedSourceOfKafka).
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > wrote:
> > >
> > > > Ah, OK. This might be a stupid question, but did you remove this line
> > > > when running it with Flink:
> > > >         .withMaxNumRecords(500)
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > The code is not different while running on Flink. I have removed only
> > > > > business-specific transformations.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > > >
> > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > could you maybe also post the complete code that you're using with the
> > > > > > FlinkRunner? I could have a look into it.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Thomas,
> > > > > > >
> > > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > > > Following is the snippet I am working on, and I will post more
> > > > > > > details once I get it working (as of now I am unable to read
> > > > > > > messages from Kafka using the DirectRunner).
> > > > > > >
> > > > > > >
> > > > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > >         .withMaxNumRecords(500)
> > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))
> > > > > > >     .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > > >         @Override
> > > > > > >         public void processElement(ProcessContext c) throws Exception {
> > > > > > >             KV<byte[], byte[]> record = c.element().getKV();
> > > > > > >             c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > > > >         }
> > > > > > >     }))
> > > > > > >     .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > >             .withAllowedLateness(Duration.standardSeconds(1))
> > > > > > >             .triggering(
> > > > > > >                     Repeatedly.forever(
> > > > > > >                             AfterFirst.of(
> > > > > > >                                     AfterProcessingTime.pastFirstElementInPane()
> > > > > > >                                             .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > >                                     AfterPane.elementCountAtLeast(100))))
> > > > > > >             .discardingFiredPanes())
> > > > > > >     .apply("GroupByTenant", GroupByKey.create())
> > > > > > >     .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > > >         @Override
> > > > > > >         public void processElement(ProcessContext c) throws Exception {
> > > > > > >             KV<String, Iterable<String>> element = c.element();
> > > > > > >             // Count the grouped values for this key and window pane.
> > > > > > >             int count = 0;
> > > > > > >             for (String ignored : element.getValue()) {
> > > > > > >                 count++;
> > > > > > >             }
> > > > > > >             System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
> > > > > > >         }
> > > > > > >     }));
> > > > > > > pipeline.run();
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Regards
> > > > > > > Sumit Chawla
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > If you use the DirectRunner, do you observe the same behavior?
> > > > > > > >
> > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Thomas,
> > > > > > > > >
> > > > > > > > > I am using the FlinkRunner. Yes, the second part of the trigger never
> > > > > > > > > fires for me.
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > Sumit Chawla
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey Sumit,
> > > > > > > > > >
> > > > > > > > > > What runner are you using? I can set up a test with the same trigger
> > > > > > > > > > reading from an unbounded input using the DirectRunner, and I get the
> > > > > > > > > > expected output panes.
> > > > > > > > > >
> > > > > > > > > > Just to clarify, the second half of the trigger ('when the first element
> > > > > > > > > > has been there for at least 30+ seconds') simply never fires?
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas,
> > > > > > > > > > >
> > > > > > > > > > > That did not work.
> > > > > > > > > > >
> > > > > > > > > > > I tried the following instead:
> > > > > > > > > > >
> > > > > > > > > > > .triggering(
> > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > >                         AfterPane.elementCountAtLeast(100))))
> > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > >
> > > > > > > > > > > What I am trying to do here is make sure that follow-up operations
> > > > > > > > > > > receive batches of records:
> > > > > > > > > > >
> > > > > > > > > > > 1. Fire when the pane has 100+ elements,
> > > > > > > > > > > 2. or fire when the first element has been there for at least 30+ seconds.
> > > > > > > > > > >
> > > > > > > > > > > However, point 2 does not seem to work. For example, I have 540 records
> > > > > > > > > > > in Kafka. The first 500 records are available immediately, but the
> > > > > > > > > > > remaining 40 don't pass through. I was expecting the 2nd trigger to
> > > > > > > > > > > help here.
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Sumit Chawla
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > > > > > > > handle being written to multiple times for the same window. For example,
> > > > > > > > > > > > if the sink appends to the output when it receives new data in a window,
> > > > > > > > > > > > you could add something like
> > > > > > > > > > > >
> > > > > > > > > > > > Window.into(...)
> > > > > > > > > > > >     .withAllowedLateness(...)
> > > > > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > > > > >     .discardingFiredPanes();
> > > > > > > > > > > >
> > > > > > > > > > > > This will cause elements to be output some amount of time after they are
> > > > > > > > > > > > first received from Kafka, even if Kafka does not have any new elements.
> > > > > > > > > > > > Elements will only be output by the GroupByKey once.
> > > > > > > > > > > >
> > > > > > > > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking in
> > > > > > > > > > > > the absence of new records.
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I don't have much control over changing KafkaIO properties. I added
> > > > > > > > > > > > > the KafkaIO code only to complete the example. Are there any changes
> > > > > > > > > > > > > that can be made to the windowing to achieve the same behavior?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards
> > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rangadi@google.com.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > > > > > > > > record (in effect; more accurately, it returns the same as getTimestamp(),
> > > > > > > > > > > > > > which might be overridden by the user).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > > > > > > > > essentially returns Now() or Now() - 1 sec (usage in the javadoc
> > > > > > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think the default watermark should be smarter: it should advance to the
> > > > > > > > > > > > > > current time if there aren't any records to read from Kafka. Could you
> > > > > > > > > > > > > > file a JIRA?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > >

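Aljoscha's question about AfterPane.elementCountAtLeast(100) comes down to arithmetic: triggers are evaluated at the GroupByKey for each key within each window, so the count applies per pane, not to the stream as a whole. The hypothetical plain-Java sketch below buckets (key, timestamp) records into fixed windows and counts how many of those per-key panes ever reach a threshold; the record layout and names are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: how many (key, window) panes reach an element-count threshold.
final class PaneCounts {
    static Map<String, Integer> countPerPane(String[] keys, long[] timestamps, long windowMillis) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = timestamps[i] - (timestamps[i] % windowMillis); // fixed window start
            counts.merge(keys[i] + "@" + start, 1, Integer::sum);       // pane = key + window
        }
        return counts;
    }

    static int panesReaching(Map<String, Integer> counts, int threshold) {
        int n = 0;
        for (int c : counts.values()) {
            if (c >= threshold) {
                n++;
            }
        }
        return n;
    }
}
```

With 540 records spread evenly over 6 keys in one window, each pane holds 90 elements and none reaches 100; with 5 keys each holds 108. Spreading the same records over several windows only lowers the per-pane counts.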
Re: KafkaIO Windowing Fn

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, now I remember that the Flink runner never supported processing-time
timers. I created a JIRA issue for this:
https://issues.apache.org/jira/browse/BEAM-615
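To see why missing processing-time timers matter here, the sketch below models the AfterFirst trigger from this thread for a single pane: fire once the pane holds 100 elements, or once 30 seconds of processing time have passed since its first element. It is a hypothetical plain-Java model, not Beam's trigger implementation; the timersDelivered flag stands for whether the runner ever calls back on processing time. When it is false, as described in BEAM-615 for the Flink runner at the time, only the count branch can fire.

```java
// Hypothetical single-pane model of
// Repeatedly.forever(AfterFirst.of(<30 s after first element>, <100 elements>))
// with discardingFiredPanes(); not Beam's actual trigger implementation.
final class AfterFirstModel {
    private final int countThreshold;
    private final long delayMillis;
    private final boolean timersDelivered; // does the runner deliver processing-time timers?
    private int elements;
    private long firstElementAt;

    AfterFirstModel(int countThreshold, long delayMillis, boolean timersDelivered) {
        this.countThreshold = countThreshold;
        this.delayMillis = delayMillis;
        this.timersDelivered = timersDelivered;
    }

    // An element arrives at processing time nowMillis; returns true if the pane fires.
    boolean onElement(long nowMillis) {
        if (elements == 0) {
            firstElementAt = nowMillis; // when the delay timer would be set
        }
        elements++;
        if (elements >= countThreshold) {
            return fire();
        }
        return false;
    }

    // The runner reports processing time nowMillis; returns true if the pane fires.
    boolean onProcessingTimer(long nowMillis) {
        if (!timersDelivered || elements == 0) {
            return false; // the timer never reaches the trigger, or the pane is empty
        }
        if (nowMillis - firstElementAt >= delayMillis) {
            return fire();
        }
        return false;
    }

    private boolean fire() {
        elements = 0; // discarding mode: the next pane starts empty
        return true;
    }
}
```

In Sumit's scenario (two records at 10:00:00 and nothing after), the model with timers fires at 10:00:30, while the model without timers never fires from this trigger alone.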

On Thu, 1 Sep 2016 at 19:20 Chawla,Sumit <su...@gmail.com> wrote:

> Thanks Aljoscha/Thomas,
>
> I will explore the option to upgrade. Meanwhile, here is what I observed
> with the above code in my local Flink cluster:
>
> 1. To start, there are 0 records in Kafka.
> 2. Deploy the pipeline. Two records are received in Kafka at 10:00:00 AM.
> 3. The 100-element trigger does not fire because that much data is not
> there. I would expect the 30-second trigger to fire and downstream to
> receive the records around 10:00:30 AM.
> 4. No new records arrive. Downstream received the above records around
> 10 minutes later, at about 10:10:00 AM.
>
> I am not sure what is actually triggering the window firing here (it does
> not look like the 30-second trigger).
>
>
>
> Regards
> Sumit Chawla
>
>
> On Wed, Aug 31, 2016 at 11:14 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Ah, I see. The Flink runner had quite a few updates in 0.2.0-incubating,
> > and even more are coming in the upcoming 0.3.0-incubating.
> >
> > On Thu, 1 Sep 2016 at 04:09 Thomas Groh <tg...@google.com.invalid>
> > wrote:
> >
> > > In 0.2.0-incubating and beyond, we've replaced the DirectPipelineRunner
> > > with the DirectRunner (formerly InProcessPipelineRunner), which is capable
> > > of handling unbounded pipelines. Is it possible for you to upgrade?
> > >
> > > On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <su...@gmail.com>
> > > wrote:
> > >
> > > > @Aljoscha, my assumption here is that at least one trigger should fire:
> > > > either the 100 elements or the 30 seconds since the first element
> > > > (whichever happens first).
> > > >
> > > > @Thomas, here is the error I get (I am using 0.1.0-incubating):
> > > >
> > > > java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> > > > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
> > > > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
> > > > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
> > > > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
> > > > ...
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <
> > aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > could the reason for the second part of the trigger never firing be
> > > that
> > > > > there are never at least 100 elements per key. The trigger would
> only
> > > > fire
> > > > > if it saw 100 elements and with only 540 elements that seems
> unlikely
> > > if
> > > > > you have more than 6 keys.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tgroh@google.com.invalid
> >
> > > > wrote:
> > > > >
> > > > > > KafkaIO is implemented using the UnboundedRead API, which is
> > > supported
> > > > by
> > > > > > the DirectRunner. You should be able to run without the
> > > > > withMaxNumRecords;
> > > > > > if you can't, I'd be very interested to see the stack trace that
> > you
> > > > get
> > > > > > when you try to run the Pipeline.
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <
> > > sumitkchawla@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Yes.  I added it only for DirectRunner as it cannot translate
> > > > > > > Read(UnboundedSourceOfKafka)
> > > > > > >
> > > > > > > Regards
> > > > > > > Sumit Chawla
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <
> > > > > aljoscha@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Ah ok, this might be a stupid question but did you remove
> this
> > > line
> > > > > > when
> > > > > > > > running it with Flink:
> > > > > > > >         .withMaxNumRecords(500)
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Aljoscha
> > > > > > > >
> > > > > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <
> > > sumitkchawla@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Aljoscha
> > > > > > > > >
> > > > > > > > > The code is not different while running on Flink.  It have
> > > > removed
> > > > > > > > business
> > > > > > > > > specific transformations only.
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > Sumit Chawla
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <
> > > > > > aljoscha@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > could you maybe also post the complete that you're using
> > with
> > > > the
> > > > > > > > > > FlinkRunner? I could have a look into it.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Aljoscha
> > > > > > > > > >
> > > > > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <
> > > > > sumitkchawla@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas
> > > > > > > > > > >
> > > > > > > > > > > Sorry i tried with DirectRunner but ran into some kafka
> > > > issues.
> > > > > > > > > > Following
> > > > > > > > > > > is the snippet i am working on, and will post more
> > details
> > > > > once i
> > > > > > > get
> > > > > > > > > it
> > > > > > > > > > > working ( as of now i am unable to read messages from
> > Kafka
> > > > > using
> > > > > > > > > > > DirectRunner)
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > PipelineOptions pipelineOptions =
> > > > > > PipelineOptionsFactory.create();
> > > > > > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > >         .withMaxNumRecords(500)
> > > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG,
> "test1",
> > > > > > > > > > >                 ConsumerConfig.ENABLE_AUTO_
> > COMMIT_CONFIG,
> > > > > "true",
> > > > > > > > > > >
>  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > > > > > > "earliest"
> > > > > > > > > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[],
> > > > > byte[]>,
> > > > > > > > > > > KV<String, String>>() {
> > > > > > > > > > >     @Override
> > > > > > > > > > >     public void processElement(ProcessContext c) throws
> > > > > > Exception {
> > > > > > > > > > >         KV<byte[], byte[]> record =
> c.element().getKV();
> > > > > > > > > > >         c.output(KV.of(new String(record.getKey()), new
> > > > > > > > > > > String(record.getValue())));
> > > > > > > > > > >     }
> > > > > > > > > > > }))
> > > > > > > > > > >         .apply("WindowByMinute", Window.<KV<String,
> > > > > > > > > > > String>>into(FixedWindows.of(
> > Duration.standardSeconds(10)))
> > > > > > > > > > >                 .withAllowedLateness(Duration.
> > > > > standardSeconds(1))
> > > > > > > > > > >                 .triggering(
> > > > > > > > > > >                         Repeatedly.forever(
> > > > > > > > > > >                                 AfterFirst.of(
> > > > > > > > > > >
> > > > > > > > > > > AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > >
> > > > > > > > > > > .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > >
> > > > > > > >  AfterPane.elementCountAtLeast(
> > > > > > > > > > 100)
> > > > > > > > > > >                                 )))
> > > > > > > > > > >                 .discardingFiredPanes())
> > > > > > > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > > > > > > >         .apply(ParDo.of(new DoFn<KV<String,
> > > > Iterable<String>>,
> > > > > > > > Void>()
> > > > > > > > > {
> > > > > > > > > > >             @Override
> > > > > > > > > > >             public void processElement(ProcessContext
> c)
> > > > throws
> > > > > > > > > > Exception {
> > > > > > > > > > >                 KV<String, Iterable<String>> element =
> > > > > > c.element();
> > > > > > > > > > >                 Iterator<String> iterator =
> > > > > > > > > > element.getValue().iterator();
> > > > > > > > > > >                 int count = 0;
> > > > > > > > > > >                 while (iterator.hasNext()) {
> > > > > > > > > > >                     iterator.next();
> > > > > > > > > > >                     count++;
> > > > > > > > > > >                 }
> > > > > > > > > > >                 System.out.println(String.format("Key
> %s
> > > > Value
> > > > > > > Count
> > > > > > > > > > > %d", element.getKey(), count));
> > > > > > > > > > >             }
> > > > > > > > > > >         }));
> > > > > > > > > > > pipeline.run();
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Sumit Chawla
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh
> > > > > > > > <tgroh@google.com.invalid
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > If you use the DirectRunner, do you observe the same
> > > > > behavior?
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <
> > > > > > > > > sumitkchawla@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am using FlinkRunner.  Yes the second part of
> > trigger
> > > > > never
> > > > > > > > fires
> > > > > > > > > > for
> > > > > > > > > > > > me,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards
> > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hey Sumit;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What runner are you using? I can set up a test with the same trigger
> > > > > > > > > > > > > > reading from an unbounded input using the DirectRunner and I get the
> > > > > > > > > > > > > > expected output panes.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Just to clarify, the second half of the trigger ('when the first element
> > > > > > > > > > > > > > has been there for at least 30+ seconds') simply never fires?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > That did not work.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I tried the following instead:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > .triggering(
> > > > > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > > > > > >                         AfterPane.elementCountAtLeast(100)
> > > > > > > > > > > > > > >                         )))
> > > > > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > What I am trying to do here is make sure that follow-up operations
> > > > > > > > > > > > > > > receive batches of records:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1.  Fire when a pane has 100+ elements.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2.  Or fire when the first element has been there for at least 30 sec.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > However, point 2 does not seem to work. E.g., I have 540 records in
> > > > > > > > > > > > > > > Kafka. The first 500 records are available immediately, but the
> > > > > > > > > > > > > > > remaining 40 don't pass through. I was expecting the 2nd trigger to help
> > > > > > > > > > > > > > > here.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > > > > > > > > > > > handle being written to multiple times for the same window. For example,
> > > > > > > > > > > > > > > > if the sink appends to the output when it receives new data in a window,
> > > > > > > > > > > > > > > > you could add something like
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Window.into(...).withAllowedLateness(...)
> > > > > > > > > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > > > > > > > > >     .discardingFiredPanes();
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This will cause elements to be output some amount of time after they are
> > > > > > > > > > > > > > > > first received from Kafka, even if Kafka does not have any new elements.
> > > > > > > > > > > > > > > > Elements will only be output by the GroupByKey once.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking in
> > > > > > > > > > > > > > > > the absence of new records.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I don't have much control over changing KafkaIO properties. I added the
> > > > > > > > > > > > > > > > > KafkaIO code only to complete the example. Are there any changes that can
> > > > > > > > > > > > > > > > > be made to the windowing to achieve the same behavior?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rangadi@google.com.invalid>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > > > > > > > > > > > > record (in effect; more accurately, it returns the same as getTimestamp(),
> > > > > > > > > > > > > > > > > > which might be overridden by the user).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > > > > > > > > > > > > essentially returns Now() or Now()-1sec. (usage in javadoc
> > > > > > > > > > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think the default watermark should be smarter. It should advance to the
> > > > > > > > > > > > > > > > > > current time if there aren't any records to read from Kafka. Could you
> > > > > > > > > > > > > > > > > > file a JIRA?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > > > > >
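
The stall Raghu describes can be reproduced with a toy model. The sketch below is plain Java, not the KafkaIO or Beam API: it assumes a fixed event-time window may fire once the watermark reaches the window's end, and compares a watermark that tracks the timestamp of the last record read (the default behaviour described above) with one that tracks processing time minus a small slack (the Now()-1sec workaround). The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model only: hypothetical helpers, not KafkaIO or Beam APIs.
final class WatermarkSketch {

    // Default-like policy: the watermark is the timestamp of the last record read,
    // so it stops moving when Kafka has no new records.
    static Instant lastRecordWatermark(Instant lastRecordTimestamp) {
        return lastRecordTimestamp;
    }

    // Workaround-like policy: the watermark follows processing time minus a slack,
    // so it keeps advancing even when no records arrive.
    static Instant nowMinusSlackWatermark(Instant now, Duration slack) {
        return now.minus(slack);
    }

    // Assumption: a window [start, end) can fire once the watermark reaches its end.
    static boolean windowCanFire(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2016-08-24T10:00:10Z");  // end of a 10-second window
        Instant lastRecord = Instant.parse("2016-08-24T10:00:05Z"); // no records after this one
        Instant now = Instant.parse("2016-08-24T10:05:00Z");        // five minutes later

        // false: the watermark is stuck at 10:00:05
        System.out.println(windowCanFire(windowEnd, lastRecordWatermark(lastRecord)));
        // true: 10:04:59 is past 10:00:10
        System.out.println(windowCanFire(windowEnd, nowMinusSlackWatermark(now, Duration.ofSeconds(1))));
    }
}
```

In KafkaIO itself the corresponding hook is the watermark function documented at the javadoc link Raghu gives; the exact method signature depends on the Beam version, so check it there rather than relying on this sketch.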

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Thanks Aljoscha/Thomas

I will explore the option to upgrade. Meanwhile, here is what I observed
with the above code in my local Flink cluster.

1.  To start, there are 0 records in Kafka.
2.  Deploy the pipeline. Two records are received in Kafka at 10:00:00 AM.
3.  The 100-record part of the trigger would not fire because there are not
enough records. I would expect the 30-second trigger to fire and downstream
to receive the records around 10:00:30 AM.
4.  No new records arrive. Downstream received the above records about 10
minutes later, around 10:10:00 AM.

I am not sure what's actually triggering the window firing here (it does
not look like the 30-second trigger).
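
To make the expectation in step 3 concrete, here is a toy model in plain Java (not Beam's trigger implementation) of Repeatedly.forever(AfterFirst.of(<processing-time delay after the first element>, <element count>)) with discarding panes, for a single key in a single window. It assumes processing time keeps advancing when no input arrives, and that a timer due at the same instant as an arrival fires first; the class and method names are hypothetical. Under these assumptions two records arriving at 10:00:00 (t = 0) yield a single pane 30 seconds later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of Repeatedly.forever(AfterFirst.of(delay-after-first-element, count))
// with discarding panes, for ONE key in ONE window. Not Beam's implementation.
final class AfterFirstSketch {

    // arrivals: processing times (seconds) at which elements reach the pane, in order.
    // Returns the processing times at which the pane fires.
    static List<Long> firingTimes(List<Long> arrivals, long delaySeconds, int countThreshold) {
        List<Long> firings = new ArrayList<>();
        int buffered = 0;   // elements in the current, not yet fired, pane
        long deadline = 0;  // processing-time deadline; meaningful only when buffered > 0
        for (long t : arrivals) {
            // A pending processing-time timer fires before (or at) a later arrival.
            if (buffered > 0 && t >= deadline) {
                firings.add(deadline);
                buffered = 0;
            }
            if (buffered == 0) {
                deadline = t + delaySeconds; // the timer starts at the first element of a pane
            }
            buffered++;
            if (buffered >= countThreshold) {
                firings.add(t);              // the count condition fires first
                buffered = 0;
            }
        }
        if (buffered > 0) {
            firings.add(deadline);           // no more input, but processing time still advances
        }
        return Collections.unmodifiableList(firings);
    }

    public static void main(String[] args) {
        // Step 3 above: two records at t = 0, 30 s delay, 100-element threshold.
        System.out.println(firingTimes(Arrays.asList(0L, 0L), 30, 100)); // [30]
    }
}
```

Because triggers are evaluated per key and window (a point Aljoscha raises in this thread), the count condition applies to each key's pane separately, not to the whole topic.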



Regards
Sumit Chawla


On Wed, Aug 31, 2016 at 11:14 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Ah I see, the Flink Runner had quite a few updates in 0.2.0-incubating and
> even more for the upcoming 0.3.0-incubating.
>
> On Thu, 1 Sep 2016 at 04:09 Thomas Groh <tg...@google.com.invalid> wrote:
>
> > In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with
> > the DirectRunner (formerly InProcessPipelineRunner), which is capable of
> > handling Unbounded Pipelines. Is it possible for you to upgrade?
> >
> > On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > @Aljoscha, my assumption here is that at least one trigger should fire:
> > > either the 100 elements or the 30 seconds since the first element
> > > (whichever happens first).
> > >
> > > @Thomas - here is the error I get (I am using 0.1.0-incubating):
> > >
> > > java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> > >     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
> > >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
> > >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
> > >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
> > >     a
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > > could the reason for the second part of the trigger never firing be that
> > > > there are never at least 100 elements per key? The trigger would only
> > > > fire if it saw 100 elements, and with only 540 elements that seems
> > > > unlikely if you have more than 6 keys.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > KafkaIO is implemented using the UnboundedRead API, which is supported by
> > > > > the DirectRunner. You should be able to run without the withMaxNumRecords;
> > > > > if you can't, I'd be very interested to see the stack trace that you get
> > > > > when you try to run the Pipeline.
> > > > >
> > > > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Yes. I added it only for the DirectRunner, as it cannot translate
> > > > > > Read(UnboundedSourceOfKafka).
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Ah ok, this might be a stupid question, but did you remove this line
> > > > > > > when running it with Flink:
> > > > > > >         .withMaxNumRecords(500)
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Aljoscha
> > > > > > > >
> > > > > > > > The code is not different while running on Flink. I have removed only
> > > > > > > > business-specific transformations.
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > could you maybe also post the complete code that you're using with the
> > > > > > > > > FlinkRunner? I could have a look into it.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Aljoscha
> > > > > > > > >
> > > > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas
> > > > > > > > > >
> > > > > > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > > > > > > Following is the snippet I am working on; I will post more details once I
> > > > > > > > > > get it working (as of now I am unable to read messages from Kafka using
> > > > > > > > > > the DirectRunner).
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > >         .withMaxNumRecords(500)
> > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > > > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > > > > > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > > > > > >     @Override
> > > > > > > > > >     public void processElement(ProcessContext c) throws Exception {
> > > > > > > > > >         KV<byte[], byte[]> record = c.element().getKV();
> > > > > > > > > >         c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > > > > > > >     }
> > > > > > > > > > }))
> > > > > > > > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > > > > > > > >                 .triggering(
> > > > > > > > > >                         Repeatedly.forever(
> > > > > > > > > >                                 AfterFirst.of(
> > > > > > > > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > >                                         AfterPane.elementCountAtLeast(100)
> > > > > > > > > >                                 )))
> > > > > > > > > >                 .discardingFiredPanes())
> > > > > > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > > > > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > > > > > >             @Override
> > > > > > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > > > > > >                 KV<String, Iterable<String>> element = c.element();
> > > > > > > > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > > > > > > > >                 int count = 0;
> > > > > > > > > >                 while (iterator.hasNext()) {
> > > > > > > > > >                     iterator.next();
> > > > > > > > > >                     count++;
> > > > > > > > > >                 }
> > > > > > > > > >                 System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
> > > > > > > > > >             }
> > > > > > > > > >         }));
> > > > > > > > > > pipeline.run();
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh
> > > > > > > <tgroh@google.com.invalid
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > If you use the DirectRunner, do you observe the same
> > > > behavior?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <
> > > > > > > > sumitkchawla@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > >
> > > > > > > > > > > > I am using FlinkRunner.  Yes the second part of
> trigger
> > > > never
> > > > > > > fires
> > > > > > > > > for
> > > > > > > > > > > me,
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh
> > > > > > > > > <tgroh@google.com.invalid
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Sumit;
> > > > > > > > > > > > >
> > > > > > > > > > > > > What runner are you using? I can set up a test with
> > the
> > > > > same
> > > > > > > > > trigger
> > > > > > > > > > > > > reading from an unbounded input using the
> > DirectRunner
> > > > and
> > > > > I
> > > > > > > get
> > > > > > > > > the
> > > > > > > > > > > > > expected output panes.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Just to clarify, the second half of the trigger
> > ('when
> > > > the
> > > > > > > first
> > > > > > > > > > > element
> > > > > > > > > > > > > has been there for at least 30+ seconds') simply
> > never
> > > > > fires?
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <
> > > > > > > > > > sumitkchawla@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > That did not work.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I tried following instead:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > .triggering(
> > > > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > > > >
>  AfterProcessingTime.
> > > > > > > > > > > > > pastFirstElementInPane()
> > > > > > > > > > > > > >
> > > > > > >  .plusDelayOf(Duration.standard
> > > > > > > > > > > > > > Seconds(30)),
> > > > > > > > > > > > > >
> > > > > > > >  AfterPane.elementCountAtLeast(100)
> > > > > > > > > > > > > >                         )))
> > > > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What i am trying to do here.  This is to make
> sure
> > > that
> > > > > > > > followup
> > > > > > > > > > > > > > operations receive batches of records.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1.  Fire when at Pane has 100+ elements
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2.  Or Fire when the first element has been there
> > for
> > > > > > atleast
> > > > > > > > 30
> > > > > > > > > > > sec+.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > However,  2 point does not seem to work.  e.g. I
> > have
> > > > 540
> > > > > > > > records
> > > > > > > > > > in
> > > > > > > > > > > > > > Kafka.  The first 500 records are available
> > > > immediately,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > but the remaining 40 don't pass through. I was
> > > > expecting
> > > > > > 2nd
> > > > > > > to
> > > > > > > > > > > > > > trigger to help here.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh
> > > > > > > > > > > <tgroh@google.com.invalid
> > > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > You can adjust the trigger in the windowing
> > > transform
> > > > > if
> > > > > > > your
> > > > > > > > > > sink
> > > > > > > > > > > > can
> > > > > > > > > > > > > > > handle being written to multiple times for the
> > same
> > > > > > window.
> > > > > > > > For
> > > > > > > > > > > > > example,
> > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > the sink appends to the output when it receives
> > new
> > > > > data
> > > > > > > in a
> > > > > > > > > > > window,
> > > > > > > > > > > > > you
> > > > > > > > > > > > > > > could add something like
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Window.into(...).withAllowedLateness(...).
> > > > > > > > > > > triggering(AfterWatermark.
> > > > > > > > > > > > > > > pastEndOfWindow().withEarlyFirings(
> > > > AfterProcessingTime.
> > > > > > > > > > > > > > > pastFirstElementInPane().withDelayOf(Duration.
> > > > > > > > > > > standardSeconds(5))).
> > > > > > > > > > > > > > > withLateFirings(AfterPane.
> > > elementCountAtLeast(1))).
> > > > > > > discardin
> > > > > > > > > > > > > > gFiredPanes();
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This will cause elements to be output some
> amount
> > > of
> > > > > time
> > > > > > > > after
> > > > > > > > > > > they
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > > first received from Kafka, even if Kafka does
> not
> > > > have
> > > > > > any
> > > > > > > > new
> > > > > > > > > > > > > elements.
> > > > > > > > > > > > > > > Elements will only be output by the GroupByKey
> > > once.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We should still have a JIRA to improve the
> > KafkaIO
> > > > > > > watermark
> > > > > > > > > > > tracking
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > the absence of new records .
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit
> <
> > > > > > > > > > > > sumitkchawla@gmail.com
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I don't have much control over changing
> KafkaIO
> > > > > > > properties.
> > > > > > > > > I
> > > > > > > > > > > > added
> > > > > > > > > > > > > > > > KafkaIO code for completing the example.  Are
> > > there
> > > > > any
> > > > > > > > > changes
> > > > > > > > > > > > that
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > done to Windowing to achieve the same
> behavior?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi
> > > > > > > > > > > > > > <rangadi@google.com.invalid
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The default implementation returns
> processing
> > > > > > timestamp
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > > last
> > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > (in effect. more accurately it returns same
> > as
> > > > > > > > > > getTimestamp(),
> > > > > > > > > > > > > which
> > > > > > > > > > > > > > > > might
> > > > > > > > > > > > > > > > > overridden by user).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As a work around, yes, you can provide your
> > own
> > > > > > > > watermarkFn
> > > > > > > > > > > that
> > > > > > > > > > > > > > > > > essentially returns Now() or Now()-1sec.
> > (usage
> > > > in
> > > > > > > > javadoc
> > > > > > > > > > > > > > > > > <https://github.com/apache/
> > > > > > incubator-beam/blob/master/
> > > > > > > > > > > > > > > > > sdks/java/io/kafka/src/main/
> > > > > > > java/org/apache/beam/sdk/io/
> > > > > > > > > > > > > > > > > kafka/KafkaIO.java#L138>
> > > > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I think default watermark should be
> smarter.
> > it
> > > > > > should
> > > > > > > > > > advance
> > > > > > > > > > > to
> > > > > > > > > > > > > > > current
> > > > > > > > > > > > > > > > > time if there aren't any records to read
> from
> > > > > Kafka.
> > > > > > > > Could
> > > > > > > > > > you
> > > > > > > > > > > > > file a
> > > > > > > > > > > > > > > > jira?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > > > >
>
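The quoted exchange above turns on how KafkaIO's default watermark behaves when the topic goes idle. As Raghu describes it, the default follows the timestamp of the last record read, so a 10-second fixed window cannot close until a newer record pushes the watermark past the window's end. A custom watermark function returning roughly now() minus one second would let the window close. The following is a minimal plain-Java sketch of that reasoning only, under those assumptions. It does not call Beam or KafkaIO. `IdleWatermarkModel` and its methods are hypothetical names, and the end-of-window check is simplified to "the watermark has reached the window's end".

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Toy model (not Beam or KafkaIO code) of when a fixed window may close,
 * comparing two hypothetical watermark functions for an idle topic.
 */
class IdleWatermarkModel {
    static final Duration WINDOW = Duration.ofSeconds(10);

    /** Start of the 10-second fixed window that contains the given timestamp. */
    static Instant windowStart(Instant ts) {
        long size = WINDOW.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), size) * size);
    }

    /** Simplified end-of-window check: the watermark has reached the window's end. */
    static boolean canFire(Instant windowStart, Instant watermark) {
        return !watermark.isBefore(windowStart.plus(WINDOW));
    }

    /** Default behavior as described in the thread: the last record's timestamp. */
    static Instant lastRecordWatermark(Instant lastRecordTimestamp) {
        return lastRecordTimestamp;
    }

    /** The suggested workaround: wall-clock time minus one second. */
    static Instant wallClockWatermark(Instant now) {
        return now.minus(Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        // The last record was read at t = 12.5 s, so it belongs to the window [10 s, 20 s).
        Instant last = Instant.ofEpochMilli(12_500);
        Instant start = windowStart(last);
        // Three minutes later the topic is still idle: no newer record has arrived.
        Instant now = Instant.ofEpochMilli(180_000);

        System.out.println("last-record watermark closes window: "
                + canFire(start, lastRecordWatermark(last)));
        System.out.println("wall-clock watermark closes window:  "
                + canFire(start, wallClockWatermark(now)));
    }
}
```

Running `main` prints `false` for the last-record watermark and `true` for the wall-clock variant. With no new records, only a watermark tied to the clock ever passes the end of the [10 s, 20 s) window.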

Re: KafkaIO Windowing Fn

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, I see. The Flink Runner had quite a few updates in 0.2.0-incubating, and
there are even more in the upcoming 0.3.0-incubating.

On Thu, 1 Sep 2016 at 04:09 Thomas Groh <tg...@google.com.invalid> wrote:

> In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with
> the DirectRunner (formerly InProcessPipelineRunner), which is capable of
> handling Unbounded Pipelines. Is it possible for you to upgrade?
>
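Aljoscha's point in this thread is that the trigger is evaluated separately for each key and window, so `AfterPane.elementCountAtLeast(100)` only helps if a single key collects 100 elements. With 540 records spread evenly over six keys, each pane holds 90. The sketch below is a toy model in plain Java, not Beam's trigger implementation. It follows one key's pane under `Repeatedly.forever(AfterFirst.of(...))` with discarding panes, treats arrival times as processing time, and ignores windows, watermarks, and allowed lateness. `PerKeyTriggerModel` is a hypothetical name, and its `timersFire` flag is an assumption that stands in for whether the runner delivers the 30-second processing-time firing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Beam's trigger implementation) of a single key's pane under
 * Repeatedly.forever(AfterFirst.of(30 s after first element, at least 100 elements))
 * with discarding panes. Arrival times are processing times in ms, in ascending order.
 */
class PerKeyTriggerModel {
    static final int COUNT = 100;
    static final long DELAY_MS = 30_000;

    /**
     * Returns the sizes of the panes that fire, in order. When timersFire is
     * false, the processing-time branch is assumed never to fire, so elements
     * in a pane below COUNT are never emitted.
     */
    static List<Integer> firedPanes(long[] arrivals, boolean timersFire) {
        List<Integer> panes = new ArrayList<>();
        int inPane = 0;
        long firstInPane = 0;
        for (long t : arrivals) {
            // Processing-time branch: the 30 s deadline has been reached by the time this element arrives.
            if (inPane > 0 && timersFire && t >= firstInPane + DELAY_MS) {
                panes.add(inPane);
                inPane = 0;
            }
            if (inPane == 0) {
                firstInPane = t; // this element starts a new pane
            }
            inPane++;
            // Count branch: fires as soon as the pane holds COUNT elements.
            if (inPane >= COUNT) {
                panes.add(inPane);
                inPane = 0;
            }
        }
        // After the last arrival, only the processing-time branch can flush what is left.
        if (inPane > 0 && timersFire) {
            panes.add(inPane);
        }
        return panes;
    }

    public static void main(String[] args) {
        // All elements are read at t = 0, i.e. in one burst.
        long[] ninetyAtOnce = new long[540 / 6];        // 540 records over 6 keys
        long[] hundredEightAtOnce = new long[540 / 5];  // 540 records over 5 keys
        System.out.println("90 per key, timer fires:   " + firedPanes(ninetyAtOnce, true));
        System.out.println("90 per key, no timer:      " + firedPanes(ninetyAtOnce, false));
        System.out.println("108 per key, timer fires:  " + firedPanes(hundredEightAtOnce, true));
        System.out.println("108 per key, no timer:     " + firedPanes(hundredEightAtOnce, false));
    }
}
```

In this model, 90 elements per key produce `[90]` only when the timer fires and `[]` otherwise. With 108 per key, the count branch emits a pane of 100 and the last 8 again depend on the timer. The model only illustrates the per-key arithmetic; it does not explain why the processing-time branch did not fire on the runner version used in this thread.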

Re: KafkaIO Windowing Fn

Posted by Thomas Groh <tg...@google.com.INVALID>.
In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with
the DirectRunner (formerly InProcessPipelineRunner), which is capable of
handling Unbounded Pipelines. Is it possible for you to upgrade?

On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <su...@gmail.com> wrote:

> @Aljoscha, my assumption here is that at least one trigger should fire:
> either the 100 elements or the 30 seconds since the first element
> (whichever happens first).
>
> @Thomas - here is the error I get. I am using 0.1.0-incubating:
>
> java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     ...
>
> Regards
> Sumit Chawla
>
> On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <al...@apache.org> wrote:
>
> > Hi,
> > could the reason for the second part of the trigger never firing be that
> > there are never at least 100 elements per key? The trigger would only fire
> > if it saw 100 elements, and with only 540 elements that seems unlikely if
> > you have more than 6 keys.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid> wrote:
> >
> > > KafkaIO is implemented using the UnboundedRead API, which is supported by
> > > the DirectRunner. You should be able to run without the withMaxNumRecords;
> > > if you can't, I'd be very interested to see the stack trace that you get
> > > when you try to run the Pipeline.
> > >
> > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkchawla@gmail.com> wrote:
> > >
> > > > Yes. I added it only for the DirectRunner, as it cannot translate
> > > > Read(UnboundedSourceOfKafka).
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > >
> > > > > Ah ok, this might be a stupid question but did you remove this line
> > > > > when running it with Flink:
> > > > >         .withMaxNumRecords(500)
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com> wrote:
> > > > >
> > > > > > Hi Aljoscha
> > > > > >
> > > > > > The code is not different while running on Flink. I have removed
> > > > > > only the business-specific transformations.
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > could you maybe also post the complete code that you're using with
> > > > > > > the FlinkRunner? I could have a look into it.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkchawla@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Thomas
> > > > > > > >
> > > > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > > > > Following is the snippet I am working on; I will post more details
> > > > > > > > once I get it working (as of now I am unable to read messages from
> > > > > > > > Kafka using the DirectRunner).
> > > > > > > >
> > > > > > > >
> > > > > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > >         .withMaxNumRecords(500)
> > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > > > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > > > >     @Override
> > > > > > > >     public void processElement(ProcessContext c) throws Exception {
> > > > > > > >         KV<byte[], byte[]> record = c.element().getKV();
> > > > > > > >         c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > > > > >     }
> > > > > > > > }))
> > > > > > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > > > > > >                 .triggering(
> > > > > > > >                         Repeatedly.forever(
> > > > > > > >                                 AfterFirst.of(
> > > > > > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > >                                         AfterPane.elementCountAtLeast(100)
> > > > > > > >                                 )))
> > > > > > > >                 .discardingFiredPanes())
> > > > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > > > >             @Override
> > > > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > > > >                 KV<String, Iterable<String>> element = c.element();
> > > > > > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > > > > > >                 int count = 0;
> > > > > > > >                 while (iterator.hasNext()) {
> > > > > > > >                     iterator.next();
> > > > > > > >                     count++;
> > > > > > > >                 }
> > > > > > > >                 System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
> > > > > > > >             }
> > > > > > > >         }));
> > > > > > > > pipeline.run();
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tgroh@google.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > If you use the DirectRunner, do you observe the same behavior?
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkchawla@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas
> > > > > > > > > >
> > > > > > > > > > I am using the FlinkRunner. Yes, the second part of the trigger
> > > > > > > > > > never fires for me.
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Sumit;
> > > > > > > > > > >
> > > > > > > > > > > What runner are you using? I can set up a test with the same
> > > > > > > > > > > trigger reading from an unbounded input using the DirectRunner
> > > > > > > > > > > and I get the expected output panes.
> > > > > > > > > > >
> > > > > > > > > > > Just to clarify, the second half of the trigger ('when the first
> > > > > > > > > > > element has been there for at least 30+ seconds') simply never fires?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > >
> > > > > > > > > > > > That did not work.
> > > > > > > > > > > >
> > > > > > > > > > > > I tried the following instead:
> > > > > > > > > > > >
> > > > > > > > > > > > .triggering(
> > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > > >                         AfterPane.elementCountAtLeast(100)
> > > > > > > > > > > >                 )))
> > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > >
> > > > > > > > > > > > What I am trying to do here is make sure that follow-up
> > > > > > > > > > > > operations receive batches of records:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Fire when a pane has 100+ elements.
> > > > > > > > > > > >
> > > > > > > > > > > > 2. Or fire when the first element has been there for at least 30 seconds.
> > > > > > > > > > > >
> > > > > > > > > > > > However, point 2 does not seem to work. For example, I have 540 records
> > > > > > > > > > > > in Kafka. The first 500 records are available immediately,
> > > > > > > > > > > > but the remaining 40 don't pass through. I was expecting the 2nd
> > > > > > > > > > > > trigger to help here.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh
> > > > > > > > > <tgroh@google.com.invalid
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > You can adjust the trigger in the windowing
> transform
> > > if
> > > > > your
> > > > > > > > sink
> > > > > > > > > > can
> > > > > > > > > > > > > handle being written to multiple times for the same
> > > > window.
> > > > > > For
> > > > > > > > > > > example,
> > > > > > > > > > > > if
> > > > > > > > > > > > > the sink appends to the output when it receives new
> > > data
> > > > > in a
> > > > > > > > > window,
> > > > > > > > > > > you
> > > > > > > > > > > > > could add something like
> > > > > > > > > > > > >
> > > > > > > > > > > > > Window.into(...).withAllowedLateness(...).
> > > > > > > > > triggering(AfterWatermark.
> > > > > > > > > > > > > pastEndOfWindow().withEarlyFirings(
> > AfterProcessingTime.
> > > > > > > > > > > > > pastFirstElementInPane().withDelayOf(Duration.
> > > > > > > > > standardSeconds(5))).
> > > > > > > > > > > > > withLateFirings(AfterPane.
> elementCountAtLeast(1))).
> > > > > discardin
> > > > > > > > > > > > gFiredPanes();
> > > > > > > > > > > > >
> > > > > > > > > > > > > This will cause elements to be output some amount
> of
> > > time
> > > > > > after
> > > > > > > > > they
> > > > > > > > > > > are
> > > > > > > > > > > > > first received from Kafka, even if Kafka does not
> > have
> > > > any
> > > > > > new
> > > > > > > > > > > elements.
> > > > > > > > > > > > > Elements will only be output by the GroupByKey
> once.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We should still have a JIRA to improve the KafkaIO
> > > > > watermark
> > > > > > > > > tracking
> > > > > > > > > > > in
> > > > > > > > > > > > > the absence of new records .
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <
> > > > > > > > > > sumitkchawla@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I don't have much control over changing KafkaIO
> > > > > properties.
> > > > > > > I
> > > > > > > > > > added
> > > > > > > > > > > > > > KafkaIO code for completing the example.  Are
> there
> > > any
> > > > > > > changes
> > > > > > > > > > that
> > > > > > > > > > > > can
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > done to Windowing to achieve the same behavior?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi
> > > > > > > > > > > > <rangadi@google.com.invalid
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The default implementation returns processing
> > > > timestamp
> > > > > > of
> > > > > > > > the
> > > > > > > > > > last
> > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > (in effect. more accurately it returns same as
> > > > > > > > getTimestamp(),
> > > > > > > > > > > which
> > > > > > > > > > > > > > might
> > > > > > > > > > > > > > > overridden by user).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > As a work around, yes, you can provide your own
> > > > > > watermarkFn
> > > > > > > > > that
> > > > > > > > > > > > > > > essentially returns Now() or Now()-1sec. (usage
> > in
> > > > > > javadoc
> > > > > > > > > > > > > > > <https://github.com/apache/
> > > > incubator-beam/blob/master/
> > > > > > > > > > > > > > > sdks/java/io/kafka/src/main/
> > > > > java/org/apache/beam/sdk/io/
> > > > > > > > > > > > > > > kafka/KafkaIO.java#L138>
> > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think default watermark should be smarter. it
> > > > should
> > > > > > > > advance
> > > > > > > > > to
> > > > > > > > > > > > > current
> > > > > > > > > > > > > > > time if there aren't any records to read from
> > > Kafka.
> > > > > > Could
> > > > > > > > you
> > > > > > > > > > > file a
> > > > > > > > > > > > > > jira?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <
> > > > > > > > > > > > sumitkchawla@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi All
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I am trying to do some simple batch
> processing
> > on
> > > > > > KafkaIO
> > > > > > > > > > > records.
> > > > > > > > > > > > > My
> > > > > > > > > > > > > > > beam
> > > > > > > > > > > > > > > > pipeline looks like following:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > > > > > >         .withTopics(ImmutableList.of(
> > s"mytopic"))
> > > > > > > > > > > > > > > >         .withBootstrapServers("
> > localhost:9200")
> > > > > > > > > > > > > > > > .apply("ExtractMessage", ParDo.of(new
> > > > > > > ExtractKVMessage()))
> > > > > > > > //
> > > > > > > > > > > > Emits a
> > > > > > > > > > > > > > > > KV<String,String>
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > .apply("WindowBy10Sec", Window.<KV<String,
> > > > > > > > > > > > > > > > JSONObject>>into(FixedWindows.
> > > > > > > of(Duration.standardSeconds(
> > > > > > > > > > > > > > > > 10))).withAllowedLateness(
> > > > > Duration.standardSeconds(1)))
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > .apply("Sink", ParDo.of(new MySink())
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > My Kafka Source already has some messages
> > 1000+,
> > > > and
> > > > > > new
> > > > > > > > > > messages
> > > > > > > > > > > > > > arrive
> > > > > > > > > > > > > > > > every few minutes.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > When i start my pipeline,  i can see that it
> > > reads
> > > > > all
> > > > > > > the
> > > > > > > > > > 1000+
> > > > > > > > > > > > > > messages
> > > > > > > > > > > > > > > > from Kafka.  However, Window does not fire
> > > untill a
> > > > > new
> > > > > > > > > message
> > > > > > > > > > > > > arrives
> > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > Kafka.  And Sink does not receive any message
> > > until
> > > > > > that
> > > > > > > > > point.
> > > > > > > > > > > > Do i
> > > > > > > > > > > > > > > need
> > > > > > > > > > > > > > > > to override the WaterMarkFn here? Since i am
> > not
> > > > > > > providing
> > > > > > > > > any
> > > > > > > > > > > > > > > timeStampFn
> > > > > > > > > > > > > > > > , i am assuming that timestamps will be
> > assigned
> > > as
> > > > > in
> > > > > > > when
> > > > > > > > > > > message
> > > > > > > > > > > > > > > arrives
> > > > > > > > > > > > > > > > i.e. ingestion time.  What is the default
> > > > WaterMarkFn
> > > > > > > > > > > > implementation?
> > > > > > > > > > > > > > Is
> > > > > > > > > > > > > > > > the Window not supposed to be fired based on
> > > > > Ingestion
> > > > > > > > time?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
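
A minimal plain-Java simulation of the stalled-watermark behaviour discussed in this thread (no Beam dependency; WatermarkSketch and all of its methods are hypothetical names). It assumes, following Raghu's description, that the default watermark equals the timestamp of the last record read, that timestamps are ingestion times, and that a 10-second fixed window fires once the watermark reaches the window's end. It contrasts that with a watermark in the spirit of the suggested "Now()-1sec" workaround. This is a sketch of the idea only, not KafkaIO's actual implementation.

```java
/**
 * Hypothetical simulation: why a 10-second fixed window never fires when the
 * watermark only tracks the last record's timestamp and no new records arrive.
 * Times are milliseconds since an arbitrary epoch.
 */
final class WatermarkSketch {
    static final long WINDOW_SIZE_MS = 10_000;

    /** Exclusive end of the fixed window that contains timestamp ts. */
    static long windowEnd(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    /** Models the default: watermark = timestamp of the last record read. */
    static long lastRecordWatermark(long lastRecordTs, long nowMs) {
        return lastRecordTs; // ignores wall-clock time, so it stalls while Kafka is idle
    }

    /** Models the suggested workaround: roughly "now minus one second". */
    static long nowMinusOneSecondWatermark(long lastRecordTs, long nowMs) {
        // Do not report a watermark older than the last record already read.
        return Math.max(lastRecordTs, nowMs - 1_000);
    }

    /** An end-of-window trigger fires once the watermark reaches the window end. */
    static boolean windowFires(long recordTs, long watermark) {
        return watermark >= windowEnd(recordTs);
    }

    public static void main(String[] args) {
        long lastRecordTs = 5_000; // last backlog record, read at t = 5 s
        long nowMs = 60_000;       // one minute later, still no new Kafka records
        System.out.println("last-record watermark fires window: "
                + windowFires(lastRecordTs, lastRecordWatermark(lastRecordTs, nowMs)));
        System.out.println("now-1s watermark fires window:      "
                + windowFires(lastRecordTs, nowMinusOneSecondWatermark(lastRecordTs, nowMs)));
    }
}
```

Running main prints false for the last-record model (the window stays open while Kafka is idle) and true for the now-minus-one-second model. One trade-off to keep in mind: with event-time rather than ingestion-time timestamps, a wall-clock-based watermark can make records with older timestamps arrive late.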

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
@Aljoscha, my assumption here is that at least one trigger should fire: either
the 100-element count or the 30 seconds since the first element (whichever
happens first).
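
The expectation above can be checked against a minimal plain-Java model of the trigger (no Beam dependency; AfterFirstSketch and its methods are hypothetical). It models Repeatedly.forever(AfterFirst.of(a 30-second processing-time delay after the first element in the pane, a count of at least 100)) with discarding fired panes, for a single key in a single window, and assumes elements arrive in processing-time order. It is a simplified model of the trigger semantics, not any runner's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of Repeatedly.forever(AfterFirst.of(
 *     processing-time delay of 30 s after the pane's first element,
 *     element count of at least 100))
 * with discarding fired panes, for ONE key in ONE window.
 */
final class AfterFirstSketch {
    static final int COUNT_THRESHOLD = 100;
    static final long DELAY_MS = 30_000;

    /**
     * @param arrivalsMs processing-time arrival of each element, ascending
     * @param stopMs     processing time at which the simulation stops
     * @return the number of elements in each emitted pane, in order
     */
    static List<Integer> panes(long[] arrivalsMs, long stopMs) {
        List<Integer> fired = new ArrayList<>();
        int count = 0;    // elements buffered in the current pane
        long firstMs = 0; // arrival time of the current pane's first element
        for (long t : arrivalsMs) {
            // Processing-time branch: the delay elapsed before this element arrived.
            if (count > 0 && t >= firstMs + DELAY_MS) {
                fired.add(count);
                count = 0;
            }
            if (count == 0) {
                firstMs = t; // the first element of a new pane arms the timer
            }
            count++;
            // Count branch: fire as soon as the pane holds enough elements.
            if (count >= COUNT_THRESHOLD) {
                fired.add(count);
                count = 0; // discardingFiredPanes(): start again with an empty pane
            }
        }
        // Leftover elements still fire once the delay has elapsed.
        if (count > 0 && stopMs >= firstMs + DELAY_MS) {
            fired.add(count);
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[540]; // 540 elements, one per millisecond
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i;
        }
        System.out.println("after 20 s: " + panes(arrivals, 20_000));
        System.out.println("after 40 s: " + panes(arrivals, 40_000));
    }
}
```

In this model, 540 elements for one key produce five panes of 100 straight away, and the remaining 40 are emitted once 30 seconds of processing time have passed, i.e. the processing-time branch alone should eventually flush a partial pane.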

@Thomas, here is the error I get (I am using 0.1.0-incubating):

java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
    ...

Regards
Sumit Chawla


On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> could the reason for the second part of the trigger never firing be that
> there are never at least 100 elements per key? The trigger would only fire
> if it saw 100 elements for a key, and with only 540 elements that seems
> unlikely if you have more than 6 keys.
>
> Cheers,
> Aljoscha

Re: KafkaIO Windowing Fn

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
could the reason for the second part of the trigger never firing be that
there are never at least 100 elements per key? The trigger would only fire
if it saw 100 elements for a key, and with only 540 elements that seems
unlikely if you have more than 6 keys.

Cheers,
Aljoscha
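
The per-key arithmetic can be checked with a small plain-Java sketch (no Beam dependency; PerKeyCountSketch and the key-0, key-1, ... keys are hypothetical). It assumes, for illustration only, that the 540 records are spread round-robin over the keys; real tenant keys are usually skewed, so this shows only the evenly distributed case. Since elementCountAtLeast(100) is evaluated per key and window, the question is how many keys individually reach 100.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical check: how many keys reach a per-key element-count threshold. */
final class PerKeyCountSketch {

    /** Spreads n elements round-robin over k hypothetical keys "key-0" .. "key-(k-1)". */
    static Map<String, Integer> roundRobinCounts(int n, int k) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            counts.merge("key-" + (i % k), 1, Integer::sum);
        }
        return counts;
    }

    /** Number of keys whose pane would satisfy an element-count threshold. */
    static long keysReaching(Map<String, Integer> counts, int threshold) {
        return counts.values().stream().filter(c -> c >= threshold).count();
    }

    public static void main(String[] args) {
        for (int k : new int[] {1, 5, 6, 10}) {
            long reaching = keysReaching(roundRobinCounts(540, k), 100);
            System.out.println(k + " key(s): " + reaching + " reach 100 elements");
        }
    }
}
```

With an even spread, 5 keys get 108 records each and all reach 100, while 6 keys get 90 each and none do.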

On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid> wrote:

> KafkaIO is implemented using the UnboundedRead API, which is supported by
> the DirectRunner. You should be able to run without the withMaxNumRecords;
> if you can't, I'd be very interested to see the stack trace that you get
> when you try to run the Pipeline.
>
> On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <su...@gmail.com>
> wrote:
>
> > Yes. I added it only for the DirectRunner, as it cannot translate
> > Read(UnboundedSourceOfKafka).
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Ah OK, this might be a stupid question, but did you remove this line
> > > when running it with Flink:
> > >         .withMaxNumRecords(500)
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com>
> > > wrote:
> > >
> > > > Hi Aljoscha
> > > >
> > > > The code is no different when running on Flink. I have only removed
> > > > business-specific transformations.
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > could you maybe also post the complete code that you're using with the
> > > > > FlinkRunner? I could have a look into it.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <su...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Thomas
> > > > > >
> > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > > Following is the snippet I am working on; I will post more details once
> > > > > > I get it working (as of now I am unable to read messages from Kafka
> > > > > > using the DirectRunner).
> > > > > >
> > > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > pipeline.apply(KafkaIO.read()
> > > > > >         .withMaxNumRecords(500)
> > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > >         .withBootstrapServers("localhost:9092")
> > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > > >         )))
> > > > > >         .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > >             @Override
> > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > >                 KV<byte[], byte[]> record = c.element().getKV();
> > > > > >                 c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > > >             }
> > > > > >         }))
> > > > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > > > >                 .triggering(
> > > > > >                         Repeatedly.forever(
> > > > > >                                 AfterFirst.of(
> > > > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > >                                         AfterPane.elementCountAtLeast(100))))
> > > > > >                 .discardingFiredPanes())
> > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > >             @Override
> > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > >                 KV<String, Iterable<String>> element = c.element();
> > > > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > > > >                 int count = 0;
> > > > > >                 while (iterator.hasNext()) {
> > > > > >                     iterator.next();
> > > > > >                     count++;
> > > > > >                 }
> > > > > >                 System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
> > > > > >             }
> > > > > >         }));
> > > > > > pipeline.run();
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > If you use the DirectRunner, do you observe the same behavior?
> > > > > > >
> > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Thomas
> > > > > > > >
> > > > > > > > I am using the FlinkRunner. Yes, the second part of the trigger never
> > > > > > > > fires for me.
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Sumit;
> > > > > > > > >
> > > > > > > > > What runner are you using? I can set up a test with the same trigger
> > > > > > > > > reading from an unbounded input using the DirectRunner, and I get the
> > > > > > > > > expected output panes.
> > > > > > > > >
> > > > > > > > > Just to clarify: the second half of the trigger ('when the first element
> > > > > > > > > has been there for at least 30+ seconds') simply never fires?
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas
> > > > > > > > > >
> > > > > > > > > > That did not work.
> > > > > > > > > >
> > > > > > > > > > I tried the following instead:
> > > > > > > > > >
> > > > > > > > > > .triggering(
> > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > >                         AfterPane.elementCountAtLeast(100))))
> > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > >
> > > > > > > > > > What I am trying to do here is make sure that follow-up operations
> > > > > > > > > > receive batches of records:
> > > > > > > > > >
> > > > > > > > > > 1.  Fire when a pane has 100+ elements.
> > > > > > > > > >
> > > > > > > > > > 2.  Or fire when the first element has been there for at least 30+ seconds.
> > > > > > > > > >
> > > > > > > > > > However, point 2 does not seem to work. E.g., I have 540 records in
> > > > > > > > > > Kafka. The first 500 records are available immediately, but the
> > > > > > > > > > remaining 40 don't pass through. I was expecting the 2nd trigger to
> > > > > > > > > > help here.
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > > > > > > handle being written to multiple times for the same window. For
> > > > > > > > > > > example, if the sink appends to the output when it receives new data in
> > > > > > > > > > > a window, you could add something like
> > > > > > > > > > >
> > > > > > > > > > > Window.into(...).withAllowedLateness(...)
> > > > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > > > >     .discardingFiredPanes();
> > > > > > > > > > >
> > > > > > > > > > > This will cause elements to be output some amount of time after they
> > > > > > > > > > > are first received from Kafka, even if Kafka does not have any new
> > > > > > > > > > > elements. Elements will only be output by the GroupByKey once.
> > > > > > > > > > >
> > > > > > > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking
> > > > > > > > > > > in the absence of new records.
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > >
> > > > > > > > > > > > I don't have much control over changing KafkaIO properties. I added
> > > > > > > > > > > > the KafkaIO code only to complete the example. Are there any changes
> > > > > > > > > > > > that can be made to the windowing to achieve the same behavior?
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rangadi@google.com.invalid>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > > > > > > > record (in effect; more accurately, it returns the same as
> > > > > > > > > > > > > getTimestamp(), which might be overridden by the user).
> > > > > > > > > > > > >
> > > > > > > > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > > > > > > > essentially returns Now() or Now()-1sec (usage in javadoc
> > > > > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think the default watermark should be smarter: it should advance to
> > > > > > > > > > > > > the current time if there aren't any records to read from Kafka. Could
> > > > > > > > > > > > > you file a JIRA?
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi All
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am trying to do some simple batch processing on KafkaIO records. My
> > > > > > > > > > > > > > Beam pipeline looks like the following:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > > > > >         .withBootstrapServers("localhost:9200"))
> > > > > > > > > > > > > > .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, String>
> > > > > > > > > > > > > > .apply("WindowBy10Sec", Window.<KV<String, String>>into(
> > > > > > > > > > > > > >         FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > > > > > > > >         .withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > > > > > > > > .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > > > > > .apply("Sink", ParDo.of(new MySink()));
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > My Kafka source already has 1000+ messages, and new messages arrive
> > > > > > > > > > > > > > every few minutes.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > When I start my pipeline, I can see that it reads all the 1000+ messages
> > > > > > > > > > > > > > from Kafka. However, the window does not fire until a new message
> > > > > > > > > > > > > > arrives in Kafka, and the sink does not receive any message until that
> > > > > > > > > > > > > > point. Do I need to override the WatermarkFn here? Since I am not
> > > > > > > > > > > > > > providing any timestampFn, I am assuming that timestamps will be
> > > > > > > > > > > > > > assigned as and when messages arrive, i.e. ingestion time. What is the
> > > > > > > > > > > > > > default WatermarkFn implementation? Isn't the window supposed to fire
> > > > > > > > > > > > > > based on ingestion time?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
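Raghu's description of the default watermark (it follows the timestamp of the last record read) is enough to explain why the last 10-second window stays open while Kafka is quiet. The plain-Java model below is a sketch, not Beam code: the class `WatermarkSketch`, its methods, and the sample timestamps are hypothetical, and it assumes only that a fixed window counts as complete once the watermark reaches the window's end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model (no Beam dependency) of when a 10-second
// fixed window is considered complete: once the watermark reaches its end.
final class WatermarkSketch {
    static final long WINDOW_MILLIS = 10_000L;

    // Start of the fixed window containing a timestamp (windows aligned to epoch 0).
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Window starts, in first-seen order, whose end the watermark has reached.
    static List<Long> firedWindowStarts(List<Long> timestamps, long watermarkMillis) {
        List<Long> fired = new ArrayList<>();
        for (long ts : timestamps) {
            long start = windowStart(ts);
            boolean complete = watermarkMillis >= start + WINDOW_MILLIS;
            if (complete && !fired.contains(start)) {
                fired.add(start);
            }
        }
        return fired;
    }

    // Default behavior as described in the thread: the watermark follows the
    // timestamp of the last record read, so it stops moving when Kafka is idle.
    static long lastRecordWatermark(List<Long> timestamps) {
        return timestamps.get(timestamps.size() - 1);
    }

    // The suggested workaround: a watermark of roughly "now minus one second".
    static long nowMinusOneSecondWatermark(long nowMillis) {
        return nowMillis - 1_000L;
    }

    public static void main(String[] args) {
        // Hypothetical ingestion-time timestamps; nothing arrives after 25s.
        List<Long> timestamps = Arrays.asList(1_000L, 12_000L, 25_000L);
        long wallClock = 60_000L; // checked long after the last record arrived

        // Last-record watermark (25s): the [20s, 30s) window never completes.
        System.out.println(firedWindowStarts(timestamps, lastRecordWatermark(timestamps))); // [0, 10000]
        // "now - 1s" watermark (59s): all three windows complete.
        System.out.println(firedWindowStarts(timestamps, nowMinusOneSecondWatermark(wallClock))); // [0, 10000, 20000]
    }
}
```

Under these assumptions, the window holding the most recent records only closes when something pushes the watermark past its end: either a newer record or a watermark function tied to wall-clock time.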

Re: KafkaIO Windowing Fn

Posted by Thomas Groh <tg...@google.com.INVALID>.
KafkaIO is implemented using the UnboundedRead API, which is supported by
the DirectRunner. You should be able to run without the withMaxNumRecords;
if you can't, I'd be very interested to see the stack trace that you get
when you try to run the Pipeline.
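Whether `.withMaxNumRecords(500)` was present matters for the 540-record symptom discussed in this thread: records beyond such a cap are never read, so neither branch of the `AfterFirst` trigger can emit them. The sketch below is a plain-Java model of the `Repeatedly.forever(AfterFirst.of(...))` trigger with `discardingFiredPanes()` for a single key and a single window. `TriggerSketch`, `simulate`, and the timings are hypothetical, and the model does not reflect how any Beam runner actually implements triggers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Beam dependency) of
//   Repeatedly.forever(AfterFirst.of(
//       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(30s),
//       AfterPane.elementCountAtLeast(100)))
//   with discardingFiredPanes(), for one key in one window.
final class TriggerSketch {
    private final int countThreshold;
    private final long delayMillis;
    private final List<Integer> firedPaneSizes = new ArrayList<>();
    private int paneSize = 0;
    private long firstElementAt = 0L; // processing time of the first element in the open pane

    TriggerSketch(int countThreshold, long delayMillis) {
        this.countThreshold = countThreshold;
        this.delayMillis = delayMillis;
    }

    // An element arrives at processing time nowMillis.
    void onElement(long nowMillis) {
        onProcessingTime(nowMillis); // a pending delay may already have expired
        if (paneSize == 0) {
            firstElementAt = nowMillis;
        }
        paneSize++;
        if (paneSize >= countThreshold) {
            fire(); // count branch of AfterFirst
        }
    }

    // Processing time advances with no new element (e.g. a timer callback).
    void onProcessingTime(long nowMillis) {
        if (paneSize > 0 && nowMillis >= firstElementAt + delayMillis) {
            fire(); // processing-time branch of AfterFirst
        }
    }

    private void fire() {
        firedPaneSizes.add(paneSize);
        paneSize = 0; // discardingFiredPanes: the next pane starts empty
    }

    List<Integer> firedPaneSizes() {
        return firedPaneSizes;
    }

    // Feeds records available at time 0, stops after maxRecords (a stand-in
    // for a read cap), then lets 31 seconds pass with no new input.
    static List<Integer> simulate(int available, int maxRecords) {
        TriggerSketch trigger = new TriggerSketch(100, 30_000L);
        int read = Math.min(available, maxRecords);
        for (int i = 0; i < read; i++) {
            trigger.onElement(0L);
        }
        trigger.onProcessingTime(31_000L);
        return trigger.firedPaneSizes();
    }

    public static void main(String[] args) {
        System.out.println(simulate(540, 500));               // [100, 100, 100, 100, 100]
        System.out.println(simulate(540, Integer.MAX_VALUE)); // [100, 100, 100, 100, 100, 40]
    }
}
```

Under these assumptions, a capped read emits five panes of 100 and nothing more, while an uncapped read flushes the remaining 40 once 30 seconds of processing time have passed. Per the thread, the cap was only added for the DirectRunner, so it would not by itself explain the behavior seen on Flink.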

On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <su...@gmail.com>
wrote:

> Yes. I added it only for the DirectRunner, as it cannot translate
> Read(UnboundedSourceOfKafka).
>
> Regards
> Sumit Chawla
>
>
> On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Ah ok, this might be a stupid question but did you remove this line when
> > running it with Flink:
> >         .withMaxNumRecords(500)
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Hi Aljoscha
> > >
> > > The code is not different when running on Flink. I have only removed
> > > business-specific transformations.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > > could you maybe also post the complete code that you're using with the
> > > > FlinkRunner? I could have a look into it.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <su...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Thomas
> > > > >
> > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > Following is the snippet I am working on; I will post more details once
> > > > > I get it working (as of now I am unable to read messages from Kafka
> > > > > using the DirectRunner).
> > > > >
> > > > >
> > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > pipeline.apply(KafkaIO.read()
> > > > >         .withMaxNumRecords(500)
> > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > >         .withBootstrapServers("localhost:9092")
> > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > >     @Override
> > > > >     public void processElement(ProcessContext c) throws Exception {
> > > > >         KV<byte[], byte[]> record = c.element().getKV();
> > > > >         c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > >     }
> > > > > }))
> > > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > > >                 .triggering(
> > > > >                         Repeatedly.forever(
> > > > >                                 AfterFirst.of(
> > > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > >                                         AfterPane.elementCountAtLeast(100))))
> > > > >                 .discardingFiredPanes())
> > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > >             @Override
> > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > >                 KV<String, Iterable<String>> element = c.element();
> > > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > > >                 int count = 0;
> > > > >                 while (iterator.hasNext()) {
> > > > >                     iterator.next();
> > > > >                     count++;
> > > > >                 }
> > > > >                 System.out.println(String.format("Key %s Value Count %d",
> > > > >                         element.getKey(), count));
> > > > >             }
> > > > >         }));
> > > > > pipeline.run();
> > > > >
> > > > >
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Yes. I added it only for the DirectRunner, as it cannot translate
Read(UnboundedSourceOfKafka).

Regards
Sumit Chawla


On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Ah ok, this might be a stupid question but did you remove this line when
> running it with Flink:
>         .withMaxNumRecords(500)
>
> Cheers,
> Aljoscha
>
> On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com> wrote:
>
> > Hi Aljoscha
> >
> > The code is not different when running on Flink. I have only removed
> > business-specific transformations.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Hi,
> > > could you maybe also post the complete code that you're using with the
> > > FlinkRunner? I could have a look into it.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <su...@gmail.com>
> > > wrote:
> > >
> > > > Hi Thomas
> > > >
> > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > Following is the snippet I am working on; I will post more details once
> > > > I get it working (as of now I am unable to read messages from Kafka
> > > > using the DirectRunner).
> > > >
> > > >
> > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > pipeline.apply(KafkaIO.read()
> > > >         .withMaxNumRecords(500)
> > > >         .withTopics(ImmutableList.of("mytopic"))
> > > >         .withBootstrapServers("localhost:9092")
> > > >         .updateConsumerProperties(ImmutableMap.of(
> > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > >     @Override
> > > >     public void processElement(ProcessContext c) throws Exception {
> > > >         KV<byte[], byte[]> record = c.element().getKV();
> > > >         c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > >     }
> > > > }))
> > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > >                 .triggering(
> > > >                         Repeatedly.forever(
> > > >                                 AfterFirst.of(
> > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > >                                         AfterPane.elementCountAtLeast(100))))
> > > >                 .discardingFiredPanes())
> > > >         .apply("GroupByTenant", GroupByKey.create())
> > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > >             @Override
> > > >             public void processElement(ProcessContext c) throws Exception {
> > > >                 KV<String, Iterable<String>> element = c.element();
> > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > >                 int count = 0;
> > > >                 while (iterator.hasNext()) {
> > > >                     iterator.next();
> > > >                     count++;
> > > >                 }
> > > >                 System.out.println(String.format("Key %s Value Count %d",
> > > >                         element.getKey(), count));
> > > >             }
> > > >         }));
> > > > pipeline.run();
> > > >
> > > >
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tgroh@google.com.invalid>
> > > > wrote:
> > > >
> > > > > If you use the DirectRunner, do you observe the same behavior?
> > > > >
> > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Thomas
> > > > > >
> > > > > > I am using FlinkRunner. Yes, the second part of the trigger never
> > > > > > fires for me.
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Sumit;
> > > > > > >
> > > > > > > What runner are you using? I can set up a test with the same trigger
> > > > > > > reading from an unbounded input using the DirectRunner, and I get the
> > > > > > > expected output panes.
> > > > > > >
> > > > > > > Just to clarify, the second half of the trigger ('when the first
> > > > > > > element has been there for at least 30+ seconds') simply never fires?
> > > > > > >
> > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Thomas
> > > > > > > >
> > > > > > > > That did not work.
> > > > > > > >
> > > > > > > > I tried the following instead:
> > > > > > > >
> > > > > > > > .triggering(
> > > > > > > >         Repeatedly.forever(
> > > > > > > >                 AfterFirst.of(
> > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > >                         AfterPane.elementCountAtLeast(100))))
> > > > > > > > .discardingFiredPanes()
> > > > > > > >
> > > > > > > > What I am trying to do here is make sure that follow-up operations
> > > > > > > > receive batches of records:
> > > > > > > >
> > > > > > > > 1.  Fire when a pane has 100+ elements,
> > > > > > > >
> > > > > > > > 2.  Or fire when the first element has been there for at least
> > > > > > > >     30 seconds.
> > > > > > > >
> > > > > > > > However, point 2 does not seem to work. E.g. I have 540 records in
> > > > > > > > Kafka. The first 500 records are available immediately, but the
> > > > > > > > remaining 40 don't pass through. I was expecting the 2nd trigger to
> > > > > > > > help here.
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > > > > handle being written to multiple times for the same window. For
> > > > > > > > > example, if the sink appends to the output when it receives new data in
> > > > > > > > > a window, you could add something like
> > > > > > > > >
> > > > > > > > > Window.into(...).withAllowedLateness(...)
> > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > >     .discardingFiredPanes();
> > > > > > > > >
> > > > > > > > > This will cause elements to be output some amount of time after they
> > > > > > > > > are first received from Kafka, even if Kafka does not have any new
> > > > > > > > > elements. Elements will only be output by the GroupByKey once.
> > > > > > > > >
> > > > > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking
> > > > > > > > > in the absence of new records.
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Raghu.
> > > > > > > > > >
> > > > > > > > > > I don't have much control over changing KafkaIO properties. I added
> > > > > > > > > > the KafkaIO code only to complete the example. Are there any changes
> > > > > > > > > > that can be made to the windowing to achieve the same behavior?
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rangadi@google.com.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > > > > > record (in effect; more accurately, it returns the same as
> > > > > > > > > > > getTimestamp(), which might be overridden by the user).
> > > > > > > > > > >
> > > > > > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > > > > > essentially returns Now() or Now()-1sec (usage in javadoc
> > > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> > > > > > > > > > >
> > > > > > > > > > > I think the default watermark should be smarter: it should advance to
> > > > > > > > > > > the current time if there aren't any records to read from Kafka. Could
> > > > > > > > > > > you file a JIRA?
> > > > > > > > > > >
> > > > > > > > > > > thanks,
> > > > > > > > > > > Raghu.
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <
> > > > > > > > sumitkchawla@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi All
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > I am trying to do some simple batch processing on
> > KafkaIO
> > > > > > > records.
> > > > > > > > > My
> > > > > > > > > > > beam
> > > > > > > > > > > > pipeline looks like following:
> > > > > > > > > > > >
> > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > >         .withTopics(ImmutableList.of(s"mytopic"))
> > > > > > > > > > > >         .withBootstrapServers("localhost:9200")
> > > > > > > > > > > > .apply("ExtractMessage", ParDo.of(new
> > > ExtractKVMessage()))
> > > > //
> > > > > > > > Emits a
> > > > > > > > > > > > KV<String,String>
> > > > > > > > > > > >
> > > > > > > > > > > > .apply("WindowBy10Sec", Window.<KV<String,
> > > > > > > > > > > > JSONObject>>into(FixedWindows.
> > > of(Duration.standardSeconds(
> > > > > > > > > > > > 10))).withAllowedLateness(
> Duration.standardSeconds(1)))
> > > > > > > > > > > >
> > > > > > > > > > > > .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > > >
> > > > > > > > > > > > .apply("Sink", ParDo.of(new MySink())
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > My Kafka Source already has some messages 1000+, and
> > new
> > > > > > messages
> > > > > > > > > > arrive
> > > > > > > > > > > > every few minutes.
> > > > > > > > > > > >
> > > > > > > > > > > > When i start my pipeline,  i can see that it reads
> all
> > > the
> > > > > > 1000+
> > > > > > > > > > messages
> > > > > > > > > > > > from Kafka.  However, Window does not fire untill a
> new
> > > > > message
> > > > > > > > > arrives
> > > > > > > > > > > in
> > > > > > > > > > > > Kafka.  And Sink does not receive any message until
> > that
> > > > > point.
> > > > > > > > Do i
> > > > > > > > > > > need
> > > > > > > > > > > > to override the WaterMarkFn here? Since i am not
> > > providing
> > > > > any
> > > > > > > > > > > timeStampFn
> > > > > > > > > > > > , i am assuming that timestamps will be assigned as
> in
> > > when
> > > > > > > message
> > > > > > > > > > > arrives
> > > > > > > > > > > > i.e. ingestion time.  What is the default WaterMarkFn
> > > > > > > > implementation?
> > > > > > > > > > Is
> > > > > > > > > > > > the Window not supposed to be fired based on
> Ingestion
> > > > time?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
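In this thread, Raghu explains that the default KafkaIO watermark follows the timestamp of the last record read, so it stops advancing while the topic is idle. His suggested workaround is a watermarkFn that returns roughly Now() minus one second. The plain-Java model below is a hypothetical sketch of those two policies. It does not use the KafkaIO API, and the class and method names are invented for illustration. It checks whether a 10-second fixed window holding the last record can close a minute later when no new records have arrived.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of two watermark policies for an idle Kafka topic.
 * Not the KafkaIO API; names are invented for illustration.
 */
final class WatermarkSketch {
    private WatermarkSketch() {}

    /** Default-style policy: the watermark stays at the last record's timestamp. */
    static Instant lastRecordWatermark(Instant lastRecordTimestamp) {
        return lastRecordTimestamp; // does not move while no new records arrive
    }

    /** Workaround-style policy: follow wall-clock time minus a small slack. */
    static Instant nowMinusSlackWatermark(Instant now, Duration slack) {
        return now.minus(slack);
    }

    /** End (exclusive) of the fixed window that contains the timestamp. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long startMillis = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(startMillis + sizeMillis);
    }

    /** The window can fire once the watermark reaches its end. */
    static boolean windowCanFire(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant lastRecord = Instant.ofEpochSecond(1_003); // last record read
        Instant now = Instant.ofEpochSecond(1_060);        // one minute later, topic idle
        Instant end = windowEnd(lastRecord, Duration.ofSeconds(10));
        System.out.println(windowCanFire(lastRecordWatermark(lastRecord), end));                   // false
        System.out.println(windowCanFire(nowMinusSlackWatermark(now, Duration.ofSeconds(1)), end)); // true
    }
}
```

Under this model, the stalled policy cannot close the window until another record arrives, which matches the symptom in the original question. One caveat applies if record timestamps come from a custom timestamp function rather than from read time: a wall-clock watermark could then mark delayed records as late.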

Re: KafkaIO Windowing Fn

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, OK. This might be a stupid question, but did you remove this line when
running it with Flink:
        .withMaxNumRecords(500)
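My understanding of the KafkaIO documentation is that withMaxNumRecords turns the unbounded Kafka read into a bounded read that stops after that many records; it is described as mainly for tests and demos. In this thread, Sumit reports 540 records in the topic, with only the first 500 getting through. That is exactly what a cap of 500 would produce. The plain-Java model below is a hypothetical illustration of such a cap, not KafkaIO code, and its class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a read capped at maxNumRecords (not KafkaIO itself). */
final class MaxRecordsSketch {
    private MaxRecordsSketch() {}

    /** Returns the records a capped reader hands downstream; the rest are never read. */
    static List<String> readCapped(List<String> topic, int maxNumRecords) {
        List<String> out = new ArrayList<>();
        for (String record : topic) {
            if (out.size() >= maxNumRecords) {
                break; // the bounded read is done; later records are ignored
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        for (int i = 0; i < 540; i++) {
            topic.add("record-" + i);
        }
        List<String> read = readCapped(topic, 500);
        System.out.println(read.size() + " read, " + (topic.size() - read.size()) + " never read");
        // prints "500 read, 40 never read"
    }
}
```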

Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <su...@gmail.com> wrote:

> Hi Aljoscha
>
> The code is not different when running on Flink. I have only removed
> business-specific transformations.
>
> Regards
> Sumit Chawla
>
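Sumit's snippet fires panes with Repeatedly.forever(AfterFirst.of(...)), combining a 30-second processing-time delay after the first element in the pane with AfterPane.elementCountAtLeast(100), in discarding mode. The plain-Java model below is a simplified, hypothetical sketch of that trigger for a single key and a single window. It ignores watermarks, window expiry, and allowed lateness, and its names are invented. Under these assumptions, the 30-second branch should flush a leftover pane of 40 records, but only if those records are actually read.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of
 * Repeatedly.forever(AfterFirst.of(processing-time delay, element count))
 * with discarding panes, for one key and one window. Not Beam code:
 * watermarks, window expiry and allowed lateness are ignored.
 */
final class AfterFirstSketch {
    private AfterFirstSketch() {}

    /** Sizes of the panes fired, given sorted arrival times in processing-time millis. */
    static List<Integer> firedPaneSizes(long[] arrivalMillis, long endMillis,
                                        int countThreshold, long delayMillis) {
        List<Integer> fired = new ArrayList<>();
        int paneCount = 0;
        long deadline = Long.MAX_VALUE; // processing-time timer of the current pane
        for (long t : arrivalMillis) {
            if (paneCount > 0 && deadline <= t) { // timer expired before this arrival
                fired.add(paneCount);
                paneCount = 0;
            }
            if (paneCount == 0) {
                deadline = t + delayMillis;       // timer starts at the pane's first element
            }
            paneCount++;
            if (paneCount >= countThreshold) {    // element-count branch fires
                fired.add(paneCount);
                paneCount = 0;
            }
        }
        if (paneCount > 0 && deadline <= endMillis) { // leftover pane flushed by the timer
            fired.add(paneCount);
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical input: all records read at t = 0, observed for 60 s.
        System.out.println(firedPaneSizes(new long[540], 60_000, 100, 30_000));
        // prints [100, 100, 100, 100, 100, 40]
        System.out.println(firedPaneSizes(new long[500], 60_000, 100, 30_000));
        // prints [100, 100, 100, 100, 100]
    }
}
```

The second call in main shows that when only 500 records are read, the count branch accounts for every element and no leftover pane exists for the timer to flush.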

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Aljoscha

The code is not different when running on Flink. I have only removed
business-specific transformations.

Regards
Sumit Chawla


On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> could you maybe also post the complete code that you're using with the
> FlinkRunner? I could have a look into it.
>
> Cheers,
> Aljoscha
>
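Thomas's suggested trigger is AfterWatermark.pastEndOfWindow() with two additions: early firings a fixed processing-time delay (5 seconds in his example) after the first element in the pane, and late firings on each late element. It is meant to emit data even while the watermark is stuck. The plain-Java model below is a hypothetical, simplified sketch of how such a trigger decides to fire for one pane. Its names are invented, and it does not model late firings or allowed lateness. The pane fires on time once the watermark reaches the end of the window. Before that, it fires early once the delay has elapsed since the pane's first element, regardless of the watermark.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of an AfterWatermark trigger with
 * processing-time early firings. Not Beam code; late firings are not modeled.
 */
final class EarlyFiringSketch {
    enum Firing { NONE, EARLY, ON_TIME }

    private EarlyFiringSketch() {}

    static Firing evaluate(Instant watermark, Instant windowEnd,
                           Instant firstElementSeenAt, Instant now,
                           Duration earlyDelay, boolean paneHasData) {
        if (!watermark.isBefore(windowEnd)) {
            return Firing.ON_TIME; // watermark has passed the end of the window
        }
        if (paneHasData && !now.isBefore(firstElementSeenAt.plus(earlyDelay))) {
            return Firing.EARLY;   // delay elapsed; does not depend on the watermark
        }
        return Firing.NONE;
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.ofEpochSecond(1_010);
        Instant stuckWatermark = Instant.ofEpochSecond(1_003); // no new Kafka records
        Instant firstSeen = Instant.ofEpochSecond(1_003);
        Duration delay = Duration.ofSeconds(5);
        System.out.println(evaluate(stuckWatermark, windowEnd, firstSeen,
                Instant.ofEpochSecond(1_005), delay, true)); // NONE
        System.out.println(evaluate(stuckWatermark, windowEnd, firstSeen,
                Instant.ofEpochSecond(1_009), delay, true)); // EARLY
    }
}
```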

Re: KafkaIO Windowing Fn

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
could you maybe also post the complete code that you're using with the
FlinkRunner? I could have a look into it.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <su...@gmail.com> wrote:

> Hi Thomas
>
> Sorry, I tried the DirectRunner but ran into some Kafka issues. Following
> is the snippet I am working on; I will post more details once I get it
> working (as of now I am unable to read messages from Kafka using the
> DirectRunner).
>
>
> PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> pipelineOptions.setRunner(DirectPipelineRunner.class);
> Pipeline pipeline = Pipeline.create(pipelineOptions);
> pipeline.apply(KafkaIO.read()
>         .withMaxNumRecords(500)
>         .withTopics(ImmutableList.of("mytopic"))
>         .withBootstrapServers("localhost:9092")
>         .updateConsumerProperties(ImmutableMap.of(
>                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
>                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
>                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
>         )))
>         .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>                 KV<byte[], byte[]> record = c.element().getKV();
>                 c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
>             }
>         }))
>         .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
>                 .withAllowedLateness(Duration.standardSeconds(1))
>                 .triggering(
>                         Repeatedly.forever(
>                                 AfterFirst.of(
>                                         AfterProcessingTime.pastFirstElementInPane()
>                                                 .plusDelayOf(Duration.standardSeconds(30)),
>                                         AfterPane.elementCountAtLeast(100))))
>                 .discardingFiredPanes())
>         .apply("GroupByTenant", GroupByKey.create())
>         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>                 KV<String, Iterable<String>> element = c.element();
>                 Iterator<String> iterator = element.getValue().iterator();
>                 int count = 0;
>                 while (iterator.hasNext()) {
>                     iterator.next();
>                     count++;
>                 }
>                 System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
>             }
>         }));
> pipeline.run();
>
>
>
> Regards
> Sumit Chawla
>
>
> On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > If you use the DirectRunner, do you observe the same behavior?
> >
> > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Hi Thomas
> > >
> > > I am using the FlinkRunner. Yes, the second part of the trigger never
> > > fires for me.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > wrote:
> > >
> > > > Hey Sumit;
> > > >
> > > > What runner are you using? I can set up a test with the same trigger
> > > > reading from an unbounded input using the DirectRunner and I get the
> > > > expected output panes.
> > > >
> > > > Just to clarify, the second half of the trigger ('when the first element
> > > > has been there for at least 30+ seconds') simply never fires?
> > > >
> > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Thomas
> > > > >
> > > > > That did not work.
> > > > >
> > > > > I tried the following instead:
> > > > >
> > > > > .triggering(
> > > > >         Repeatedly.forever(
> > > > >                 AfterFirst.of(
> > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > >                         AfterPane.elementCountAtLeast(100))))
> > > > > .discardingFiredPanes()
> > > > >
> > > > > What I am trying to do here is make sure that follow-up operations
> > > > > receive batches of records:
> > > > >
> > > > > 1.  Fire when the pane has 100+ elements.
> > > > >
> > > > > 2.  Or fire when the first element has been in the pane for at least 30
> > > > > seconds.
> > > > >
> > > > > However, point 2 does not seem to work. For example, I have 540 records
> > > > > in Kafka. The first 500 records are available immediately, but the
> > > > > remaining 40 don't pass through. I was expecting the second trigger to
> > > > > help here.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > > >
> > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > wrote:
> > > > >
> > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > handle being written to multiple times for the same window. For example,
> > > > > > if the sink appends to the output when it receives new data in a window,
> > > > > > you could add something like
> > > > > >
> > > > > > Window.into(...).withAllowedLateness(...)
> > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > >     .discardingFiredPanes();
> > > > > >
> > > > > > This will cause elements to be output some amount of time after they
> > > > > > are first received from Kafka, even if Kafka does not have any new
> > > > > > elements. Elements will only be output by the GroupByKey once.
> > > > > >
> > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking
> > > > > > in the absence of new records.
> > > > > >
> > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <
> > > sumitkchawla@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Raghu.
> > > > > > >
> > > > > > > I don't have much control over changing KafkaIO properties.  I
> > > added
> > > > > > > KafkaIO code for completing the example.  Are there any changes
> > > that
> > > > > can
> > > > > > be
> > > > > > > done to Windowing to achieve the same behavior?
> > > > > > >
> > > > > > > Regards
> > > > > > > Sumit Chawla
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi
> > > > > <rangadi@google.com.invalid
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > The default implementation returns processing timestamp of
> the
> > > last
> > > > > > > record
> > > > > > > > (in effect. more accurately it returns same as
> getTimestamp(),
> > > > which
> > > > > > > might
> > > > > > > > overridden by user).
> > > > > > > >
> > > > > > > > As a work around, yes, you can provide your own watermarkFn
> > that
> > > > > > > > essentially returns Now() or Now()-1sec. (usage in javadoc
> > > > > > > > <https://github.com/apache/incubator-beam/blob/master/
> > > > > > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> > > > > > > > kafka/KafkaIO.java#L138>
> > > > > > > > )
> > > > > > > >
> > > > > > > > I think default watermark should be smarter. it should
> advance
> > to
> > > > > > current
> > > > > > > > time if there aren't any records to read from Kafka. Could
> you
> > > > file a
> > > > > > > jira?
> > > > > > > >
> > > > > > > > thanks,
> > > > > > > > Raghu.
> > > > > > > >
> > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <
> > > > > sumitkchawla@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi All
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I am trying to do some simple batch processing on KafkaIO
> > > > records.
> > > > > > My
> > > > > > > > beam
> > > > > > > > > pipeline looks like following:
> > > > > > > > >
> > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > >         .withTopics(ImmutableList.of(s"mytopic"))
> > > > > > > > >         .withBootstrapServers("localhost:9200")
> > > > > > > > > .apply("ExtractMessage", ParDo.of(new ExtractKVMessage()))
> //
> > > > > Emits a
> > > > > > > > > KV<String,String>
> > > > > > > > >
> > > > > > > > > .apply("WindowBy10Sec", Window.<KV<String,
> > > > > > > > > JSONObject>>into(FixedWindows.of(Duration.standardSeconds(
> > > > > > > > > 10))).withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > > >
> > > > > > > > > .apply("GroupByKey", GroupByKey.create())
> > > > > > > > >
> > > > > > > > > .apply("Sink", ParDo.of(new MySink())
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > My Kafka Source already has some messages 1000+, and new
> > > messages
> > > > > > > arrive
> > > > > > > > > every few minutes.
> > > > > > > > >
> > > > > > > > > When i start my pipeline,  i can see that it reads all the
> > > 1000+
> > > > > > > messages
> > > > > > > > > from Kafka.  However, Window does not fire untill a new
> > message
> > > > > > arrives
> > > > > > > > in
> > > > > > > > > Kafka.  And Sink does not receive any message until that
> > point.
> > > > > Do i
> > > > > > > > need
> > > > > > > > > to override the WaterMarkFn here? Since i am not providing
> > any
> > > > > > > > timeStampFn
> > > > > > > > > , i am assuming that timestamps will be assigned as in when
> > > > message
> > > > > > > > arrives
> > > > > > > > > i.e. ingestion time.  What is the default WaterMarkFn
> > > > > implementation?
> > > > > > > Is
> > > > > > > > > the Window not supposed to be fired based on Ingestion
> time?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > Sumit Chawla
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Thomas

Sorry, I tried the DirectRunner but ran into some Kafka issues. Following
is the snippet I am working on; I will post more details once I get it
working (as of now I am unable to read messages from Kafka using the
DirectRunner).


PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);
pipeline.apply(KafkaIO.read()
        .withMaxNumRecords(500)
        .withTopics(ImmutableList.of("mytopic"))
        .withBootstrapServers("localhost:9092")
        .updateConsumerProperties(ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "test1",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
        )))
        .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<byte[], byte[]> record = c.element().getKV();
                c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
            }
        }))
        .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .withAllowedLateness(Duration.standardSeconds(1))
                .triggering(
                        Repeatedly.forever(
                                AfterFirst.of(
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(Duration.standardSeconds(30)),
                                        AfterPane.elementCountAtLeast(100))))
                .discardingFiredPanes())
        .apply("GroupByTenant", GroupByKey.create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<String, Iterable<String>> element = c.element();
                Iterator<String> iterator = element.getValue().iterator();
                int count = 0;
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
                System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
            }
        }));
pipeline.run();



Regards
Sumit Chawla
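The 10-second FixedWindows in the snippet above assign each element to a window by its event timestamp, and a watermark-based trigger only considers that window complete once the watermark passes the window's end. As a rough illustration, here is a plain-Java toy model of that bookkeeping. It assumes non-negative millisecond timestamps and a zero window offset, and the names (FixedWindowModel, assign, watermarkPassedEnd, expired) are hypothetical; it is a sketch, not Beam's implementation.

```java
/** Toy model of fixed-window bookkeeping; hypothetical names, not Beam's implementation. */
public class FixedWindowModel {

    /** A half-open event-time window [start, end) in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Latest event-time millisecond that still belongs to this window. */
        long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns a non-negative timestamp to its fixed window (window offset assumed to be 0). */
    static Window assign(long timestampMillis, long sizeMillis) {
        long start = timestampMillis - (timestampMillis % sizeMillis);
        return new Window(start, start + sizeMillis);
    }

    /** True once the watermark has moved past the window's last timestamp. */
    static boolean watermarkPassedEnd(Window w, long watermarkMillis) {
        return watermarkMillis > w.maxTimestamp();
    }

    /** True once the watermark is also past the allowed lateness, after which the window's state can be discarded. */
    static boolean expired(Window w, long watermarkMillis, long allowedLatenessMillis) {
        return watermarkMillis > w.maxTimestamp() + allowedLatenessMillis;
    }

    public static void main(String[] args) {
        long size = 10_000;     // FixedWindows.of(Duration.standardSeconds(10))
        long lateness = 1_000;  // withAllowedLateness(Duration.standardSeconds(1))
        Window w = assign(23_456, size);
        System.out.println(w);                               // [20000, 30000)
        System.out.println(watermarkPassedEnd(w, 25_000));   // false
        System.out.println(watermarkPassedEnd(w, 30_000));   // true
        System.out.println(expired(w, 30_500, lateness));    // false
    }
}
```

In this model, if the watermark never moves past 29999 ms (for example because it is pinned to the last record's timestamp and no new records arrive), the window never reaches its end, so a trigger that waits only on the watermark would not fire.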


On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid>
wrote:

> If you use the DirectRunner, do you observe the same behavior?

Re: KafkaIO Windowing Fn

Posted by Thomas Groh <tg...@google.com.INVALID>.
If you use the DirectRunner, do you observe the same behavior?

On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <su...@gmail.com>
wrote:

> Hi Thomas
>
> I am using FlinkRunner.  Yes the second part of trigger never fires for me,
>
> Regards
> Sumit Chawla

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Thomas

I am using the FlinkRunner. Yes, the second part of the trigger never fires for me.

Regards
Sumit Chawla


On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> Hey Sumit;
>
> What runner are you using? I can set up a test with the same trigger
> reading from an unbounded input using the DirectRunner and I get the
> expected output panes.
>
> Just to clarify, the second half of the trigger ('when the first element
> has been there for at least 30+ seconds') simply never fires?

Re: KafkaIO Windowing Fn

Posted by Thomas Groh <tg...@google.com.INVALID>.
Hey Sumit,

What runner are you using? When I set up a test with the same trigger
reading from an unbounded input using the DirectRunner, I get the
expected output panes.

Just to clarify, the second half of the trigger ('when the first element
has been there for at least 30+ seconds') simply never fires?

On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <su...@gmail.com>
wrote:

> Hi Thomas
>
> That did not work.
>
> I tried following instead:
>
> .triggering(
>         Repeatedly.forever(
>                 AfterFirst.of(
>                               AfterProcessingTime.pastFirstElementInPane()
>                                 .plusDelayOf(Duration.standardSeconds(30)),
>                               AfterPane.elementCountAtLeast(100)
>                         )))
> .discardingFiredPanes()
>
> What i am trying to do here.  This is to make sure that followup
> operations receive batches of records.
>
> 1.  Fire when at Pane has 100+ elements
>
> 2.  Or Fire when the first element has been there for atleast 30 sec+.
>
> However,  2 point does not seem to work.  e.g. I have 540 records in
> Kafka.  The first 500 records are available immediately,
>
> but the remaining 40 don't pass through. I was expecting 2nd to
> trigger to help here.
>
> Regards
> Sumit Chawla

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Thomas

That did not work.

I tried following instead:

.triggering(
    Repeatedly.forever(
        AfterFirst.of(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)),
            AfterPane.elementCountAtLeast(100))))
.discardingFiredPanes()

What I am trying to do here is make sure that follow-up operations receive
batches of records:

1. Fire when a pane has 100+ elements.

2. Or fire when the first element in the pane has been waiting for at least 30 seconds.

However, point 2 does not seem to work. For example, I have 540 records in
Kafka. The first 500 records are available immediately, but the remaining
40 don't pass through. I was expecting the second trigger to help here.
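For reference, the trigger above can be reasoned about as a small state machine. This is a toy sketch in plain Java under stated assumptions, not Beam's trigger implementation: the class name is hypothetical, it tracks a single key and window (Beam evaluates triggers per key and window, so records spread over many keys or windows do not form global panes of 100), and it assumes all 540 records arrive at processing time 0. Under those assumptions the count sub-trigger emits five panes of 100, and the remaining 40 can only be released by the processing-time sub-trigger once processing time reaches 30 seconds after the first of them arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical class, not Beam's API) of
//   Repeatedly.forever(AfterFirst.of(
//       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay),
//       AfterPane.elementCountAtLeast(count)))
//   .discardingFiredPanes()
// for ONE key and ONE window.
class CountOrDelayTriggerModel {
    private final int countThreshold;
    private final long delayMillis;
    private final List<Integer> firedPaneSizes = new ArrayList<>();
    private int buffered = 0;               // elements in the current, unfired pane
    private long firstElementProcTime = 0;  // processing time of the pane's first element

    CountOrDelayTriggerModel(int countThreshold, long delayMillis) {
        this.countThreshold = countThreshold;
        this.delayMillis = delayMillis;
    }

    // An element arrives; the count sub-trigger is checked right away.
    void onElement(long procTimeMillis) {
        if (buffered == 0) {
            firstElementProcTime = procTimeMillis;  // a new pane starts its delay clock
        }
        buffered++;
        if (buffered >= countThreshold) {
            fire();
        }
    }

    // Processing time advances (e.g. a timer is delivered); the delay sub-trigger is checked.
    void onProcessingTime(long nowMillis) {
        if (buffered > 0 && nowMillis >= firstElementProcTime + delayMillis) {
            fire();
        }
    }

    // Discarding mode: emit the current pane and start the next one empty.
    private void fire() {
        firedPaneSizes.add(buffered);
        buffered = 0;
    }

    List<Integer> firedPaneSizes() { return firedPaneSizes; }

    int buffered() { return buffered; }

    public static void main(String[] args) {
        CountOrDelayTriggerModel m = new CountOrDelayTriggerModel(100, 30_000L);
        for (int i = 0; i < 540; i++) {
            m.onElement(0L);
        }
        System.out.println("after arrival: panes=" + m.firedPaneSizes() + " buffered=" + m.buffered());
        m.onProcessingTime(30_000L);
        System.out.println("after 30 s:    panes=" + m.firedPaneSizes() + " buffered=" + m.buffered());
    }
}
```

Running it prints five panes of 100 with 40 still buffered, then a sixth pane of 40. In this model the last 40 elements are released only if processing time actually advances for that key and window; the model cannot tell whether a particular runner delivered that timer in the setup described above.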

Regards
Sumit Chawla


On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

Re: KafkaIO Windowing Fn

Posted by Thomas Groh <tg...@google.com.INVALID>.
You can adjust the trigger in the windowing transform if your sink can
handle being written to multiple times for the same window. For example, if
the sink appends to the output when it receives new data in a window, you
could add something like:

Window.into(...)
    .withAllowedLateness(...)
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(5)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes();

This will cause elements to be output shortly after they are first received
from Kafka (5 seconds of processing time after the first element in a pane),
even if Kafka does not have any new elements. Because fired panes are
discarded, each element will only be output by the GroupByKey once.
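The point that an early processing-time firing does not wait for the watermark can be illustrated with a toy model of that trigger. The class below is hypothetical and simplified (a single key and window, and the on-time pane is emitted even if it is empty); it is meant for reasoning about the behavior, not as Beam's implementation. The scenario assumes an event-time window [20 s, 30 s) whose watermark is stuck at 23.9 s and a 5-second early-firing delay.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical class, not Beam's trigger implementation) of
//   AfterWatermark.pastEndOfWindow()
//       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
//       .withLateFirings(AfterPane.elementCountAtLeast(1))
// with discardingFiredPanes(), for a single key and window. Simplification: the
// on-time firing happens even if the pane is empty.
class EarlyLateTriggerModel {
    enum Firing { NONE, EARLY, ON_TIME, LATE }

    private final long windowEndMillis;   // event time
    private final long earlyDelayMillis;  // processing time
    private final List<String> pane = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private long firstElementProcTime = 0;
    private boolean onTimeFired = false;

    EarlyLateTriggerModel(long windowEndMillis, long earlyDelayMillis) {
        this.windowEndMillis = windowEndMillis;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    void addElement(String element, long procTimeMillis) {
        if (pane.isEmpty()) {
            firstElementProcTime = procTimeMillis;  // the early-firing delay counts from here
        }
        pane.add(element);
    }

    // Evaluate the trigger at the given processing time and input watermark.
    Firing evaluate(long procTimeMillis, long watermarkMillis) {
        if (!onTimeFired) {
            if (watermarkMillis >= windowEndMillis) {
                onTimeFired = true;
                emit();
                return Firing.ON_TIME;
            }
            if (!pane.isEmpty() && procTimeMillis >= firstElementProcTime + earlyDelayMillis) {
                emit();
                return Firing.EARLY;  // needs only processing time, not watermark progress
            }
            return Firing.NONE;
        }
        if (!pane.isEmpty()) {
            emit();
            return Firing.LATE;  // after the on-time pane, late data fires at once
        }
        return Firing.NONE;
    }

    // Discarding mode: emitted elements leave the pane, so each is output once.
    private void emit() {
        emitted.addAll(pane);
        pane.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EarlyLateTriggerModel m = new EarlyLateTriggerModel(30_000L, 5_000L);
        m.addElement("a", 100_000L);
        m.addElement("b", 100_500L);
        System.out.println(m.evaluate(104_999L, 23_900L));  // delay not yet elapsed
        System.out.println(m.evaluate(105_000L, 23_900L) + " " + m.emitted());
    }
}
```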

We should still have a JIRA to improve the KafkaIO watermark tracking in
the absence of new records.

On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <su...@gmail.com>
wrote:

Re: KafkaIO Windowing Fn

Posted by "Chawla,Sumit " <su...@gmail.com>.
Thanks Raghu.

I don't have much control over the KafkaIO configuration; I only included the
KafkaIO code to complete the example. Are there any changes I can make to the
windowing to achieve the same behavior?

Regards
Sumit Chawla


On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <ra...@google.com.invalid>
wrote:

Re: KafkaIO Windowing Fn

Posted by Raghu Angadi <ra...@google.com.INVALID>.
The default implementation returns the processing timestamp of the last record
(in effect; more accurately, it returns the same value as getTimestamp(), which
might be overridden by the user).
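As a rough illustration of why the window never fires, here is a toy model in plain Java. It is a sketch, not KafkaIO code: the class and method names are hypothetical, and it assumes that record timestamps are the read (processing) time and that the watermark is the timestamp of the last record read, as described above. If the whole backlog is read within one 10-second window and no new record arrives, the watermark stays inside that window, so an event-time trigger for it never fires.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical, not KafkaIO's implementation): records are stamped with
// their read time, and the watermark is the timestamp of the last record read.
class DefaultWatermarkModel {
    static final long WINDOW_MILLIS = 10_000L;

    // Start of the 10-second fixed window containing tsMillis (aligned to the epoch).
    static long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, WINDOW_MILLIS);
    }

    // Watermark = timestamp of the most recently read record.
    static long watermark(List<Long> readTimestamps) {
        return readTimestamps.get(readTimestamps.size() - 1);
    }

    // An event-time trigger for [start, start + size) fires once the watermark reaches the end.
    static boolean windowFires(long windowStartMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + WINDOW_MILLIS;
    }

    public static void main(String[] args) {
        // Backlog drained at read times 21.0 s, 22.5 s and 23.9 s; nothing new after that.
        List<Long> reads = Arrays.asList(21_000L, 22_500L, 23_900L);
        long window = windowStart(23_900L);  // window [20 s, 30 s)
        System.out.println("fires with backlog only: " + windowFires(window, watermark(reads)));
        // A new record read at 95 s moves the watermark past 30 s, closing the window.
        List<Long> later = Arrays.asList(21_000L, 22_500L, 23_900L, 95_000L);
        System.out.println("fires after new record:  " + windowFires(window, watermark(later)));
    }
}
```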

As a workaround, yes, you can provide your own watermarkFn that
essentially returns Now() or Now() - 1 sec (see the usage in the javadoc:
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
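The workaround can be sketched the same way. The model below is hypothetical (plain Java, not KafkaIO's watermark function signature) and assumes the runner keeps asking the reader for its watermark even when no new records arrive, so that a function based on the current time keeps advancing. With a 1-second slack, a window [20 s, 30 s) closes once wall-clock time reaches 31 s.

```java
// Toy model (hypothetical, not KafkaIO's API) of a watermark function that ignores the
// record and returns "now minus a small slack", as suggested above.
class WallClockWatermarkModel {
    static final long WINDOW_MILLIS = 10_000L;
    static final long SLACK_MILLIS = 1_000L;  // the "Now() - 1 sec" variant

    // Watermark reported at wall-clock time nowMillis, independent of record arrival.
    static long watermark(long nowMillis) {
        return nowMillis - SLACK_MILLIS;
    }

    // The window [start, start + size) closes once the watermark reaches its end.
    static boolean windowFires(long windowStartMillis, long nowMillis) {
        return watermark(nowMillis) >= windowStartMillis + WINDOW_MILLIS;
    }

    public static void main(String[] args) {
        long windowStart = 20_000L;  // window [20 s, 30 s)
        System.out.println("fires at 30.999 s: " + windowFires(windowStart, 30_999L));
        System.out.println("fires at 31.000 s: " + windowFires(windowStart, 31_000L));
    }
}
```

With this kind of watermark, a record whose timestamp is already behind the watermark when it is read counts as late data, so the slack should cover the usual gap between a record's timestamp and the time it is read.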

I think the default watermark should be smarter: it should advance to the current
time if there aren't any records to read from Kafka. Could you file a JIRA?
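One possible shape for that smarter default, sketched as a hypothetical plain-Java class (not how KafkaIO implements it): follow record timestamps while there is data to read, advance toward the current time minus a slack when a poll finds nothing to read, and never move backwards.

```java
// Toy model (hypothetical, not KafkaIO code) of an idle-aware, monotonic watermark.
class IdleAwareWatermark {
    private final long slackMillis;
    private long watermarkMillis = Long.MIN_VALUE;  // nothing read yet

    IdleAwareWatermark(long slackMillis) {
        this.slackMillis = slackMillis;
    }

    // While records are flowing, the watermark follows their timestamps.
    void onRecord(long recordTimestampMillis) {
        watermarkMillis = Math.max(watermarkMillis, recordTimestampMillis);
    }

    // When a poll returns no records, advance to (now - slack), but never backwards.
    void onIdle(long nowMillis) {
        watermarkMillis = Math.max(watermarkMillis, nowMillis - slackMillis);
    }

    long current() {
        return watermarkMillis;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(1_000L);
        wm.onRecord(23_900L);  // last backlog record
        wm.onIdle(24_500L);    // idle, but now - slack = 23.5 s is behind: unchanged
        System.out.println(wm.current());
        wm.onIdle(40_000L);    // idle long enough: advances to 39 s, past a [20 s, 30 s) window
        System.out.println(wm.current());
    }
}
```

The max() calls keep the watermark monotonic; as a consequence, a record read after an idle period whose timestamp is behind the advanced watermark would be treated as late.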

thanks,
Raghu.

On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <su...@gmail.com>
wrote:
