Posted to user@beam.apache.org by Tobias Feldhaus <To...@localsearch.ch> on 2017/02/17 10:36:17 UTC

Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Hello,

Could it be that it's no longer possible to run pipelines with a BigQuery sink
locally on the dev machine? I migrated a "Read JSON from GCS, parse, and
write to BQ" pipeline from the Dataflow SDK to Apache Beam 0.5.0.
All tests are green and the pipeline runs successfully on the Dataflow service
with the test files, but locally with the DirectRunner I get an NPE.

It happens right after I create the TableRow element, which I double-checked
is not null. Even when I artificially create a LogLine element in this step,
instead of taking the one from the input, the NPE is thrown:


static class Outputter extends DoFn<LogLine, TableRow> {
(...)
	LogLine logLine = c.element();

	TableRow tableRow = logLine.toTableRow();
	tableRow.set("ts", c.timestamp().toString());

	if (c != null && tableRow != null){
	    try {

	        c.output(tableRow);
	    }
	    catch(NullPointerException e){
	        LOG.error("catched NPE");
	        e.printStackTrace();
	    }
	}

The corresponding stack trace looks like this:

ERROR: caught NPE
java.lang.NullPointerException
	at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
	at java.util.AbstractMap.hashCode(AbstractMap.java:530)
	at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
	at java.util.AbstractMap.hashCode(AbstractMap.java:530)
	at java.util.Arrays.hashCode(Arrays.java:4146)
	at java.util.Objects.hash(Objects.java:128)
	at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
	at java.util.HashMap.hash(HashMap.java:338)
	at java.util.HashMap.get(HashMap.java:556)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
	at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
	at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
	at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
	at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Best,
Tobias


Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Posted by Tobias Feldhaus <To...@localsearch.ch>.
Thank you Dan, that was it!

Tobi



Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Posted by Dan Halperin <dh...@google.com>.
This looks like the same issue that Scio encountered with the Google API
Client libraries: https://github.com/spotify/scio/issues/388

I think that if the `value` is null, BigQuery expects you to omit the key
rather than include it with a null value.


Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Posted by Dan Halperin <dh...@google.com>.
It looks to me like the NPE comes from the Google API client library; maybe
you are creating an invalid TableRow (null key? null value?):

        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)

Dan


Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Posted by Kenneth Knowles <kl...@google.com>.
Hi Tobias,

The specific error there looks like you have a forbidden null somewhere
deep inside the output of logLine.toTableRow(). It's hard to say more with
this information.
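
If that's it, a tiny standalone snippet along these lines should reproduce the
hashCode() failure outside the pipeline (a guess based on the doubled
AbstractMap/ArrayMap frames in your trace, which suggest the null sits in a
nested record):

import com.google.api.services.bigquery.model.TableRow;

public class NullRowRepro {
    public static void main(String[] args) {
        // A null value inside a nested TableRow should make hashCode() throw,
        // since the API client's ArrayMap entries don't null-check while hashing.
        // The DirectRunner hashes every output element for its immutability
        // checks, which would explain why only local runs fail.
        TableRow inner = new TableRow().set("field", null);
        TableRow outer = new TableRow().set("nested", inner);
        outer.hashCode();  // expected: NullPointerException as in the trace
    }
}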

Kenn


Re: Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner

Posted by Tobias Feldhaus <To...@localsearch.ch>.
It seems this is caused by the workaround I am using to write
daily-partitioned tables in batch mode, which does not work.

My problem is that with more than 1000 days, the date-sharded table in BQ
will be too large to be converted automatically via a simple “bq partition”
command into a partitioned table, as such a table cannot have more than
1000 days.

So the solution will be a divide-and-conquer strategy, I guess.
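
Roughly, I'd split the shard dates into ranges of at most 1000 days and
convert each range on its own, along these lines (just a sketch of the
splitting step; the 1000-day chunk size mirrors the partition limit,
everything else is an assumption):

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

class DateChunks {
    /** Splits an inclusive date range into chunks of at most maxDays days each. */
    static List<LocalDate[]> chunk(LocalDate start, LocalDate end, int maxDays) {
        List<LocalDate[]> chunks = new ArrayList<>();
        LocalDate cursor = start;
        while (!cursor.isAfter(end)) {
            LocalDate chunkEnd = cursor.plusDays(maxDays - 1);
            if (chunkEnd.isAfter(end)) {
                chunkEnd = end;  // clamp the last chunk to the range end
            }
            chunks.add(new LocalDate[] {cursor, chunkEnd});
            cursor = chunkEnd.plusDays(1);
        }
        return chunks;
    }
}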
