You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Kjetil Halvorsen <kj...@cognite.com> on 2020/03/02 19:50:15 UTC

Splittable DoFn and Dataflow, "conflicting bucketing functions"

Hi,

I am looking for pointers to a Dataflow runner error message: Workflow
failed. Causes: Step s22 has conflicting bucketing functions,

This happens at the very startup of the job execution, and I am unable to
find any pointer as to where in the code/job definition the origin of the
conflict is. The same job runs just fine in the DirectRunner.

The job contains a splittable DoFn (unbound) and I have tried it with both
a windowing transform and without a windowing transform--both fail with the
same result on Dataflow.

This is my first foray into splittable DoFn territory so I am sure I have
just made some basic missteps.

Cheers,
Kjetil




-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Luke Cwik <lc...@google.com>.
Due to a different issue, I was able to track down this 6 year old issue
and fix it within the Dataflow service.

Please tell me if you still hit this issue.

On Mon, Nov 16, 2020 at 4:01 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Kjetil,
>
> I spent some time on this and I don't think it's a problem with SDF but a
> problem with how dataflow does GBK. It seems like you pipeline looks like:
>
> File.match -> Read File Metadata -> Read File -> a DoFn -> a unbounded SDF
>
>            ^
>
>
>  Create()   - |
>
> I figured out a workaround for you. If you do a Reshuffle after Create,
> like:
> File.match -> Read File Metadata -> Read File -> a DoFn -> a unbounded SDF
>
>            ^
>
>                    Create() -> Reshuffle.viaRandomKey()   - |
> you will have your pipeline start to run successfully.
>
> On Mon, Nov 16, 2020 at 3:53 PM Boyuan Zhang <bo...@apache.org> wrote:
>
>>
>>
>> On 2020/03/26 13:42:51, Kjetil Halvorsen <kj...@cognite.com>
>> wrote:
>> > Another update on this issue. I observe the same with bounded SDFs when
>> > running in streaming mode. The general pipeline is [unbounded watcher,
>> sdf]
>> > -> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...
>> >
>> > This also fails with the conflicting bucketing function error message.
>> When
>> > I remove the File.IO side input, the pipeline executes again (on
>> Dataflow).
>> >
>> > This one hurts us a bit because we use the File.IO side inputs to feed
>> the
>> > pipeline with config settings, so it is not trivial for us to remove it.
>> >
>> > Best,
>> > Kjetil
>> >
>> > On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
>> > kjetil.halvorsen@cognite.com> wrote:
>> >
>> > > Perfect, thanks.
>> > >
>> > > I did some more testing, and it seems to narrow down to using
>> FileIO.match
>> > > -> readMatches -> to drive the upstream side input. I have attached a
>> > > pipeline that reproduces the error. When I run it with Beam 2.17 or
>> 2.18 it
>> > > will fail on Dataflow. I have not tested with 2.19 due to the blocker
>> on
>> > > Win Java.
>> > >
>> > > Please let me know if there is anything else I can do to help. I am
>> very
>> > > motivated to get this sorted out as we have lots of scenarios lined
>> up.
>> > >
>> > > Best,
>> > > Kjetil
>> > >
>> > > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
>> > >
>> > >> That doesn't sound like it should be an issue and sounds like a bug
>> in
>> > >> Dataflow.
>> > >>
>> > >> If you're willing to share a minimal pipeline that gets this error.
>> I can
>> > >> get an issue opened up internally and assigned.
>> > >>
>> > >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
>> > >> kjetil.halvorsen@cognite.com> wrote:
>> > >>
>> > >>> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware
>> of
>> > >>> it, and it was of great help to interpret the error message from
>> Dataflow.
>> > >>>
>> > >>> I found the error/bug in an upstream DoFn (execute before the SDF)
>> with
>> > >>> a side-input. Both the main input to the DoFn and the side input
>> were
>> > >>> bounded and using the default window and trigger (i.e. no windowing
>> nor
>> > >>> trigger specified in the job).
>> > >>>
>> > >>> When I moved that particular DoFn to be downstream to the SDF, the
>> job
>> > >>> started working.
>> > >>>
>> > >>> Maybe this is by design and I just hadn't registered that one cannot
>> > >>> have a side-input DoFn upstream to an unbound SDF?
>> > >>>
>> > >>> In any case, thank you for the patience and willingness to help out.
>> > >>>
>> > >>> Best,
>> > >>> Kjetil
>> > >>>
>> > >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>> > >>>
>> > >>>>
>> > >>>>
>> > >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>> > >>>> kjetil.halvorsen@cognite.com> wrote:
>> > >>>>
>> > >>>>> Thanks for looking into this. I have been distracted on a separate
>> > >>>>> (Beam) feature the past week so it took me some time to make
>> progress. In
>> > >>>>> any case, I have run new tests on Dataflow with a minimal
>> pipeline.
>> > >>>>> Unfortunately with the same results: "step 22 has conflicting
>> bucketing
>> > >>>>> functions". More info inline below.
>> > >>>>>
>> > >>>>> Best,
>> > >>>>> Kjetil
>> > >>>>>
>> > >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com>
>> wrote:
>> > >>>>>
>> > >>>>>> The bucketing "error" is likely related to what windowing
>> > >>>>>> strategy/pipeline shape you have. Have you tried running your
>> SDF inside an
>> > >>>>>> empty pipeline possibly followed by a ParDo to log what records
>> you are
>> > >>>>>> seeing?
>> > >>>>>>
>> > >>>>>
>> > >>>>> I slimmed the pipeline down to just being this sdf plus a
>> MapElements
>> > >>>>> that log the records. No windowing definitions nor any trigger
>> definitions.
>> > >>>>> The results were exactly the same: The job fails somewhere in the
>> > >>>>> startup/verification phase in Dataflow (i.e. after compile/upload
>> from the
>> > >>>>> client, but as a part of the Dataflow startup procedure). "Step
>> 22 has
>> > >>>>> conflicting bucketing functions".
>> > >>>>>
>> > >>>>
>> > >>>> The error is because the windowing fn on the GBKs are different.
>> You
>> > >>>> can dump and inspect the JSON job description using the flag
>> > >>>> --dataflowJobFile=/path/to/dump/file.json
>> > >>>>
>> > >>>>
>> > >>>>>
>> > >>>>>>
>> > >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>> > >>>>>> kjetil.halvorsen@cognite.com> wrote:
>> > >>>>>>
>> > >>>>>>> Thank's for the willingness to help out. The general context is
>> that
>> > >>>>>>> we are developing a set of new Beam based connectors/readers.
>> > >>>>>>>
>> > >>>>>>> I had hoped that SDF was ready for use with Dataflow--just
>> because
>> > >>>>>>> the interface is nice to work with. In general, would you
>> recommend that we
>> > >>>>>>> look at the legacy source APIs for building our
>> connectors/readers?
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>> I would not. A few contributors have been making rapid progress
>> over
>> > >>>>>> the past few months to finish SDFs with Python done from an API
>> standpoint
>> > >>>>>> (there is some additional integration/scaling testing going on),
>> Java is
>> > >>>>>> missing progress reporting from the API and watermark estimation
>> but I was
>> > >>>>>> hoping to finish those API pieces this month and Go has started
>> on the
>> > >>>>>> batch API implementation.
>> > >>>>>>
>> > >>>>>
>> > >>>>> Great, I am happy to hear that. Would love to just keep investing
>> in
>> > >>>>> the SDF implementations we started.
>> > >>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>>
>> > >>>>>>> Anyways, I have pasted the skeleton of the SDF below (I
>> apologize
>> > >>>>>>> for the bad formatting--still learning the grips of
>> communicating code via
>> > >>>>>>> e-mail). . We have used the overall pattern from the file
>> watcher. I.e. the
>> > >>>>>>> SDF creates "poll requests" at regular intervals which a
>> downstream parDo
>> > >>>>>>> executes. The SDF uses the built-in OffserRange as the basis
>> for the range
>> > >>>>>>> tracker.
>> > >>>>>>>
>> > >>>>>>> I am happy to receive any pointers on improvements, changes,
>> > >>>>>>> debugging paths.
>> > >>>>>>>
>> > >>>>>>> /**
>> > >>>>>>>  * This function generates an unbounded stream of source
>> queries.
>> > >>>>>>>  */
>> > >>>>>>> @DoFn.UnboundedPerElement
>> > >>>>>>> public class GenerateTsPointRequestsUnboundFn extends
>> > >>>>>>> DoFn<RequestParameters, RequestParameters> {
>> > >>>>>>>
>> > >>>>>>>     @Setup
>> > >>>>>>>     public void setup() {
>> > >>>>>>>         validate();
>> > >>>>>>>     }
>> > >>>>>>>
>> > >>>>>>>     @ProcessElement
>> > >>>>>>>     public ProcessContinuation processElement(@Element Element
>> > >>>>>>> inputElement,
>> > >>>>>>>
>> > >>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>> > >>>>>>>
>> > >>>>>>> OutputReceiver<outputElement> out,
>> > >>>>>>>                                               ProcessContext
>> > >>>>>>> context) throws Exception {
>> > >>>>>>>
>> > >>>>>>>         long startRange =
>> tracker.currentRestriction().getFrom();
>> > >>>>>>>         long endRange = tracker.currentRestriction().getTo();
>> > >>>>>>>
>> > >>>>>>>         while (startRange < (System.currentTimeMillis() -
>> > >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>> > >>>>>>>             // Set the query's max end to current time - offset.
>> > >>>>>>>             if (endRange > (System.currentTimeMillis() -
>> > >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>> > >>>>>>>                 endRange = (System.currentTimeMillis() -
>> > >>>>>>> readerConfig.getPollOffset().get().toMillis());
>> > >>>>>>>             }
>> > >>>>>>>
>> > >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>> Why do you try and claim to the endRange here? Shouldn't you
>> claim
>> > >>>>>> subranges, so [start, start+pollsize), [start+pollisize,
>> start+pollsize*2),
>> > >>>>>> ..., [start+pollsize*N, end)?
>> > >>>>>>
>> > >>>>>> Also, if start is significantly smaller then current time, you
>> could
>> > >>>>>> implement the @SplitRestriction method.
>> > >>>>>>
>> > >>>>>>
>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>> > >>>>>>
>> > >>>>>
>> > >>>>> Good points! My original thinking was to have a second (bounded)
>> SDF
>> > >>>>> that splits the ranges and executes the actual reads from the
>> source.
>> > >>>>> Similar to the "watch + read" pattern. That way I can reuse most
>> of the
>> > >>>>> code between the unbounded and bounded scenario. Maybe that's a
>> sub-optimal
>> > >>>>> approach?
>> > >>>>>
>> > >>>>
>> > >>>> Following a watch + read pattern works well.
>> > >>>>
>> > >>>> And claiming the entire range when writing a generator function
>> makes
>> > >>>> sense.
>> > >>>>
>> > >>>>
>> > >>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>> > >>>>>>>
>> > >>>>>>> out.outputWithTimestamp(buildOutputElement(inputElement,
>> startRange,
>> > >>>>>>> endRange),
>> > >>>>>>>
>> > >>>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
>> > >>>>>>>
>> > >>>>>>>                 // Update the start and end range for the next
>> > >>>>>>> iteration
>> > >>>>>>>                 startRange = endRange;
>> > >>>>>>>                 endRange = tracker.currentRestriction().getTo();
>> > >>>>>>>             } else {
>> > >>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work
>> due to
>> > >>>>>>> checkpointing or splitting.");
>> > >>>>>>>                 return ProcessContinuation.stop();
>> > >>>>>>>             }
>> > >>>>>>>
>> > >>>>>>>             if (startRange >=
>> tracker.currentRestriction().getTo()) {
>> > >>>>>>>                 LOG.info(localLoggingPrefix + "Completed the
>> request
>> > >>>>>>> time range. Will stop the reader.");
>> > >>>>>>>                 return ProcessContinuation.stop();
>> > >>>>>>>             }
>> > >>>>>>>
>> > >>>>>>>             return
>> > >>>>>>>
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>> > >>>>>>>
>> > >>>>>>> readerConfig.getPollInterval().get().toMillis()));
>> > >>>>>>>         }
>> > >>>>>>>
>> > >>>>>>>         return
>> > >>>>>>>
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>> > >>>>>>>
>>  readerConfig.getPollInterval().get().toMillis()));
>> > >>>>>>>     }
>> > >>>>>>>
>> > >>>>>>>     private OutputElement buildOutputElement(Element element,
>> > >>>>>>>                                                      long start,
>> > >>>>>>>                                                      long end) {
>> > >>>>>>>         return outputElement
>> > >>>>>>>                 .withParameter(START_KEY, start)
>> > >>>>>>>                 .withParameter(END_KEY, end);
>> > >>>>>>>     }
>> > >>>>>>>
>> > >>>>>>>     @GetInitialRestriction
>> > >>>>>>>     public OffsetRange getInitialRestriction(Element element)
>> throws
>> > >>>>>>> Exception {
>> > >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>> > >>>>>>>     }
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com>
>> wrote:
>> > >>>>>>>
>> > >>>>>>>> SplittableDoFn has experimental support within Dataflow so the
>> way
>> > >>>>>>>> you may be using it could be correct but unsupported.
>> > >>>>>>>>
>> > >>>>>>>> Can you provide snippets/details of your splittable dofn
>> > >>>>>>>> implementation?
>> > >>>>>>>>
>> > >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>> > >>>>>>>> kjetil.halvorsen@cognite.com> wrote:
>> > >>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> Hi,
>> > >>>>>>>>>
>> > >>>>>>>>> I am looking for pointers to a Dataflow runner error message:
>> Workflow
>> > >>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>> > >>>>>>>>>
>> > >>>>>>>>> This happens at the very startup of the job execution, and I
>> am
>> > >>>>>>>>> unable to find any pointer as to where in the code/job
>> definition the
>> > >>>>>>>>> origin of the conflict is. The same job runs just fine in the
>> DirectRunner.
>> > >>>>>>>>>
>> > >>>>>>>>> The job contains a splittable DoFn (unbound) and I have tried
>> it
>> > >>>>>>>>> with both a windowing transform and without a windowing
>> transform--both
>> > >>>>>>>>> fail with the same result on Dataflow.
>> > >>>>>>>>>
>> > >>>>>>>>> This is my first foray into splittable DoFn territory so I am
>> sure
>> > >>>>>>>>> I have just made some basic missteps.
>> > >>>>>>>>>
>> > >>>>>>>>> Cheers,
>> > >>>>>>>>> Kjetil
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> --
>> > >>>>>>>>>
>> > >>>>>>>>> *Kjetil Halvorsen*
>> > >>>>>>>>> Chief Architect, Enterprise Integration
>> > >>>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>
>> > >>>>>>> --
>> > >>>>>>>
>> > >>>>>>> *Kjetil Halvorsen*
>> > >>>>>>> Chief Architect, Enterprise Integration
>> > >>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > >>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>
>> > >>>>> --
>> > >>>>>
>> > >>>>> *Kjetil Halvorsen*
>> > >>>>> Chief Architect, Enterprise Integration
>> > >>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > >>>>> www.cognite.com | LIBERATE YOUR DATA™
>> > >>>>>
>> > >>>>>
>> > >>>
>> > >>> --
>> > >>>
>> > >>> *Kjetil Halvorsen*
>> > >>> Chief Architect, Enterprise Integration
>> > >>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > >>> www.cognite.com | LIBERATE YOUR DATA™
>> > >>>
>> > >>>
>> > >
>> > > --
>> > >
>> > > *Kjetil Halvorsen*
>> > > Chief Architect, Enterprise Integration
>> > > +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > > www.cognite.com | LIBERATE YOUR DATA™
>> > >
>> > >
>> >
>> > --
>> >
>> > *Kjetil Halvorsen*
>> > Chief Architect, Enterprise Integration
>> > +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> > www.cognite.com | LIBERATE YOUR DATA™
>> >
>>
>> +boyuanz@google.com
>>
>

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Boyuan Zhang <bo...@google.com>.
Hi Kjetil,

I spent some time on this and I don't think it's a problem with SDF but a
problem with how dataflow does GBK. It seems like you pipeline looks like:

File.match -> Read File Metadata -> Read File -> a DoFn -> a unbounded SDF

         ^

 Create()   - |

I figured out a workaround for you. If you do a Reshuffle after Create,
like:
File.match -> Read File Metadata -> Read File -> a DoFn -> a unbounded SDF

         ^
                   Create() -> Reshuffle.viaRandomKey()   - |
you will have your pipeline start to run successfully.

On Mon, Nov 16, 2020 at 3:53 PM Boyuan Zhang <bo...@apache.org> wrote:

>
>
> On 2020/03/26 13:42:51, Kjetil Halvorsen <kj...@cognite.com>
> wrote:
> > Another update on this issue. I observe the same with bounded SDFs when
> > running in streaming mode. The general pipeline is [unbounded watcher,
> sdf]
> > -> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...
> >
> > This also fails with the conflicting bucketing function error message.
> When
> > I remove the File.IO side input, the pipeline executes again (on
> Dataflow).
> >
> > This one hurts us a bit because we use the File.IO side inputs to feed
> the
> > pipeline with config settings, so it is not trivial for us to remove it.
> >
> > Best,
> > Kjetil
> >
> > On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
> > kjetil.halvorsen@cognite.com> wrote:
> >
> > > Perfect, thanks.
> > >
> > > I did some more testing, and it seems to narrow down to using
> FileIO.match
> > > -> readMatches -> to drive the upstream side input. I have attached a
> > > pipeline that reproduces the error. When I run it with Beam 2.17 or
> 2.18 it
> > > will fail on Dataflow. I have not tested with 2.19 due to the blocker
> on
> > > Win Java.
> > >
> > > Please let me know if there is anything else I can do to help. I am
> very
> > > motivated to get this sorted out as we have lots of scenarios lined up.
> > >
> > > Best,
> > > Kjetil
> > >
> > > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > >> That doesn't sound like it should be an issue and sounds like a bug in
> > >> Dataflow.
> > >>
> > >> If you're willing to share a minimal pipeline that gets this error. I
> can
> > >> get an issue opened up internally and assigned.
> > >>
> > >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> > >> kjetil.halvorsen@cognite.com> wrote:
> > >>
> > >>> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware
> of
> > >>> it, and it was of great help to interpret the error message from
> Dataflow.
> > >>>
> > >>> I found the error/bug in an upstream DoFn (execute before the SDF)
> with
> > >>> a side-input. Both the main input to the DoFn and the side input were
> > >>> bounded and using the default window and trigger (i.e. no windowing
> nor
> > >>> trigger specified in the job).
> > >>>
> > >>> When I moved that particular DoFn to be downstream to the SDF, the
> job
> > >>> started working.
> > >>>
> > >>> Maybe this is by design and I just hadn't registered that one cannot
> > >>> have a side-input DoFn upstream to an unbound SDF?
> > >>>
> > >>> In any case, thank you for the patience and willingness to help out.
> > >>>
> > >>> Best,
> > >>> Kjetil
> > >>>
> > >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
> > >>>
> > >>>>
> > >>>>
> > >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> > >>>> kjetil.halvorsen@cognite.com> wrote:
> > >>>>
> > >>>>> Thanks for looking into this. I have been distracted on a separate
> > >>>>> (Beam) feature the past week so it took me some time to make
> progress. In
> > >>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
> > >>>>> Unfortunately with the same results: "step 22 has conflicting
> bucketing
> > >>>>> functions". More info inline below.
> > >>>>>
> > >>>>> Best,
> > >>>>> Kjetil
> > >>>>>
> > >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com>
> wrote:
> > >>>>>
> > >>>>>> The bucketing "error" is likely related to what windowing
> > >>>>>> strategy/pipeline shape you have. Have you tried running your SDF
> inside an
> > >>>>>> empty pipeline possibly followed by a ParDo to log what records
> you are
> > >>>>>> seeing?
> > >>>>>>
> > >>>>>
> > >>>>> I slimmed the pipeline down to just being this sdf plus a
> MapElements
> > >>>>> that log the records. No windowing definitions nor any trigger
> definitions.
> > >>>>> The results were exactly the same: The job fails somewhere in the
> > >>>>> startup/verification phase in Dataflow (i.e. after compile/upload
> from the
> > >>>>> client, but as a part of the Dataflow startup procedure). "Step 22
> has
> > >>>>> conflicting bucketing functions".
> > >>>>>
> > >>>>
> > >>>> The error is because the windowing fn on the GBKs are different. You
> > >>>> can dump and inspect the JSON job description using the flag
> > >>>> --dataflowJobFile=/path/to/dump/file.json
> > >>>>
> > >>>>
> > >>>>>
> > >>>>>>
> > >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> > >>>>>> kjetil.halvorsen@cognite.com> wrote:
> > >>>>>>
> > >>>>>>> Thank's for the willingness to help out. The general context is
> that
> > >>>>>>> we are developing a set of new Beam based connectors/readers.
> > >>>>>>>
> > >>>>>>> I had hoped that SDF was ready for use with Dataflow--just
> because
> > >>>>>>> the interface is nice to work with. In general, would you
> recommend that we
> > >>>>>>> look at the legacy source APIs for building our
> connectors/readers?
> > >>>>>>>
> > >>>>>>
> > >>>>>> I would not. A few contributors have been making rapid progress
> over
> > >>>>>> the past few months to finish SDFs with Python done from an API
> standpoint
> > >>>>>> (there is some additional integration/scaling testing going on),
> Java is
> > >>>>>> missing progress reporting from the API and watermark estimation
> but I was
> > >>>>>> hoping to finish those API pieces this month and Go has started
> on the
> > >>>>>> batch API implementation.
> > >>>>>>
> > >>>>>
> > >>>>> Great, I am happy to hear that. Would love to just keep investing
> in
> > >>>>> the SDF implementations we started.
> > >>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize
> > >>>>>>> for the bad formatting--still learning the grips of
> communicating code via
> > >>>>>>> e-mail). . We have used the overall pattern from the file
> watcher. I.e. the
> > >>>>>>> SDF creates "poll requests" at regular intervals which a
> downstream parDo
> > >>>>>>> executes. The SDF uses the built-in OffserRange as the basis for
> the range
> > >>>>>>> tracker.
> > >>>>>>>
> > >>>>>>> I am happy to receive any pointers on improvements, changes,
> > >>>>>>> debugging paths.
> > >>>>>>>
> > >>>>>>> /**
> > >>>>>>>  * This function generates an unbounded stream of source queries.
> > >>>>>>>  */
> > >>>>>>> @DoFn.UnboundedPerElement
> > >>>>>>> public class GenerateTsPointRequestsUnboundFn extends
> > >>>>>>> DoFn<RequestParameters, RequestParameters> {
> > >>>>>>>
> > >>>>>>>     @Setup
> > >>>>>>>     public void setup() {
> > >>>>>>>         validate();
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @ProcessElement
> > >>>>>>>     public ProcessContinuation processElement(@Element Element
> > >>>>>>> inputElement,
> > >>>>>>>
> > >>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
> > >>>>>>>
> > >>>>>>> OutputReceiver<outputElement> out,
> > >>>>>>>                                               ProcessContext
> > >>>>>>> context) throws Exception {
> > >>>>>>>
> > >>>>>>>         long startRange = tracker.currentRestriction().getFrom();
> > >>>>>>>         long endRange = tracker.currentRestriction().getTo();
> > >>>>>>>
> > >>>>>>>         while (startRange < (System.currentTimeMillis() -
> > >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>             // Set the query's max end to current time - offset.
> > >>>>>>>             if (endRange > (System.currentTimeMillis() -
> > >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>                 endRange = (System.currentTimeMillis() -
> > >>>>>>> readerConfig.getPollOffset().get().toMillis());
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
> > >>>>>>>
> > >>>>>>
> > >>>>>> Why do you try and claim to the endRange here? Shouldn't you claim
> > >>>>>> subranges, so [start, start+pollsize), [start+pollisize,
> start+pollsize*2),
> > >>>>>> ..., [start+pollsize*N, end)?
> > >>>>>>
> > >>>>>> Also, if start is significantly smaller then current time, you
> could
> > >>>>>> implement the @SplitRestriction method.
> > >>>>>>
> > >>>>>>
> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
> > >>>>>>
> > >>>>>
> > >>>>> Good points! My original thinking was to have a second (bounded)
> SDF
> > >>>>> that splits the ranges and executes the actual reads from the
> source.
> > >>>>> Similar to the "watch + read" pattern. That way I can reuse most
> of the
> > >>>>> code between the unbounded and bounded scenario. Maybe that's a
> sub-optimal
> > >>>>> approach?
> > >>>>>
> > >>>>
> > >>>> Following a watch + read pattern works well.
> > >>>>
> > >>>> And claiming the entire range when writing a generator function
> makes
> > >>>> sense.
> > >>>>
> > >>>>
> > >>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>
> > >>>>>>> out.outputWithTimestamp(buildOutputElement(inputElement,
> startRange,
> > >>>>>>> endRange),
> > >>>>>>>
> > >>>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>
> > >>>>>>>                 // Update the start and end range for the next
> > >>>>>>> iteration
> > >>>>>>>                 startRange = endRange;
> > >>>>>>>                 endRange = tracker.currentRestriction().getTo();
> > >>>>>>>             } else {
> > >>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due
> to
> > >>>>>>> checkpointing or splitting.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (startRange >=
> tracker.currentRestriction().getTo()) {
> > >>>>>>>                 LOG.info(localLoggingPrefix + "Completed the
> request
> > >>>>>>> time range. Will stop the reader.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             return
> > >>>>>>>
> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> > >>>>>>>
> > >>>>>>> readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>         }
> > >>>>>>>
> > >>>>>>>         return
> > >>>>>>>
> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> > >>>>>>>
>  readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     private OutputElement buildOutputElement(Element element,
> > >>>>>>>                                                      long start,
> > >>>>>>>                                                      long end) {
> > >>>>>>>         return outputElement
> > >>>>>>>                 .withParameter(START_KEY, start)
> > >>>>>>>                 .withParameter(END_KEY, end);
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @GetInitialRestriction
> > >>>>>>>     public OffsetRange getInitialRestriction(Element element)
> throws
> > >>>>>>> Exception {
> > >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
> > >>>>>>>     }
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com>
> wrote:
> > >>>>>>>
> > >>>>>>>> SplittableDoFn has experimental support within Dataflow so the
> way
> > >>>>>>>> you may be using it could be correct but unsupported.
> > >>>>>>>>
> > >>>>>>>> Can you provide snippets/details of your splittable dofn
> > >>>>>>>> implementation?
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> > >>>>>>>> kjetil.halvorsen@cognite.com> wrote:
> > >>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> I am looking for pointers to a Dataflow runner error message:
> Workflow
> > >>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
> > >>>>>>>>>
> > >>>>>>>>> This happens at the very startup of the job execution, and I am
> > >>>>>>>>> unable to find any pointer as to where in the code/job
> definition the
> > >>>>>>>>> origin of the conflict is. The same job runs just fine in the
> DirectRunner.
> > >>>>>>>>>
> > >>>>>>>>> The job contains a splittable DoFn (unbound) and I have tried
> it
> > >>>>>>>>> with both a windowing transform and without a windowing
> transform--both
> > >>>>>>>>> fail with the same result on Dataflow.
> > >>>>>>>>>
> > >>>>>>>>> This is my first foray into splittable DoFn territory so I am
> sure
> > >>>>>>>>> I have just made some basic missteps.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Kjetil
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>>
> > >>>>>>>>> *Kjetil Halvorsen*
> > >>>>>>>>> Chief Architect, Enterprise Integration
> > >>>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>>
> > >>>>>>> *Kjetil Halvorsen*
> > >>>>>>> Chief Architect, Enterprise Integration
> > >>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > >>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>> --
> > >>>>>
> > >>>>> *Kjetil Halvorsen*
> > >>>>> Chief Architect, Enterprise Integration
> > >>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > >>>>> www.cognite.com | LIBERATE YOUR DATA™
> > >>>>>
> > >>>>>
> > >>>
> > >>> --
> > >>>
> > >>> *Kjetil Halvorsen*
> > >>> Chief Architect, Enterprise Integration
> > >>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > >>> www.cognite.com | LIBERATE YOUR DATA™
> > >>>
> > >>>
> > >
> > > --
> > >
> > > *Kjetil Halvorsen*
> > > Chief Architect, Enterprise Integration
> > > +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > > www.cognite.com | LIBERATE YOUR DATA™
> > >
> > >
> >
> > --
> >
> > *Kjetil Halvorsen*
> > Chief Architect, Enterprise Integration
> > +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > www.cognite.com | LIBERATE YOUR DATA™
> >
>
> +boyuanz@google.com
>

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Boyuan Zhang <bo...@apache.org>.

On 2020/03/26 13:42:51, Kjetil Halvorsen <kj...@cognite.com> wrote: 
> Another update on this issue. I observe the same with bounded SDFs when
> running in streaming mode. The general pipeline is [unbounded watcher, sdf]
> -> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...
> 
> This also fails with the conflicting bucketing function error message. When
> I remove the File.IO side input, the pipeline executes again (on Dataflow).
> 
> This one hurts us a bit because we use the File.IO side inputs to feed the
> pipeline with config settings, so it is not trivial for us to remove it.
> 
> Best,
> Kjetil
> 
> On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
> kjetil.halvorsen@cognite.com> wrote:
> 
> > Perfect, thanks.
> >
> > I did some more testing, and it seems to narrow down to using FileIO.match
> > -> readMatches -> to drive the upstream side input. I have attached a
> > pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it
> > will fail on Dataflow. I have not tested with 2.19 due to the blocker on
> > Win Java.
> >
> > Please let me know if there is anything else I can do to help. I am very
> > motivated to get this sorted out as we have lots of scenarios lined up.
> >
> > Best,
> > Kjetil
> >
> > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
> >
> >> That doesn't sound like it should be an issue and sounds like a bug in
> >> Dataflow.
> >>
> >> If you're willing to share a minimal pipeline that gets this error. I can
> >> get an issue opened up internally and assigned.
> >>
> >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> >> kjetil.halvorsen@cognite.com> wrote:
> >>
> >>> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of
> >>> it, and it was of great help to interpret the error message from Dataflow.
> >>>
> >>> I found the error/bug in an upstream DoFn (execute before the SDF) with
> >>> a side-input. Both the main input to the DoFn and the side input were
> >>> bounded and using the default window and trigger (i.e. no windowing nor
> >>> trigger specified in the job).
> >>>
> >>> When I moved that particular DoFn to be downstream to the SDF, the job
> >>> started working.
> >>>
> >>> Maybe this is by design and I just hadn't registered that one cannot
> >>> have a side-input DoFn upstream to an unbound SDF?
> >>>
> >>> In any case, thank you for the patience and willingness to help out.
> >>>
> >>> Best,
> >>> Kjetil
> >>>
> >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
> >>>
> >>>>
> >>>>
> >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> >>>> kjetil.halvorsen@cognite.com> wrote:
> >>>>
> >>>>> Thanks for looking into this. I have been distracted on a separate
> >>>>> (Beam) feature the past week so it took me some time to make progress. In
> >>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
> >>>>> Unfortunately with the same results: "step 22 has conflicting bucketing
> >>>>> functions". More info inline below.
> >>>>>
> >>>>> Best,
> >>>>> Kjetil
> >>>>>
> >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>>> The bucketing "error" is likely related to what windowing
> >>>>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
> >>>>>> empty pipeline possibly followed by a ParDo to log what records you are
> >>>>>> seeing?
> >>>>>>
> >>>>>
> >>>>> I slimmed the pipeline down to just being this sdf plus a MapElements
> >>>>> that log the records. No windowing definitions nor any trigger definitions.
> >>>>> The results were exactly the same: The job fails somewhere in the
> >>>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
> >>>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
> >>>>> conflicting bucketing functions".
> >>>>>
> >>>>
> >>>> The error is because the windowing fn on the GBKs are different. You
> >>>> can dump and inspect the JSON job description using the flag
> >>>> --dataflowJobFile=/path/to/dump/file.json
> >>>>
> >>>>
> >>>>>
> >>>>>>
> >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> >>>>>> kjetil.halvorsen@cognite.com> wrote:
> >>>>>>
> >>>>>>> Thank's for the willingness to help out. The general context is that
> >>>>>>> we are developing a set of new Beam based connectors/readers.
> >>>>>>>
> >>>>>>> I had hoped that SDF was ready for use with Dataflow--just because
> >>>>>>> the interface is nice to work with. In general, would you recommend that we
> >>>>>>> look at the legacy source APIs for building our connectors/readers?
> >>>>>>>
> >>>>>>
> >>>>>> I would not. A few contributors have been making rapid progress over
> >>>>>> the past few months to finish SDFs with Python done from an API standpoint
> >>>>>> (there is some additional integration/scaling testing going on), Java is
> >>>>>> missing progress reporting from the API and watermark estimation but I was
> >>>>>> hoping to finish those API pieces this month and Go has started on the
> >>>>>> batch API implementation.
> >>>>>>
> >>>>>
> >>>>> Great, I am happy to hear that. Would love to just keep investing in
> >>>>> the SDF implementations we started.
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize
> >>>>>>> for the bad formatting--still learning the grips of communicating code via
> >>>>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
> >>>>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
> >>>>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
> >>>>>>> tracker.
> >>>>>>>
> >>>>>>> I am happy to receive any pointers on improvements, changes,
> >>>>>>> debugging paths.
> >>>>>>>
> >>>>>>> /**
> >>>>>>>  * This function generates an unbounded stream of source queries.
> >>>>>>>  */
> >>>>>>> @DoFn.UnboundedPerElement
> >>>>>>> public class GenerateTsPointRequestsUnboundFn extends
> >>>>>>> DoFn<RequestParameters, RequestParameters> {
> >>>>>>>
> >>>>>>>     @Setup
> >>>>>>>     public void setup() {
> >>>>>>>         validate();
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @ProcessElement
> >>>>>>>     public ProcessContinuation processElement(@Element Element
> >>>>>>> inputElement,
> >>>>>>>
> >>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
> >>>>>>>
> >>>>>>> OutputReceiver<outputElement> out,
> >>>>>>>                                               ProcessContext
> >>>>>>> context) throws Exception {
> >>>>>>>
> >>>>>>>         long startRange = tracker.currentRestriction().getFrom();
> >>>>>>>         long endRange = tracker.currentRestriction().getTo();
> >>>>>>>
> >>>>>>>         while (startRange < (System.currentTimeMillis() -
> >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
> >>>>>>>             // Set the query's max end to current time - offset.
> >>>>>>>             if (endRange > (System.currentTimeMillis() -
> >>>>>>> readerConfig.getPollOffset().get().toMillis())) {
> >>>>>>>                 endRange = (System.currentTimeMillis() -
> >>>>>>> readerConfig.getPollOffset().get().toMillis());
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
> >>>>>>>
> >>>>>>
> >>>>>> Why do you try and claim to the endRange here? Shouldn't you claim
> >>>>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
> >>>>>> ..., [start+pollsize*N, end)?
> >>>>>>
> >>>>>> Also, if start is significantly smaller then current time, you could
> >>>>>> implement the @SplitRestriction method.
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
> >>>>>>
> >>>>>
> >>>>> Good points! My original thinking was to have a second (bounded) SDF
> >>>>> that splits the ranges and executes the actual reads from the source.
> >>>>> Similar to the "watch + read" pattern. That way I can reuse most of the
> >>>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
> >>>>> approach?
> >>>>>
> >>>>
> >>>> Following a watch + read pattern works well.
> >>>>
> >>>> And claiming the entire range when writing a generator function makes
> >>>> sense.
> >>>>
> >>>>
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
> >>>>>>>
> >>>>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
> >>>>>>> endRange),
> >>>>>>>
> >>>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
> >>>>>>>
> >>>>>>>                 // Update the start and end range for the next
> >>>>>>> iteration
> >>>>>>>                 startRange = endRange;
> >>>>>>>                 endRange = tracker.currentRestriction().getTo();
> >>>>>>>             } else {
> >>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
> >>>>>>> checkpointing or splitting.");
> >>>>>>>                 return ProcessContinuation.stop();
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
> >>>>>>>                 LOG.info(localLoggingPrefix + "Completed the request
> >>>>>>> time range. Will stop the reader.");
> >>>>>>>                 return ProcessContinuation.stop();
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             return
> >>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> >>>>>>>
> >>>>>>> readerConfig.getPollInterval().get().toMillis()));
> >>>>>>>         }
> >>>>>>>
> >>>>>>>         return
> >>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> >>>>>>>                 readerConfig.getPollInterval().get().toMillis()));
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     private OutputElement buildOutputElement(Element element,
> >>>>>>>                                                      long start,
> >>>>>>>                                                      long end) {
> >>>>>>>         return outputElement
> >>>>>>>                 .withParameter(START_KEY, start)
> >>>>>>>                 .withParameter(END_KEY, end);
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @GetInitialRestriction
> >>>>>>>     public OffsetRange getInitialRestriction(Element element) throws
> >>>>>>> Exception {
> >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>>>
> >>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
> >>>>>>>> you may be using it could be correct but unsupported.
> >>>>>>>>
> >>>>>>>> Can you provide snippets/details of your splittable dofn
> >>>>>>>> implementation?
> >>>>>>>>
> >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> >>>>>>>> kjetil.halvorsen@cognite.com> wrote:
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
> >>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
> >>>>>>>>>
> >>>>>>>>> This happens at the very startup of the job execution, and I am
> >>>>>>>>> unable to find any pointer as to where in the code/job definition the
> >>>>>>>>> origin of the conflict is. The same job runs just fine in the DirectRunner.
> >>>>>>>>>
> >>>>>>>>> The job contains a splittable DoFn (unbound) and I have tried it
> >>>>>>>>> with both a windowing transform and without a windowing transform--both
> >>>>>>>>> fail with the same result on Dataflow.
> >>>>>>>>>
> >>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
> >>>>>>>>> I have just made some basic missteps.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Kjetil
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> *Kjetil Halvorsen*
> >>>>>>>>> Chief Architect, Enterprise Integration
> >>>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>> *Kjetil Halvorsen*
> >>>>>>> Chief Architect, Enterprise Integration
> >>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> >>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>> *Kjetil Halvorsen*
> >>>>> Chief Architect, Enterprise Integration
> >>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> >>>>> www.cognite.com | LIBERATE YOUR DATA™
> >>>>>
> >>>>>
> >>>
> >>> --
> >>>
> >>> *Kjetil Halvorsen*
> >>> Chief Architect, Enterprise Integration
> >>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> >>> www.cognite.com | LIBERATE YOUR DATA™
> >>>
> >>>
> >
> > --
> >
> > *Kjetil Halvorsen*
> > Chief Architect, Enterprise Integration
> > +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> > www.cognite.com | LIBERATE YOUR DATA™
> >
> >
> 
> -- 
> 
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
> 

+boyuanz@google.com

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Kjetil Halvorsen <kj...@cognite.com>.
Another update on this issue. I observe the same with bounded SDFs when
running in streaming mode. The general pipeline is [unbounded watcher, sdf]
-> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...

This also fails with the conflicting bucketing function error message. When
I remove the File.IO side input, the pipeline executes again (on Dataflow).

This one hurts us a bit because we use the File.IO side inputs to feed the
pipeline with config settings, so it is not trivial for us to remove it.

Best,
Kjetil

On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
kjetil.halvorsen@cognite.com> wrote:

> Perfect, thanks.
>
> I did some more testing, and it seems to narrow down to using FileIO.match
> -> readMatches -> to drive the upstream side input. I have attached a
> pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it
> will fail on Dataflow. I have not tested with 2.19 due to the blocker on
> Win Java.
>
> Please let me know if there is anything else I can do to help. I am very
> motivated to get this sorted out as we have lots of scenarios lined up.
>
> Best,
> Kjetil
>
> On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
>
>> That doesn't sound like it should be an issue and sounds like a bug in
>> Dataflow.
>>
>> If you're willing to share a minimal pipeline that gets this error. I can
>> get an issue opened up internally and assigned.
>>
>> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
>> kjetil.halvorsen@cognite.com> wrote:
>>
>>> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of
>>> it, and it was of great help to interpret the error message from Dataflow.
>>>
>>> I found the error/bug in an upstream DoFn (execute before the SDF) with
>>> a side-input. Both the main input to the DoFn and the side input were
>>> bounded and using the default window and trigger (i.e. no windowing nor
>>> trigger specified in the job).
>>>
>>> When I moved that particular DoFn to be downstream to the SDF, the job
>>> started working.
>>>
>>> Maybe this is by design and I just hadn't registered that one cannot
>>> have a side-input DoFn upstream to an unbound SDF?
>>>
>>> In any case, thank you for the patience and willingness to help out.
>>>
>>> Best,
>>> Kjetil
>>>
>>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>
>>>>> Thanks for looking into this. I have been distracted on a separate
>>>>> (Beam) feature the past week so it took me some time to make progress. In
>>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>>>> functions". More info inline below.
>>>>>
>>>>> Best,
>>>>> Kjetil
>>>>>
>>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> The bucketing "error" is likely related to what windowing
>>>>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>>>>> empty pipeline possibly followed by a ParDo to log what records you are
>>>>>> seeing?
>>>>>>
>>>>>
>>>>> I slimmed the pipeline down to just being this sdf plus a MapElements
>>>>> that log the records. No windowing definitions nor any trigger definitions.
>>>>> The results were exactly the same: The job fails somewhere in the
>>>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>>>> conflicting bucketing functions".
>>>>>
>>>>
>>>> The error is because the windowing fn on the GBKs are different. You
>>>> can dump and inspect the JSON job description using the flag
>>>> --dataflowJobFile=/path/to/dump/file.json
>>>>
>>>>
>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>>
>>>>>>> Thank's for the willingness to help out. The general context is that
>>>>>>> we are developing a set of new Beam based connectors/readers.
>>>>>>>
>>>>>>> I had hoped that SDF was ready for use with Dataflow--just because
>>>>>>> the interface is nice to work with. In general, would you recommend that we
>>>>>>> look at the legacy source APIs for building our connectors/readers?
>>>>>>>
>>>>>>
>>>>>> I would not. A few contributors have been making rapid progress over
>>>>>> the past few months to finish SDFs with Python done from an API standpoint
>>>>>> (there is some additional integration/scaling testing going on), Java is
>>>>>> missing progress reporting from the API and watermark estimation but I was
>>>>>> hoping to finish those API pieces this month and Go has started on the
>>>>>> batch API implementation.
>>>>>>
>>>>>
>>>>> Great, I am happy to hear that. Would love to just keep investing in
>>>>> the SDF implementations we started.
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize
>>>>>>> for the bad formatting--still learning the grips of communicating code via
>>>>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>>>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>>>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>>>>>> tracker.
>>>>>>>
>>>>>>> I am happy to receive any pointers on improvements, changes,
>>>>>>> debugging paths.
>>>>>>>
>>>>>>> /**
>>>>>>>  * This function generates an unbounded stream of source queries.
>>>>>>>  */
>>>>>>> @DoFn.UnboundedPerElement
>>>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>>>> DoFn<RequestParameters, RequestParameters> {
>>>>>>>
>>>>>>>     @Setup
>>>>>>>     public void setup() {
>>>>>>>         validate();
>>>>>>>     }
>>>>>>>
>>>>>>>     @ProcessElement
>>>>>>>     public ProcessContinuation processElement(@Element Element
>>>>>>> inputElement,
>>>>>>>
>>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>
>>>>>>> OutputReceiver<outputElement> out,
>>>>>>>                                               ProcessContext
>>>>>>> context) throws Exception {
>>>>>>>
>>>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>>>
>>>>>>>         while (startRange < (System.currentTimeMillis() -
>>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>>>             // Set the query's max end to current time - offset.
>>>>>>>             if (endRange > (System.currentTimeMillis() -
>>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>>>                 endRange = (System.currentTimeMillis() -
>>>>>>> readerConfig.getPollOffset().get().toMillis());
>>>>>>>             }
>>>>>>>
>>>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>>>
>>>>>>
>>>>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>>>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>>>>>> ..., [start+pollsize*N, end)?
>>>>>>
>>>>>> Also, if start is significantly smaller then current time, you could
>>>>>> implement the @SplitRestriction method.
>>>>>>
>>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>>>
>>>>>
>>>>> Good points! My original thinking was to have a second (bounded) SDF
>>>>> that splits the ranges and executes the actual reads from the source.
>>>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>>>> approach?
>>>>>
>>>>
>>>> Following a watch + read pattern works well.
>>>>
>>>> And claiming the entire range when writing a generator function makes
>>>> sense.
>>>>
>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>>
>>>>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
>>>>>>> endRange),
>>>>>>>
>>>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>>
>>>>>>>                 // Update the start and end range for the next
>>>>>>> iteration
>>>>>>>                 startRange = endRange;
>>>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>>>             } else {
>>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>>>>>> checkpointing or splitting.");
>>>>>>>                 return ProcessContinuation.stop();
>>>>>>>             }
>>>>>>>
>>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>>>>>> time range. Will stop the reader.");
>>>>>>>                 return ProcessContinuation.stop();
>>>>>>>             }
>>>>>>>
>>>>>>>             return
>>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>>
>>>>>>> readerConfig.getPollInterval().get().toMillis()));
>>>>>>>         }
>>>>>>>
>>>>>>>         return
>>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>>>     }
>>>>>>>
>>>>>>>     private OutputElement buildOutputElement(Element element,
>>>>>>>                                                      long start,
>>>>>>>                                                      long end) {
>>>>>>>         return outputElement
>>>>>>>                 .withParameter(START_KEY, start)
>>>>>>>                 .withParameter(END_KEY, end);
>>>>>>>     }
>>>>>>>
>>>>>>>     @GetInitialRestriction
>>>>>>>     public OffsetRange getInitialRestriction(Element element) throws
>>>>>>> Exception {
>>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>>>> you may be using it could be correct but unsupported.
>>>>>>>>
>>>>>>>> Can you provide snippets/details of your splittable dofn
>>>>>>>> implementation?
>>>>>>>>
>>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>>>>
>>>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>>>> origin of the conflict is. The same job runs just fine in the DirectRunner.
>>>>>>>>>
>>>>>>>>> The job contains a splittable DoFn (unbound) and I have tried it
>>>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>>>> fail with the same result on Dataflow.
>>>>>>>>>
>>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
>>>>>>>>> I have just made some basic missteps.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kjetil
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> *Kjetil Halvorsen*
>>>>>>>>> Chief Architect, Enterprise Integration
>>>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *Kjetil Halvorsen*
>>>>>>> Chief Architect, Enterprise Integration
>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Kjetil Halvorsen <kj...@cognite.com>.
Perfect, thanks.

I did some more testing, and it seems to narrow down to using FileIO.match
-> readMatches -> to drive the upstream side input. I have attached a
pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it
will fail on Dataflow. I have not tested with 2.19 due to the blocker on
Win Java.

Please let me know if there is anything else I can do to help. I am very
motivated to get this sorted out as we have lots of scenarios lined up.

Best,
Kjetil

On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:

> That doesn't sound like it should be an issue and sounds like a bug in
> Dataflow.
>
> If you're willing to share a minimal pipeline that gets this error. I can
> get an issue opened up internally and assigned.
>
> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> kjetil.halvorsen@cognite.com> wrote:
>
>> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of
>> it, and it was of great help to interpret the error message from Dataflow.
>>
>> I found the error/bug in an upstream DoFn (execute before the SDF) with a
>> side-input. Both the main input to the DoFn and the side input were bounded
>> and using the default window and trigger (i.e. no windowing nor trigger
>> specified in the job).
>>
>> When I moved that particular DoFn to be downstream to the SDF, the job
>> started working.
>>
>> Maybe this is by design and I just hadn't registered that one cannot have
>> a side-input DoFn upstream to an unbound SDF?
>>
>> In any case, thank you for the patience and willingness to help out.
>>
>> Best,
>> Kjetil
>>
>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>>> kjetil.halvorsen@cognite.com> wrote:
>>>
>>>> Thanks for looking into this. I have been distracted on a separate
>>>> (Beam) feature the past week so it took me some time to make progress. In
>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>>> functions". More info inline below.
>>>>
>>>> Best,
>>>> Kjetil
>>>>
>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> The bucketing "error" is likely related to what windowing
>>>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>>>> empty pipeline possibly followed by a ParDo to log what records you are
>>>>> seeing?
>>>>>
>>>>
>>>> I slimmed the pipeline down to just being this sdf plus a MapElements
>>>> that log the records. No windowing definitions nor any trigger definitions.
>>>> The results were exactly the same: The job fails somewhere in the
>>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>>> conflicting bucketing functions".
>>>>
>>>
>>> The error is because the windowing fn on the GBKs are different. You can
>>> dump and inspect the JSON job description using the flag
>>> --dataflowJobFile=/path/to/dump/file.json
>>>
>>>
>>>>
>>>>>
>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>
>>>>>> Thank's for the willingness to help out. The general context is that
>>>>>> we are developing a set of new Beam based connectors/readers.
>>>>>>
>>>>>> I had hoped that SDF was ready for use with Dataflow--just because
>>>>>> the interface is nice to work with. In general, would you recommend that we
>>>>>> look at the legacy source APIs for building our connectors/readers?
>>>>>>
>>>>>
>>>>> I would not. A few contributors have been making rapid progress over
>>>>> the past few months to finish SDFs with Python done from an API standpoint
>>>>> (there is some additional integration/scaling testing going on), Java is
>>>>> missing progress reporting from the API and watermark estimation but I was
>>>>> hoping to finish those API pieces this month and Go has started on the
>>>>> batch API implementation.
>>>>>
>>>>
>>>> Great, I am happy to hear that. Would love to just keep investing in
>>>> the SDF implementations we started.
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>>>> the bad formatting--still learning the grips of communicating code via
>>>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>>>>> tracker.
>>>>>>
>>>>>> I am happy to receive any pointers on improvements, changes,
>>>>>> debugging paths.
>>>>>>
>>>>>> /**
>>>>>>  * This function generates an unbounded stream of source queries.
>>>>>>  */
>>>>>> @DoFn.UnboundedPerElement
>>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>>> DoFn<RequestParameters, RequestParameters> {
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>         validate();
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public ProcessContinuation processElement(@Element Element
>>>>>> inputElement,
>>>>>>
>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>
>>>>>> OutputReceiver<outputElement> out,
>>>>>>                                               ProcessContext context)
>>>>>> throws Exception {
>>>>>>
>>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>>
>>>>>>         while (startRange < (System.currentTimeMillis() -
>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>>             // Set the query's max end to current time - offset.
>>>>>>             if (endRange > (System.currentTimeMillis() -
>>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>>                 endRange = (System.currentTimeMillis() -
>>>>>> readerConfig.getPollOffset().get().toMillis());
>>>>>>             }
>>>>>>
>>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>>
>>>>>
>>>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>>>>> ..., [start+pollsize*N, end)?
>>>>>
>>>>> Also, if start is significantly smaller then current time, you could
>>>>> implement the @SplitRestriction method.
>>>>>
>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>>
>>>>
>>>> Good points! My original thinking was to have a second (bounded) SDF
>>>> that splits the ranges and executes the actual reads from the source.
>>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>>> approach?
>>>>
>>>
>>> Following a watch + read pattern works well.
>>>
>>> And claiming the entire range when writing a generator function makes
>>> sense.
>>>
>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
>>>>>> endRange),
>>>>>>
>>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>>                 // Update the start and end range for the next
>>>>>> iteration
>>>>>>                 startRange = endRange;
>>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>>             } else {
>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>>>>> checkpointing or splitting.");
>>>>>>                 return ProcessContinuation.stop();
>>>>>>             }
>>>>>>
>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>>>>> time range. Will stop the reader.");
>>>>>>                 return ProcessContinuation.stop();
>>>>>>             }
>>>>>>
>>>>>>             return
>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>>>         }
>>>>>>
>>>>>>         return
>>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>>     }
>>>>>>
>>>>>>     private OutputElement buildOutputElement(Element element,
>>>>>>                                                      long start,
>>>>>>                                                      long end) {
>>>>>>         return outputElement
>>>>>>                 .withParameter(START_KEY, start)
>>>>>>                 .withParameter(END_KEY, end);
>>>>>>     }
>>>>>>
>>>>>>     @GetInitialRestriction
>>>>>>     public OffsetRange getInitialRestriction(Element element) throws
>>>>>> Exception {
>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>>> you may be using it could be correct but unsupported.
>>>>>>>
>>>>>>> Can you provide snippets/details of your splittable dofn
>>>>>>> implementation?
>>>>>>>
>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>>>
>>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>>> origin of the conflict is. The same job runs just fine in the DirectRunner.
>>>>>>>>
>>>>>>>> The job contains a splittable DoFn (unbound) and I have tried it
>>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>>> fail with the same result on Dataflow.
>>>>>>>>
>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
>>>>>>>> I have just made some basic missteps.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kjetil
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Kjetil Halvorsen*
>>>>>>>> Chief Architect, Enterprise Integration
>>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Kjetil Halvorsen*
>>>>>> Chief Architect, Enterprise Integration
>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> www.cognite.com | LIBERATE YOUR DATA™
>>
>>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Luke Cwik <lc...@google.com>.
That doesn't sound like it should be an issue and sounds like a bug in
Dataflow.

If you're willing to share a minimal pipeline that gets this error. I can
get an issue opened up internally and assigned.

On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
kjetil.halvorsen@cognite.com> wrote:

> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of it,
> and it was of great help to interpret the error message from Dataflow.
>
> I found the error/bug in an upstream DoFn (execute before the SDF) with a
> side-input. Both the main input to the DoFn and the side input were bounded
> and using the default window and trigger (i.e. no windowing nor trigger
> specified in the job).
>
> When I moved that particular DoFn to be downstream to the SDF, the job
> started working.
>
> Maybe this is by design and I just hadn't registered that one cannot have
> a side-input DoFn upstream to an unbound SDF?
>
> In any case, thank you for the patience and willingness to help out.
>
> Best,
> Kjetil
>
> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>> kjetil.halvorsen@cognite.com> wrote:
>>
>>> Thanks for looking into this. I have been distracted on a separate
>>> (Beam) feature the past week so it took me some time to make progress. In
>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>> functions". More info inline below.
>>>
>>> Best,
>>> Kjetil
>>>
>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The bucketing "error" is likely related to what windowing
>>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>>> empty pipeline possibly followed by a ParDo to log what records you are
>>>> seeing?
>>>>
>>>
>>> I slimmed the pipeline down to just being this sdf plus a MapElements
>>> that log the records. No windowing definitions nor any trigger definitions.
>>> The results were exactly the same: The job fails somewhere in the
>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>> conflicting bucketing functions".
>>>
>>
>> The error is because the windowing fn on the GBKs are different. You can
>> dump and inspect the JSON job description using the flag
>> --dataflowJobFile=/path/to/dump/file.json
>>
>>
>>>
>>>>
>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>
>>>>> Thank's for the willingness to help out. The general context is that
>>>>> we are developing a set of new Beam based connectors/readers.
>>>>>
>>>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>>>> interface is nice to work with. In general, would you recommend that we
>>>>> look at the legacy source APIs for building our connectors/readers?
>>>>>
>>>>
>>>> I would not. A few contributors have been making rapid progress over
>>>> the past few months to finish SDFs with Python done from an API standpoint
>>>> (there is some additional integration/scaling testing going on), Java is
>>>> missing progress reporting from the API and watermark estimation but I was
>>>> hoping to finish those API pieces this month and Go has started on the
>>>> batch API implementation.
>>>>
>>>
>>> Great, I am happy to hear that. Would love to just keep investing in the
>>> SDF implementations we started.
>>>
>>>>
>>>>
>>>>>
>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>>> the bad formatting--still learning the grips of communicating code via
>>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>>>> tracker.
>>>>>
>>>>> I am happy to receive any pointers on improvements, changes, debugging
>>>>> paths.
>>>>>
>>>>> /**
>>>>>  * This function generates an unbounded stream of source queries.
>>>>>  */
>>>>> @DoFn.UnboundedPerElement
>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>> DoFn<RequestParameters, RequestParameters> {
>>>>>
>>>>>     @Setup
>>>>>     public void setup() {
>>>>>         validate();
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public ProcessContinuation processElement(@Element Element
>>>>> inputElement,
>>>>>
>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>>
>>>>> OutputReceiver<outputElement> out,
>>>>>                                               ProcessContext context)
>>>>> throws Exception {
>>>>>
>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>
>>>>>         while (startRange < (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>             // Set the query's max end to current time - offset.
>>>>>             if (endRange > (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>                 endRange = (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis());
>>>>>             }
>>>>>
>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>
>>>>
>>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>>>> ..., [start+pollsize*N, end)?
>>>>
>>>> Also, if start is significantly smaller then current time, you could
>>>> implement the @SplitRestriction method.
>>>>
>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>
>>>
>>> Good points! My original thinking was to have a second (bounded) SDF
>>> that splits the ranges and executes the actual reads from the source.
>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>> approach?
>>>
>>
>> Following a watch + read pattern works well.
>>
>> And claiming the entire range when writing a generator function makes
>> sense.
>>
>>
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>
>>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
>>>>> endRange),
>>>>>
>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>
>>>>>                 // Update the start and end range for the next
>>>>> iteration
>>>>>                 startRange = endRange;
>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>             } else {
>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>>>> checkpointing or splitting.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>>>> time range. Will stop the reader.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             return
>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>>         }
>>>>>
>>>>>         return
>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>     }
>>>>>
>>>>>     private OutputElement buildOutputElement(Element element,
>>>>>                                                      long start,
>>>>>                                                      long end) {
>>>>>         return outputElement
>>>>>                 .withParameter(START_KEY, start)
>>>>>                 .withParameter(END_KEY, end);
>>>>>     }
>>>>>
>>>>>     @GetInitialRestriction
>>>>>     public OffsetRange getInitialRestriction(Element element) throws
>>>>> Exception {
>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>> you may be using it could be correct but unsupported.
>>>>>>
>>>>>> Can you provide snippets/details of your splittable dofn
>>>>>> implementation?
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>>
>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>> origin of the conflict is. The same job runs just fine in the DirectRunner.
>>>>>>>
>>>>>>> The job contains a splittable DoFn (unbound) and I have tried it
>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>> fail with the same result on Dataflow.
>>>>>>>
>>>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>>>> have just made some basic missteps.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kjetil
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *Kjetil Halvorsen*
>>>>>>> Chief Architect, Enterprise Integration
>>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Kjetil Halvorsen <kj...@cognite.com>.
Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of it,
and it was of great help to interpret the error message from Dataflow.

I found the error/bug in an upstream DoFn (execute before the SDF) with a
side-input. Both the main input to the DoFn and the side input were bounded
and using the default window and trigger (i.e. no windowing nor trigger
specified in the job).

When I moved that particular DoFn to be downstream to the SDF, the job
started working.

Maybe this is by design and I just hadn't registered that one cannot have a
side-input DoFn upstream to an unbound SDF?

In any case, thank you for the patience and willingness to help out.

Best,
Kjetil

On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> kjetil.halvorsen@cognite.com> wrote:
>
>> Thanks for looking into this. I have been distracted on a separate (Beam)
>> feature the past week so it took me some time to make progress. In any
>> case, I have run new tests on Dataflow with a minimal pipeline.
>> Unfortunately with the same results: "step 22 has conflicting bucketing
>> functions". More info inline below.
>>
>> Best,
>> Kjetil
>>
>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> The bucketing "error" is likely related to what windowing
>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>> empty pipeline possibly followed by a ParDo to log what records you are
>>> seeing?
>>>
>>
>> I slimmed the pipeline down to just being this sdf plus a MapElements
>> that log the records. No windowing definitions nor any trigger definitions.
>> The results were exactly the same: The job fails somewhere in the
>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>> conflicting bucketing functions".
>>
>
> The error is because the windowing fn on the GBKs are different. You can
> dump and inspect the JSON job description using the flag
> --dataflowJobFile=/path/to/dump/file.json
>
>
>>
>>>
>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>> kjetil.halvorsen@cognite.com> wrote:
>>>
>>>> Thank's for the willingness to help out. The general context is that we
>>>> are developing a set of new Beam based connectors/readers.
>>>>
>>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>>> interface is nice to work with. In general, would you recommend that we
>>>> look at the legacy source APIs for building our connectors/readers?
>>>>
>>>
>>> I would not. A few contributors have been making rapid progress over the
>>> past few months to finish SDFs with Python done from an API standpoint
>>> (there is some additional integration/scaling testing going on), Java is
>>> missing progress reporting from the API and watermark estimation but I was
>>> hoping to finish those API pieces this month and Go has started on the
>>> batch API implementation.
>>>
>>
>> Great, I am happy to hear that. Would love to just keep investing in the
>> SDF implementations we started.
>>
>>>
>>>
>>>>
>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>> the bad formatting--still learning the grips of communicating code via
>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>>> tracker.
>>>>
>>>> I am happy to receive any pointers on improvements, changes, debugging
>>>> paths.
>>>>
>>>> /**
>>>>  * This function generates an unbounded stream of source queries.
>>>>  */
>>>> @DoFn.UnboundedPerElement
>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>> DoFn<RequestParameters, RequestParameters> {
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>         validate();
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public ProcessContinuation processElement(@Element Element
>>>> inputElement,
>>>>
>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>
>>>> OutputReceiver<outputElement> out,
>>>>                                               ProcessContext context)
>>>> throws Exception {
>>>>
>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>
>>>>         while (startRange < (System.currentTimeMillis() -
>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>             // Set the query's max end to current time - offset.
>>>>             if (endRange > (System.currentTimeMillis() -
>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>                 endRange = (System.currentTimeMillis() -
>>>> readerConfig.getPollOffset().get().toMillis());
>>>>             }
>>>>
>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>
>>>
>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>>> ..., [start+pollsize*N, end)?
>>>
>>> Also, if start is significantly smaller then current time, you could
>>> implement the @SplitRestriction method.
>>>
>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>
>>
>> Good points! My original thinking was to have a second (bounded) SDF that
>> splits the ranges and executes the actual reads from the source. Similar to
>> the "watch + read" pattern. That way I can reuse most of the code between
>> the unbounded and bounded scenario. Maybe that's a sub-optimal approach?
>>
>
> Following a watch + read pattern works well.
>
> And claiming the entire range when writing a generator function makes
> sense.
>
>
>>
>>>
>>>
>>>>
>>>>
>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>
>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
>>>> endRange),
>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>
>>>>                 // Update the start and end range for the next iteration
>>>>                 startRange = endRange;
>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>             } else {
>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>>> checkpointing or splitting.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>>> time range. Will stop the reader.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             return
>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>         }
>>>>
>>>>         return
>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>     }
>>>>
>>>>     private OutputElement buildOutputElement(Element element,
>>>>                                                      long start,
>>>>                                                      long end) {
>>>>         return outputElement
>>>>                 .withParameter(START_KEY, start)
>>>>                 .withParameter(END_KEY, end);
>>>>     }
>>>>
>>>>     @GetInitialRestriction
>>>>     public OffsetRange getInitialRestriction(Element element) throws
>>>> Exception {
>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>     }
>>>> }
>>>>
>>>>
>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> SplittableDoFn has experimental support within Dataflow so the way you
>>>>> may be using it could be correct but unsupported.
>>>>>
>>>>> Can you provide snippets/details of your splittable dofn
>>>>> implementation?
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>
>>>>>> This happens at the very startup of the job execution, and I am
>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>> origin of the conflict is. The same job runs just fine in the DirectRunner.
>>>>>>
>>>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>>>> both a windowing transform and without a windowing transform--both fail
>>>>>> with the same result on Dataflow.
>>>>>>
>>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>>> have just made some basic missteps.
>>>>>>
>>>>>> Cheers,
>>>>>> Kjetil
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Kjetil Halvorsen*
>>>>>> Chief Architect, Enterprise Integration
>>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> www.cognite.com | LIBERATE YOUR DATA™
>>
>>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Luke Cwik <lc...@google.com>.
On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
kjetil.halvorsen@cognite.com> wrote:

> Thanks for looking into this. I have been distracted on a separate (Beam)
> feature the past week so it took me some time to make progress. In any
> case, I have run new tests on Dataflow with a minimal pipeline.
> Unfortunately with the same results: "step 22 has conflicting bucketing
> functions". More info inline below.
>
> Best,
> Kjetil
>
> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>
>> The bucketing "error" is likely related to what windowing
>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>> empty pipeline possibly followed by a ParDo to log what records you are
>> seeing?
>>
>
> I slimmed the pipeline down to just being this sdf plus a MapElements that
> log the records. No windowing definitions nor any trigger definitions. The
> results were exactly the same: The job fails somewhere in the
> startup/verification phase in Dataflow (i.e. after compile/upload from the
> client, but as a part of the Dataflow startup procedure). "Step 22 has
> conflicting bucketing functions".
>

The error is because the windowing fn on the GBKs are different. You can
dump and inspect the JSON job description using the flag
--dataflowJobFile=/path/to/dump/file.json


>
>>
>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>> kjetil.halvorsen@cognite.com> wrote:
>>
>>> Thank's for the willingness to help out. The general context is that we
>>> are developing a set of new Beam based connectors/readers.
>>>
>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>> interface is nice to work with. In general, would you recommend that we
>>> look at the legacy source APIs for building our connectors/readers?
>>>
>>
>> I would not. A few contributors have been making rapid progress over the
>> past few months to finish SDFs with Python done from an API standpoint
>> (there is some additional integration/scaling testing going on), Java is
>> missing progress reporting from the API and watermark estimation but I was
>> hoping to finish those API pieces this month and Go has started on the
>> batch API implementation.
>>
>
> Great, I am happy to hear that. Would love to just keep investing in the
> SDF implementations we started.
>
>>
>>
>>>
>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>> the bad formatting--still learning the grips of communicating code via
>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>> tracker.
>>>
>>> I am happy to receive any pointers on improvements, changes, debugging
>>> paths.
>>>
>>> /**
>>>  * This function generates an unbounded stream of source queries.
>>>  */
>>> @DoFn.UnboundedPerElement
>>> public class GenerateTsPointRequestsUnboundFn extends
>>> DoFn<RequestParameters, RequestParameters> {
>>>
>>>     @Setup
>>>     public void setup() {
>>>         validate();
>>>     }
>>>
>>>     @ProcessElement
>>>     public ProcessContinuation processElement(@Element Element
>>> inputElement,
>>>
>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>
>>> OutputReceiver<outputElement> out,
>>>                                               ProcessContext context)
>>> throws Exception {
>>>
>>>         long startRange = tracker.currentRestriction().getFrom();
>>>         long endRange = tracker.currentRestriction().getTo();
>>>
>>>         while (startRange < (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis())) {
>>>             // Set the query's max end to current time - offset.
>>>             if (endRange > (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis())) {
>>>                 endRange = (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis());
>>>             }
>>>
>>>             if (tracker.tryClaim(endRange - 1)) {
>>>
>>
>> Why do you try and claim to the endRange here? Shouldn't you claim
>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>> ..., [start+pollsize*N, end)?
>>
>> Also, if start is significantly smaller then current time, you could
>> implement the @SplitRestriction method.
>>
>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>
>
> Good points! My original thinking was to have a second (bounded) SDF that
> splits the ranges and executes the actual reads from the source. Similar to
> the "watch + read" pattern. That way I can reuse most of the code between
> the unbounded and bounded scenario. Maybe that's a sub-optimal approach?
>

Following a watch + read pattern works well.

And claiming the entire range when writing a generator function makes sense.


>
>>
>>
>>>
>>>
>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>                 out.outputWithTimestamp(buildOutputElement(inputElement,
>>> startRange, endRange),
>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>
>>>                 // Update the start and end range for the next iteration
>>>                 startRange = endRange;
>>>                 endRange = tracker.currentRestriction().getTo();
>>>             } else {
>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>> checkpointing or splitting.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>> time range. Will stop the reader.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             return
>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>         }
>>>
>>>         return
>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>     }
>>>
>>>     private OutputElement buildOutputElement(Element element,
>>>                                                      long start,
>>>                                                      long end) {
>>>         return outputElement
>>>                 .withParameter(START_KEY, start)
>>>                 .withParameter(END_KEY, end);
>>>     }
>>>
>>>     @GetInitialRestriction
>>>     public OffsetRange getInitialRestriction(Element element) throws
>>> Exception {
>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>     }
>>> }
>>>
>>>
>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> SplittableDoFn has experimental support within Dataflow so the way you
>>>> may be using it could be correct but unsupported.
>>>>
>>>> Can you provide snippets/details of your splittable dofn implementation?
>>>>
>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>> kjetil.halvorsen@cognite.com> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>
>>>>> This happens at the very startup of the job execution, and I am unable
>>>>> to find any pointer as to where in the code/job definition the origin of
>>>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>>>
>>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>>> both a windowing transform and without a windowing transform--both fail
>>>>> with the same result on Dataflow.
>>>>>
>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>> have just made some basic missteps.
>>>>>
>>>>> Cheers,
>>>>> Kjetil
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Kjetil Halvorsen <kj...@cognite.com>.
Thanks for looking into this. I have been distracted on a separate (Beam)
feature the past week so it took me some time to make progress. In any
case, I have run new tests on Dataflow with a minimal pipeline.
Unfortunately with the same results: "step 22 has conflicting bucketing
functions". More info inline below.

Best,
Kjetil

On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:

> The bucketing "error" is likely related to what windowing
> strategy/pipeline shape you have. Have you tried running your SDF inside an
> empty pipeline possibly followed by a ParDo to log what records you are
> seeing?
>

I slimmed the pipeline down to just being this sdf plus a MapElements that
log the records. No windowing definitions nor any trigger definitions. The
results were exactly the same: The job fails somewhere in the
startup/verification phase in Dataflow (i.e. after compile/upload from the
client, but as a part of the Dataflow startup procedure). "Step 22 has
conflicting bucketing functions".


>
> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> kjetil.halvorsen@cognite.com> wrote:
>
>> Thank's for the willingness to help out. The general context is that we
>> are developing a set of new Beam based connectors/readers.
>>
>> I had hoped that SDF was ready for use with Dataflow--just because the
>> interface is nice to work with. In general, would you recommend that we
>> look at the legacy source APIs for building our connectors/readers?
>>
>
> I would not. A few contributors have been making rapid progress over the
> past few months to finish SDFs with Python done from an API standpoint
> (there is some additional integration/scaling testing going on), Java is
> missing progress reporting from the API and watermark estimation but I was
> hoping to finish those API pieces this month and Go has started on the
> batch API implementation.
>

Great, I am happy to hear that. Would love to just keep investing in the
SDF implementations we started.

>
>
>>
>> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
>> bad formatting--still learning the grips of communicating code via e-mail).
>> . We have used the overall pattern from the file watcher. I.e. the SDF
>> creates "poll requests" at regular intervals which a downstream parDo
>> executes. The SDF uses the built-in OffserRange as the basis for the range
>> tracker.
>>
>> I am happy to receive any pointers on improvements, changes, debugging
>> paths.
>>
>> /**
>>  * This function generates an unbounded stream of source queries.
>>  */
>> @DoFn.UnboundedPerElement
>> public class GenerateTsPointRequestsUnboundFn extends
>> DoFn<RequestParameters, RequestParameters> {
>>
>>     @Setup
>>     public void setup() {
>>         validate();
>>     }
>>
>>     @ProcessElement
>>     public ProcessContinuation processElement(@Element Element
>> inputElement,
>>
>> RestrictionTracker<OffsetRange, Long> tracker,
>>
>> OutputReceiver<outputElement> out,
>>                                               ProcessContext context)
>> throws Exception {
>>
>>         long startRange = tracker.currentRestriction().getFrom();
>>         long endRange = tracker.currentRestriction().getTo();
>>
>>         while (startRange < (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis())) {
>>             // Set the query's max end to current time - offset.
>>             if (endRange > (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis())) {
>>                 endRange = (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis());
>>             }
>>
>>             if (tracker.tryClaim(endRange - 1)) {
>>
>
> Why do you try and claim to the endRange here? Shouldn't you claim
> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
> ..., [start+pollsize*N, end)?
>
> Also, if start is significantly smaller then current time, you could
> implement the @SplitRestriction method.
>
> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>

Good points! My original thinking was to have a second (bounded) SDF that
splits the ranges and executes the actual reads from the source. Similar to
the "watch + read" pattern. That way I can reuse most of the code between
the unbounded and bounded scenario. Maybe that's a sub-optimal approach?

>
>
>
>>
>>
>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>                 out.outputWithTimestamp(buildOutputElement(inputElement,
>> startRange, endRange),
>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>
>>                 // Update the start and end range for the next iteration
>>                 startRange = endRange;
>>                 endRange = tracker.currentRestriction().getTo();
>>             } else {
>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>> checkpointing or splitting.");
>>                 return ProcessContinuation.stop();
>>             }
>>
>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>                 LOG.info(localLoggingPrefix + "Completed the request time
>> range. Will stop the reader.");
>>                 return ProcessContinuation.stop();
>>             }
>>
>>             return
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>                     readerConfig.getPollInterval().get().toMillis()));
>>         }
>>
>>         return
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>                 readerConfig.getPollInterval().get().toMillis()));
>>     }
>>
>>     private OutputElement buildOutputElement(Element element,
>>                                                      long start,
>>                                                      long end) {
>>         return outputElement
>>                 .withParameter(START_KEY, start)
>>                 .withParameter(END_KEY, end);
>>     }
>>
>>     @GetInitialRestriction
>>     public OffsetRange getInitialRestriction(Element element) throws
>> Exception {
>>         return new OffsetRange(startTimestamp, endTimestamp);
>>     }
>> }
>>
>>
>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> SplittableDoFn has experimental support within Dataflow so the way you
>>> may be using it could be correct but unsupported.
>>>
>>> Can you provide snippets/details of your splittable dofn implementation?
>>>
>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>> kjetil.halvorsen@cognite.com> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>
>>>> This happens at the very startup of the job execution, and I am unable
>>>> to find any pointer as to where in the code/job definition the origin of
>>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>>
>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>> both a windowing transform and without a windowing transform--both fail
>>>> with the same result on Dataflow.
>>>>
>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>> have just made some basic missteps.
>>>>
>>>> Cheers,
>>>> Kjetil
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> www.cognite.com | LIBERATE YOUR DATA™
>>
>>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Luke Cwik <lc...@google.com>.
The bucketing "error" is likely related to what windowing strategy/pipeline
shape you have. Have you tried running your SDF inside an empty pipeline
possibly followed by a ParDo to log what records you are seeing?

On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
kjetil.halvorsen@cognite.com> wrote:

> Thank's for the willingness to help out. The general context is that we
> are developing a set of new Beam based connectors/readers.
>
> I had hoped that SDF was ready for use with Dataflow--just because the
> interface is nice to work with. In general, would you recommend that we
> look at the legacy source APIs for building our connectors/readers?
>

I would not. A few contributors have been making rapid progress over the
past few months to finish SDFs with Python done from an API standpoint
(there is some additional integration/scaling testing going on), Java is
missing progress reporting from the API and watermark estimation but I was
hoping to finish those API pieces this month and Go has started on the
batch API implementation.


>
> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
> bad formatting--still learning the grips of communicating code via e-mail).
> . We have used the overall pattern from the file watcher. I.e. the SDF
> creates "poll requests" at regular intervals which a downstream parDo
> executes. The SDF uses the built-in OffserRange as the basis for the range
> tracker.
>
> I am happy to receive any pointers on improvements, changes, debugging
> paths.
>
> /**
>  * This function generates an unbounded stream of source queries.
>  */
> @DoFn.UnboundedPerElement
> public class GenerateTsPointRequestsUnboundFn extends
> DoFn<RequestParameters, RequestParameters> {
>
>     @Setup
>     public void setup() {
>         validate();
>     }
>
>     @ProcessElement
>     public ProcessContinuation processElement(@Element Element
> inputElement,
>
> RestrictionTracker<OffsetRange, Long> tracker,
>
> OutputReceiver<outputElement> out,
>                                               ProcessContext context)
> throws Exception {
>
>         long startRange = tracker.currentRestriction().getFrom();
>         long endRange = tracker.currentRestriction().getTo();
>
>         while (startRange < (System.currentTimeMillis() -
> readerConfig.getPollOffset().get().toMillis())) {
>             // Set the query's max end to current time - offset.
>             if (endRange > (System.currentTimeMillis() -
> readerConfig.getPollOffset().get().toMillis())) {
>                 endRange = (System.currentTimeMillis() -
> readerConfig.getPollOffset().get().toMillis());
>             }
>
>             if (tracker.tryClaim(endRange - 1)) {
>

Why do you try and claim to the endRange here? Shouldn't you claim
subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
..., [start+pollsize*N, end)?

Also, if start is significantly smaller then current time, you could
implement the @SplitRestriction method.
https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990


>
>
> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>                 out.outputWithTimestamp(buildOutputElement(inputElement,
> startRange, endRange),
>                         org.joda.time.Instant.ofEpochMilli(startRange));
>
>                 // Update the start and end range for the next iteration
>                 startRange = endRange;
>                 endRange = tracker.currentRestriction().getTo();
>             } else {
>                 LOG.info(localLoggingPrefix + "Stopping work due to
> checkpointing or splitting.");
>                 return ProcessContinuation.stop();
>             }
>
>             if (startRange >= tracker.currentRestriction().getTo()) {
>                 LOG.info(localLoggingPrefix + "Completed the request time
> range. Will stop the reader.");
>                 return ProcessContinuation.stop();
>             }
>
>             return
> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>                     readerConfig.getPollInterval().get().toMillis()));
>         }
>
>         return
> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>                 readerConfig.getPollInterval().get().toMillis()));
>     }
>
>     private OutputElement buildOutputElement(Element element,
>                                                      long start,
>                                                      long end) {
>         return outputElement
>                 .withParameter(START_KEY, start)
>                 .withParameter(END_KEY, end);
>     }
>
>     @GetInitialRestriction
>     public OffsetRange getInitialRestriction(Element element) throws
> Exception {
>         return new OffsetRange(startTimestamp, endTimestamp);
>     }
> }
>
>
> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>
>> SplittableDoFn has experimental support within Dataflow so the way you
>> may be using it could be correct but unsupported.
>>
>> Can you provide snippets/details of your splittable dofn implementation?
>>
>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>> kjetil.halvorsen@cognite.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>
>>> This happens at the very startup of the job execution, and I am unable
>>> to find any pointer as to where in the code/job definition the origin of
>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>
>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>> both a windowing transform and without a windowing transform--both fail
>>> with the same result on Dataflow.
>>>
>>> This is my first foray into splittable DoFn territory so I am sure I
>>> have just made some basic missteps.
>>>
>>> Cheers,
>>> Kjetil
>>>
>>>
>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Kjetil Halvorsen <kj...@cognite.com>.
Thank's for the willingness to help out. The general context is that we are
developing a set of new Beam based connectors/readers.

I had hoped that SDF was ready for use with Dataflow--just because the
interface is nice to work with. In general, would you recommend that we
look at the legacy source APIs for building our connectors/readers?

Anyways, I have pasted the skeleton of the SDF below (I apologize for the
bad formatting--still learning the grips of communicating code via e-mail).
. We have used the overall pattern from the file watcher. I.e. the SDF
creates "poll requests" at regular intervals which a downstream parDo
executes. The SDF uses the built-in OffserRange as the basis for the range
tracker.

I am happy to receive any pointers on improvements, changes, debugging
paths.

/**
 * This function generates an unbounded stream of source queries.
 */
@DoFn.UnboundedPerElement
public class GenerateTsPointRequestsUnboundFn extends
DoFn<RequestParameters, RequestParameters> {

    @Setup
    public void setup() {
        validate();
    }

    @ProcessElement
    public ProcessContinuation processElement(@Element Element inputElement,

RestrictionTracker<OffsetRange, Long> tracker,
                                              OutputReceiver<outputElement>
out,
                                              ProcessContext context)
throws Exception {

        long startRange = tracker.currentRestriction().getFrom();
        long endRange = tracker.currentRestriction().getTo();

        while (startRange < (System.currentTimeMillis() -
readerConfig.getPollOffset().get().toMillis())) {
            // Set the query's max end to current time - offset.
            if (endRange > (System.currentTimeMillis() -
readerConfig.getPollOffset().get().toMillis())) {
                endRange = (System.currentTimeMillis() -
readerConfig.getPollOffset().get().toMillis());
            }

            if (tracker.tryClaim(endRange - 1)) {


context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
                out.outputWithTimestamp(buildOutputElement(inputElement,
startRange, endRange),
                        org.joda.time.Instant.ofEpochMilli(startRange));

                // Update the start and end range for the next iteration
                startRange = endRange;
                endRange = tracker.currentRestriction().getTo();
            } else {
                LOG.info(localLoggingPrefix + "Stopping work due to
checkpointing or splitting.");
                return ProcessContinuation.stop();
            }

            if (startRange >= tracker.currentRestriction().getTo()) {
                LOG.info(localLoggingPrefix + "Completed the request time
range. Will stop the reader.");
                return ProcessContinuation.stop();
            }

            return
ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
                    readerConfig.getPollInterval().get().toMillis()));
        }

        return
ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
                readerConfig.getPollInterval().get().toMillis()));
    }

    private OutputElement buildOutputElement(Element element,
                                                     long start,
                                                     long end) {
        return outputElement
                .withParameter(START_KEY, start)
                .withParameter(END_KEY, end);
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(Element element) throws
Exception {
        return new OffsetRange(startTimestamp, endTimestamp);
    }
}


On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:

> SplittableDoFn has experimental support within Dataflow so the way you may
> be using it could be correct but unsupported.
>
> Can you provide snippets/details of your splittable dofn implementation?
>
> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> kjetil.halvorsen@cognite.com> wrote:
>
>>
>> Hi,
>>
>> I am looking for pointers to a Dataflow runner error message: Workflow
>> failed. Causes: Step s22 has conflicting bucketing functions,
>>
>> This happens at the very startup of the job execution, and I am unable to
>> find any pointer as to where in the code/job definition the origin of the
>> conflict is. The same job runs just fine in the DirectRunner.
>>
>> The job contains a splittable DoFn (unbound) and I have tried it with
>> both a windowing transform and without a windowing transform--both fail
>> with the same result on Dataflow.
>>
>> This is my first foray into splittable DoFn territory so I am sure I have
>> just made some basic missteps.
>>
>> Cheers,
>> Kjetil
>>
>>
>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
>> www.cognite.com | LIBERATE YOUR DATA™
>>
>>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | kjetil.halvorsen@cognite.com
www.cognite.com | LIBERATE YOUR DATA™

Re: Splittable DoFn and Dataflow, "conflicting bucketing functions"

Posted by Luke Cwik <lc...@google.com>.
SplittableDoFn has experimental support within Dataflow so the way you may
be using it could be correct but unsupported.

Can you provide snippets/details of your splittable dofn implementation?

On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
kjetil.halvorsen@cognite.com> wrote:

>
> Hi,
>
> I am looking for pointers to a Dataflow runner error message: Workflow
> failed. Causes: Step s22 has conflicting bucketing functions,
>
> This happens at the very startup of the job execution, and I am unable to
> find any pointer as to where in the code/job definition the origin of the
> conflict is. The same job runs just fine in the DirectRunner.
>
> The job contains a splittable DoFn (unbound) and I have tried it with both
> a windowing transform and without a windowing transform--both fail with the
> same result on Dataflow.
>
> This is my first foray into splittable DoFn territory so I am sure I have
> just made some basic missteps.
>
> Cheers,
> Kjetil
>
>
>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvorsen@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>