Posted to user@beam.apache.org by Jiayuan Ma <ji...@gmail.com> on 2018/04/10 22:13:09 UTC

How to implement @SplitRestriction for Splittable DoFn

Hi all,

I'm trying to use the ReplicateFn mentioned in this
<https://s.apache.org/splittable-do-fn> doc in my pipeline to speed up a
nested for loop. The use case is exactly the same as the "*Counting friends in
common (cross join by key)*" section. However, I'm having trouble making it
work with the Beam 2.4.0 SDK.

I'm implementing @SplitRestriction as follows:

@SplitRestriction
public void splitRestriction(A element, OffsetRange range,
                             OutputReceiver<OffsetRange> out) {
  for (final OffsetRange p : range.split(1000, 10)) {
    out.output(p);
  }
}

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException
com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

I also tried the following, as suggested by the javadoc
<https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/DoFn.SplitRestriction.html>,
but it fails with errors during pipeline construction.

@SplitRestriction
public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}

Without implementing @SplitRestriction, my pipeline runs without any
errors. However, I don't think the SDF is actually split by default, which
defeats the purpose of improving performance.

Does anyone know if @SplitRestriction is currently supported by the Dataflow
runner? How can I write a working version with the latest SDK?

Thanks,
Jiayuan

Re: How to implement @SplitRestriction for Splittable DoFn

Posted by Eugene Kirpichov <ki...@google.com>.
Hmm, which runner is this? Does this reproduce with the direct runner? The
NullPointerException is particularly worrying; I'd like to investigate it.


Re: How to implement @SplitRestriction for Splittable DoFn

Posted by Jiayuan Ma <ji...@gmail.com>.
Hi Eugene,

Thanks for your reply. I'm no longer seeing the previous error. I think
that error was because I didn't do a clean build after upgrading the SDK from
2.3.0 to 2.4.0.

However, I'm now seeing other exceptions from my SDF.

java.lang.OutOfMemoryError: unable to create new native thread
java.lang.Thread.start0(Native Method)
java.lang.Thread.start(Thread.java:714)
java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(OutputAndTimeBoundedSplittableProcessElementInvoker.java:265)
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.tryClaim(RestrictionTracker.java:75)

and

java.lang.NullPointerException
org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:96)
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:216)
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:369)

The old pipeline I'm trying to optimize looks like this:

.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn<KV<String, Iterable<Object>>, KV<String, KV<String, String>>>() {
  @ProcessElement
  public void process(...) {
    Iterable<Object> groupedValues = context.element().getValue();
    // Nested loop over all pairs of values sharing the same key (cross join by key).
    for (final Object o1 : groupedValues) {
      for (final Object o2 : groupedValues) {
        ....
      }
    }
  }
}))

The optimization I'm doing right now with SDF is roughly like this:

@ProcessElement
public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
    final Iterable<Object> groupedValues = context.element().getValue();
    final Iterator<Object> it = groupedValues.iterator();

    // Fast-forward to the start of the restriction assigned to this call.
    long index = tracker.currentRestriction().getFrom();
    Iterators.advance(it, Math.toIntExact(index));

    // Claim each offset before processing the corresponding outer element.
    for (; it.hasNext() && tracker.tryClaim(index); ++index) {
        final Object o1 = it.next();
        for (final Object o2 : groupedValues) {
            ... same old logic ...
        }
    }
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(final KV<String, Iterable<Object>> groupedValues) {
    final long size = Iterables.size(groupedValues.getValue());
    return new OffsetRange(0, size);
}

@SplitRestriction
public void splitRestriction(final KV<String, Iterable<Object>> groupedValues,
                             final OffsetRange range,
                             final OutputReceiver<OffsetRange> receiver) {
    final long size = Iterables.size(groupedValues.getValue());

    // Each subrange covers about 1000000 / size outer elements,
    // i.e. roughly 1,000,000 inner-loop iterations of work.
    for (final OffsetRange p : range.split(1000000 / size, 10)) {
        receiver.output(p);
    }
}

@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
}
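
For context, this DoFn replaces the old one right after the GroupByKey. The
wiring is roughly like this (CrossJoinFn and keyedValues are placeholder
names; CrossJoinFn is a DoFn containing the methods above, with the same
input/output types as the old ParDo):

// keyedValues: PCollection<KV<String, Object>>, same as before the GroupByKey.
PCollection<KV<String, Iterable<Object>>> grouped =
    keyedValues.apply(GroupByKey.create());

// CrossJoinFn is the splittable DoFn sketched above.
PCollection<KV<String, KV<String, String>>> pairs =
    grouped.apply(ParDo.of(new CrossJoinFn()));

The idea is that once the restriction is split, the nested loop for a single
hot key no longer has to run as one giant sequential loop.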


Jiayuan





Re: How to implement @SplitRestriction for Splittable DoFn

Posted by Eugene Kirpichov <ki...@google.com>.
Hi! This looks concerning. Can you show a full code example, please? Does it
run in the direct runner?
