You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Thomas Weise <th...@gmail.com> on 2016/08/17 04:40:20 UTC

OldDoFn - CounterSet replacement

I'm trying to rebase a PR and adjust for the DoFn changes.

CounterSet is gone and there is now AggregatorFactory and I'm looking to
fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.

Given the instance of OldDoFn, what is the recommended way to obtain the
aggregator factory when creating the fn runner?

Thanks!


java.lang.NullPointerException
at
org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
at
org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
at
org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
at
org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<init>(DoFnRunnerBase.java:214)
at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(DoFnRunnerBase.java:87)
at
org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:42)
at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)

Re: OldDoFn - CounterSet replacement

Posted by Ben Chambers <bc...@google.com.INVALID>.
Ah, I see. I wasn't aware this was in the context of a new runner.

So -- the new mechanism makes it harder to do the "wrong thing" (not wire
up aggregators). But, since you're goal is to get a baseline without proper
support for aggregators, you should be able to create a NoopAggregator
(that ignores the values) and a NoopAggregatorFactory (that creates
NoopAggregators). That should establish your baseline, while also making it
really clear that aggregators are not supported.

Also note that as part of looking at
https://issues.apache.org/jira/browse/BEAM-147 and
https://issues.apache.org/jira/browse/BEAM-458 aggregators may be getting
simpler to support, so let us know before you spend a lot of time actually
wiring them up.

On Wed, Aug 17, 2016 at 8:43 AM Thomas Weise <th...@gmail.com> wrote:

> Hi Ben,
>
> Thanks for the reply. Here is the PR:
>
> https://github.com/apache/incubator-beam/pull/540
>
> The doFnRunner instantiation in old style is here:
>
>
> https://github.com/apache/incubator-beam/pull/540/files#diff-86746f538c22ebafd06fca17f0d0aa94R116
>
> I should also note that focus of the PR is to establish the Apex runner
> baseline and proper support for aggregators isn't part of it, it's
> something I was planning to take up in subsequent round.
>
> Thomas
>
>
> On Wed, Aug 17, 2016 at 8:14 AM, Ben Chambers <bc...@apache.org>
> wrote:
>
> > Hi Thomas!
> >
> > On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise <th...@gmail.com>
> > wrote:
> >
> > > I'm trying to rebase a PR and adjust for the DoFn changes.
> > >
> >
> > Can you elaborate on what you're trying to do (or send a link to the PR)?
> >
> >
> > > CounterSet is gone and there is now AggregatorFactory and I'm looking
> to
> > > fix an existing usage of org.apache.beam.sdk.util.
> > DoFnRunners.simpleRunner.
> > >
> >
> > In practice, these should act the same. CounterSet was an implementation
> > detail used to create implementation-specific Counters. The DoFnRunner
> was
> > supposed to get the CounterSet that was wired up correctly. Now, the
> > AggregatorFactory serves the role of creating wired-up Aggregators. As
> > before, the DoFnRunner should be instantiated with an AggregatorFactory
> > wired up to appropriately.
> >
> >
> > > Given the instance of OldDoFn, what is the recommended way to obtain
> the
> > > aggregator factory when creating the fn runner?
> > >
> >
> > This should come from the runner. When the runner wants to instantiate a
> > DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
> > will wire up aggregators appropriately.
> >
> >
> > > Thanks!
> > >
> > >
> > > java.lang.NullPointerException
> > > at
> > >
> > > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.
> > createAggregatorInternal(DoFnRunnerBase.java:348)
> > > at
> > >
> > > org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(
> > OldDoFn.java:224)
> > > at
> > >
> > >
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(
> > OldDoFn.java:215)
> > > at
> > >
> > > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<
> > init>(DoFnRunnerBase.java:214)
> > > at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(
> > DoFnRunnerBase.java:87)
> > > at
> > > org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(
> > SimpleDoFnRunner.java:42)
> > > at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(
> > DoFnRunners.java:60)
> > >
> >
>

Re: OldDoFn - CounterSet replacement

Posted by Thomas Weise <th...@gmail.com>.
Hi Ben,

Thanks for the reply. Here is the PR:

https://github.com/apache/incubator-beam/pull/540

The doFnRunner instantiation in old style is here:

https://github.com/apache/incubator-beam/pull/540/files#diff-86746f538c22ebafd06fca17f0d0aa94R116

I should also note that focus of the PR is to establish the Apex runner
baseline and proper support for aggregators isn't part of it, it's
something I was planning to take up in subsequent round.

Thomas


On Wed, Aug 17, 2016 at 8:14 AM, Ben Chambers <bc...@apache.org> wrote:

> Hi Thomas!
>
> On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise <th...@gmail.com>
> wrote:
>
> > I'm trying to rebase a PR and adjust for the DoFn changes.
> >
>
> Can you elaborate on what you're trying to do (or send a link to the PR)?
>
>
> > CounterSet is gone and there is now AggregatorFactory and I'm looking to
> > fix an existing usage of org.apache.beam.sdk.util.
> DoFnRunners.simpleRunner.
> >
>
> In practice, these should act the same. CounterSet was an implementation
> detail used to create implementation-specific Counters. The DoFnRunner was
> supposed to get the CounterSet that was wired up correctly. Now, the
> AggregatorFactory serves the role of creating wired-up Aggregators. As
> before, the DoFnRunner should be instantiated with an AggregatorFactory
> wired up to appropriately.
>
>
> > Given the instance of OldDoFn, what is the recommended way to obtain the
> > aggregator factory when creating the fn runner?
> >
>
> This should come from the runner. When the runner wants to instantiate a
> DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
> will wire up aggregators appropriately.
>
>
> > Thanks!
> >
> >
> > java.lang.NullPointerException
> > at
> >
> > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.
> createAggregatorInternal(DoFnRunnerBase.java:348)
> > at
> >
> > org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(
> OldDoFn.java:224)
> > at
> >
> > org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(
> OldDoFn.java:215)
> > at
> >
> > org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<
> init>(DoFnRunnerBase.java:214)
> > at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(
> DoFnRunnerBase.java:87)
> > at
> > org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(
> SimpleDoFnRunner.java:42)
> > at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(
> DoFnRunners.java:60)
> >
>

Re: OldDoFn - CounterSet replacement

Posted by Ben Chambers <bc...@apache.org>.
Hi Thomas!

On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise <th...@gmail.com> wrote:

> I'm trying to rebase a PR and adjust for the DoFn changes.
>

Can you elaborate on what you're trying to do (or send a link to the PR)?


> CounterSet is gone and there is now AggregatorFactory and I'm looking to
> fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.
>

In practice, these should act the same. CounterSet was an implementation
detail used to create implementation-specific Counters. The DoFnRunner was
supposed to get the CounterSet that was wired up correctly. Now, the
AggregatorFactory serves the role of creating wired-up Aggregators. As
before, the DoFnRunner should be instantiated with an AggregatorFactory
wired up to appropriately.


> Given the instance of OldDoFn, what is the recommended way to obtain the
> aggregator factory when creating the fn runner?
>

This should come from the runner. When the runner wants to instantiate a
DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
will wire up aggregators appropriately.


> Thanks!
>
>
> java.lang.NullPointerException
> at
>
> org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
> at
>
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
> at
>
> org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
> at
>
> org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<init>(DoFnRunnerBase.java:214)
> at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(DoFnRunnerBase.java:87)
> at
> org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:42)
> at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)
>