Posted to user@beam.apache.org by Kirill Zhdanovich <kz...@gmail.com> on 2020/07/08 14:07:15 UTC

TableRow class is not the same after serialization

Hi,
I want to test a pipeline whose input is a PCollection of TableRows.
I've created a test, and when I run it I get this error:

java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
to class com.google.api.services.bigquery.model.TableRow

Is this a known issue? Thank you in advance.

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
Cool! Thanks a lot for your explanation and your time, Jeff, very much
appreciated.
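For readers landing on this thread: the reason the bad cast in the quoted code below blows up at the for loop rather than at the cast expression itself is Java type erasure. Here is a minimal, self-contained sketch of that behavior, using TreeMap as an illustrative stand-in for TableRow (both are concrete Map implementations; no Beam dependency needed):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TreeMap;

public class ErasureDemo {
    public static void main(String[] args) {
        // A JSON round-trip (like TableRowJsonCoder's Jackson ObjectMapper)
        // produces LinkedHashMap instances for nested objects.
        List<Object> nested = new ArrayList<>();
        nested.add(new LinkedHashMap<String, Object>());

        // The unchecked cast succeeds at runtime, because generic type
        // arguments are erased and the runtime only sees a List...
        @SuppressWarnings("unchecked")
        List<TreeMap<String, Object>> h =
                (List<TreeMap<String, Object>>) (List<?>) nested;

        try {
            // ...but the compiler inserts a checked cast when each element is
            // assigned to the loop variable, and that is where it fails.
            for (TreeMap<String, Object> m : h) {
                m.isEmpty();
            }
        } catch (ClassCastException e) {
            System.out.println("Cast fails at the loop: " + e.getMessage());
        }
    }
}
```

This is exactly the shape of the failure reported above: the cast to `List<TableRow>` compiles and "succeeds", and the ClassCastException surfaces only when the loop touches an element.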

On Thu, 9 Jul 2020 at 17:27, Jeff Klukas <jk...@mozilla.com> wrote:

> On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> So I guess I need to switch to Map<String, Object> instead of TableRow?
>>
>
> Yes, I would definitely recommend that you switch to Map<String, Object>.
> That's the most basic interface, and every deserialization of a top-level
> TableRow object must provide objects matching that interface wherever the
> BQ schema has a nested STRUCT/RECORD.
>
> Note that the latest docs for BigQueryIO do include a table that maps BQ
> types to Java types, but unfortunately that table lists STRUCTs as mapping
> to avro GenericRecord, which doesn't give you the info you need to
> understand the Map<String, Object> interface inside TableRows:
>
>
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
>
> You may want to file a JIRA ticket requesting more explicit documentation
> about how to parse structs out of TableRow objects.
>
>
>> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> It looks like the fact that your pipeline in production produces nested
>>> TableRows is an artifact of the following decision within BigQueryIO logic:
>>>
>>>
>>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>>
>>> The convertGenericRecordToTableRow function is used recursively for
>>> RECORD-type fields, so you end up with nested TableRows in the PCollection
>>> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
>>> Jackson ObjectMapper, which makes different decisions as to what map type
>>> to use.
>>>
>>> > Thanks for explaining. Is it documented somewhere that TableRow
>>> contains Map<String, Object>?
>>>
>>> I don't see that explicitly spelled out anywhere. If you follow the
>>> trail of links from TableRow, you'll get to these docs about Google's JSON
>>> handling in Java, which may or may not be relevant to this question:
>>>
>>> https://googleapis.github.io/google-http-java-client/json.html
>>>
>>>
>>>
>>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <kz...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for explaining. Is it documented somewhere that TableRow
>>>> contains Map<String, Object>?
>>>> I don't construct it, I fetch from Google Analytics export to BigQuery
>>>> table.
>>>>
>>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>
>>>>> I would expect the following line to fail:
>>>>>
>>>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>
>>>>> The top-level bigQueryRow will be a TableRow, but
>>>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>>>> class that implements `Map`. So that line needs to become:
>>>>>
>>>>>         List<Map<String, Object>> h = ((List<Map<String, Object>>)
>>>>> bigQueryRow.get("hits"));
>>>>>
>>>>> And then your constructor for Hit must accept a Map<String, Object>
>>>>> rather than a TableRow.
>>>>>
>>>>> I imagine that TableRow is only intended to be used as a top-level
>>>>> object. Each row you get from a BQ result is a TableRow, but objects nested
>>>>> inside it are not logically table rows; they're BQ structs that are modeled
>>>>> as maps in JSON and Map<String, Object> in Java.
>>>>>
>>>>> Are you manually constructing TableRow objects with nested TableRows?
>>>>> I would expect that a result from BigQueryIO.read() would give a TableRow
>>>>> with some other map type for nested structs, so I'm surprised that this
>>>>> cast works in some contexts.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <
>>>>> kzhdanovich@gmail.com> wrote:
>>>>>
>>>>>>    I changed the code a little to avoid lambdas.
>>>>>>
>>>>>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>>         List<Hit> hits = new ArrayList<>();
>>>>>>
>>>>>>         for (TableRow tableRow : h) { <-- breaks here
>>>>>>             hits.add(new Hit(tableRow));
>>>>>>         }
>>>>>>         ...
>>>>>>     }
>>>>>>
>>>>>> Stack trace
>>>>>>
>>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>>>> loader 'app')
>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>>>>>> to class com.google.api.services.bigquery.model.TableRow
>>>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>>>> loader 'app')
>>>>>> at
>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>>>>> at
>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>>>>> at
>>>>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>>>>> at
>>>>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>>>>> at
>>>>>> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> at
>>>>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>>>>> at
>>>>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>> at
>>>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>> at
>>>>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>>> at
>>>>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>>>>> at
>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>>>>> at
>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>>>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>>>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>>>>> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>>>>> at
>>>>>> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>>> at
>>>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>>> at
>>>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>>>>> at
>>>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>>>>> at
>>>>>> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>>>>> at
>>>>>> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>>>>> at
>>>>>> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>> at
>>>>>> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>>>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>>>> Caused by: java.lang.ClassCastException: class
>>>>>> java.util.LinkedHashMap cannot be cast to class
>>>>>> com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is
>>>>>> in module java.base of loader 'bootstrap';
>>>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>>>> loader 'app')
>>>>>> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>>>>> at
>>>>>> com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>>>>
>>>>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>>>
>>>>>>> Does the stack trace tell you where specifically in the code the
>>>>>>> cast is happening? I'm guessing there may be assumptions inside your
>>>>>>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>>>>>>> objects.
>>>>>>>
>>>>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <
>>>>>>> kzhdanovich@gmail.com> wrote:
>>>>>>>
>>>>>>>> Interesting. All my code does is the following:
>>>>>>>>
>>>>>>>> public static void main(String[] args) {
>>>>>>>>     PCollection<TableRow> bqResult =
>>>>>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and
>>>>>>>> string params);
>>>>>>>>     // store results
>>>>>>>> }
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>>>>>> bqResult,
>>>>>>>>
>>>>>>>>         ...) {
>>>>>>>>         return bqResult
>>>>>>>>                 // In this step I convert TableRow into my custom
>>>>>>>> class object
>>>>>>>>                 .apply("Create metrics based on sessions",
>>>>>>>>                         ParDo.of(new CreateSessionMetrics(boolean
>>>>>>>> and string params)))
>>>>>>>>                // few more transformations
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> This is basically similar to examples you can find here
>>>>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>>>>
>>>>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>>>>> kzhdanovich@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> So from what I understand, it works like this by design and it's
>>>>>>>>>> not possible to test my code with the current coder implementation. Is that
>>>>>>>>>> correct?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would argue that this test failure is indicating an area of
>>>>>>>>> potential failure in your code that should be addressed. It may be that
>>>>>>>>> your current production pipeline relies on fusion which is not guaranteed
>>>>>>>>> by the Beam model, and so the pipeline could fail if the runner makes an
>>>>>>>>> internal change that affects fusion (in practice this is unlikely).
>>>>>>>>>
>>>>>>>>> Is it possible to update your code such that it does not need to
>>>>>>>>> make assumptions about the concrete Map type returned by TableRow objects?
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Kirill
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Kirill
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill
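To summarize the fix the thread converges on, here is a hypothetical sketch of the Session constructor rewritten to depend only on the Map interface. The names Session, Hit, "hits", and "hitNumber" follow the thread's example; the classes below are illustrative stand-ins, not the poster's actual code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Session {
    final List<Hit> hits = new ArrayList<>();

    // Accepting Map<String, Object> works whether the nested records arrive
    // as TableRow straight out of BigQueryIO.read(), or as LinkedHashMap
    // after TableRowJsonCoder round-trips the row in a TestPipeline.
    Session(Map<String, Object> bigQueryRow) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> h =
                (List<Map<String, Object>>) bigQueryRow.get("hits");
        for (Map<String, Object> m : h) {
            hits.add(new Hit(m));
        }
    }

    static class Hit {
        final Object hitNumber;

        Hit(Map<String, Object> m) {
            hitNumber = m.get("hitNumber");
        }
    }
}
```

Since TableRow extends GenericJson, which ultimately implements Map<String, Object>, production call sites can keep passing the top-level row unchanged; only the declared parameter types move from TableRow to Map.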

>>>> at
>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>>> at
>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>> at
>>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>>> at
>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>> at
>>>> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>>> at
>>>> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>>> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>>> at
>>>> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>> at
>>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>>> at
>>>> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>>> at
>>>> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>>> at
>>>> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> at
>>>> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
>>>> cannot be cast to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>>> at
>>>> com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>>
>>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>
>>>>> Does the stack trace tell you where specifically in the code the cast
>>>>> is happening? I'm guessing there may be assumptions inside your
>>>>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>>>>> objects.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <
>>>>> kzhdanovich@gmail.com> wrote:
>>>>>
>>>>>> Interesting. All my code does is following:
>>>>>>
>>>>>> public static void main(String[] args) {
>>>>>>     PCollection<TableRow> bqResult =
>>>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and
>>>>>> string params);
>>>>>>     // store results
>>>>>> }
>>>>>>
>>>>>> and
>>>>>>
>>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>>>> bqResult,
>>>>>>
>>>>>>       ...) {
>>>>>>         return bqResult
>>>>>>                 // In this step I convert TableRow into my custom
>>>>>> class object
>>>>>>                 .apply("Create metrics based on sessions",
>>>>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>>>>> string params)))
>>>>>>                // few more transformations
>>>>>>
>>>>>> }
>>>>>>
>>>>>> This is basically similar to examples you can find here
>>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>>
>>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>>>
>>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>>> kzhdanovich@gmail.com> wrote:
>>>>>>>
>>>>>>>> So from what I understand, it works like this by design and it's
>>>>>>>> not possible to test my code with the current coder implementation. Is that
>>>>>>>> correct?
>>>>>>>>
>>>>>>>
>>>>>>> I would argue that this test failure is indicating an area of
>>>>>>> potential failure in your code that should be addressed. It may be that
>>>>>>> your current production pipeline relies on fusion which is not guaranteed
>>>>>>> by the Beam model, and so the pipeline could fail if the runner makes an
>>>>>>> internal change that affects fusion (in practice this is unlikely).
>>>>>>>
>>>>>>> Is it possible to update your code such that it does not need to
>>>>>>> make assumptions about the concrete Map type returned by TableRow objects?
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Kirill
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
It looks like the fact that your pipeline in production produces nested
TableRows is an artifact of the following decision within BigQueryIO logic:

https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350

The convertGenericRecordToTableRow function is used recursively for
RECORD-type fields, so you end up with nested TableRows in the PCollection
returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
Jackson ObjectMapper, which makes different decisions as to what map type
to use.
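
The type mismatch is reproducible with nothing but standard-library types. In this hedged sketch, `FakeTableRow` is a stand-in for com.google.api.services.bigquery.model.TableRow (which is likewise a Map subclass under the hood), and the LinkedHashMap plays the role of what a Jackson-style JSON round trip through the coder hands back:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class CastDemo {

    // Stand-in for TableRow: like the real class, just a Map<String, Object> subclass.
    static class FakeTableRow extends HashMap<String, Object> {}

    public static void main(String[] args) {
        // Before serialization, the nested value really is a FakeTableRow.
        Map<String, Object> before = new FakeTableRow();
        before.put("hitNumber", 1);

        // A JSON round trip only promises "some Map"; a Jackson-style
        // deserializer typically produces a LinkedHashMap for nested objects.
        Map<String, Object> after = new LinkedHashMap<>(before);

        System.out.println(after instanceof FakeTableRow); // prints false
        try {
            FakeTableRow r = (FakeTableRow) after; // same failure as in the stack trace
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as seen in the test run");
        }
    }
}
```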

> Thanks for explaining. Is it documented somewhere that TableRow contains
Map<String, Object>?

I don't see that explicitly spelled out anywhere. If you follow the trail
of links from TableRow, you'll get to these docs about Google's JSON
handling in Java, which may or may not be relevant to this question:

https://googleapis.github.io/google-http-java-client/json.html



On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> Thanks for explaining. Is it documented somewhere that TableRow contains
> Map<String, Object>?
> I don't construct it, I fetch from Google Analytics export to BigQuery
> table.
>
> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jk...@mozilla.com> wrote:
>
>> I would expect the following line to fail:
>>
>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>
>> The top-level bigQueryRow will be a TableRow, but
>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>> class that implements `Map`. So that line needs to become:
>>
>>>         List<Map<String, Object>> h = (List<Map<String, Object>>)
>>> bigQueryRow.get("hits");
>>
>> And then your constructor for Hit must accept a Map<String, Object>
>> rather than a TableRow.
>>
>> I imagine that TableRow is only intended to be used as a top-level
>> object. Each row you get from a BQ result is a TableRow, but objects nested
>> inside it are not logically table rows; they're BQ structs that are modeled
>> as maps in JSON and Map<String, Object> in Java.
>>
>> Are you manually constructing TableRow objects with nested TableRows? I
>> would expect that a result from BigQueryIO.read() would give a TableRow
>> with some other map type for nested structs, so I'm surprised that this
>> cast works in some contexts.
>>
>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kz...@gmail.com>
>> wrote:
>>
>>>    I changed code a little bit not to use lambdas.
>>>
>>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>         List<Hit> hits = new ArrayList<>();
>>>
>>>         for (TableRow tableRow : h) { <-- breaks here
>>>             hits.add(new Hit(tableRow));
>>>         }
>>>         ...
>>>     }
>>>
>>>
>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> Does the stack trace tell you where specifically in the code the cast
>>>> is happening? I'm guessing there may be assumptions inside your
>>>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>>>> objects.
>>>>
>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Interesting. All my code does is following:
>>>>>
>>>>> public static void main(String[] args) {
>>>>>     PCollection<TableRow> bqResult =
>>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and
>>>>> string params);
>>>>>     // store results
>>>>> }
>>>>>
>>>>> and
>>>>>
>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>>> bqResult,
>>>>>
>>>>>       ...) {
>>>>>         return bqResult
>>>>>                 // In this step I convert TableRow into my custom
>>>>> class object
>>>>>                 .apply("Create metrics based on sessions",
>>>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>>>> string params)))
>>>>>                // few more transformations
>>>>>
>>>>> }
>>>>>
>>>>> This is basically similar to examples you can find here
>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>
>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>>
>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>> kzhdanovich@gmail.com> wrote:
>>>>>>
>>>>>>> So from what I understand, it works like this by design and it's not
>>>>>>> possible to test my code with the current coder implementation. Is that
>>>>>>> correct?
>>>>>>>
>>>>>>
>>>>>> I would argue that this test failure is indicating an area of
>>>>>> potential failure in your code that should be addressed. It may be that
>>>>>> your current production pipeline relies on fusion which is not guaranteed
>>>>>> by the Beam model, and so the pipeline could fail if the runner makes an
>>>>>> internal change that affects fusion (in practice this is unlikely).
>>>>>>
>>>>>> Is it possible to update your code such that it does not need to make
>>>>>> assumptions about the concrete Map type returned by TableRow objects?
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Kirill
>>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>
> --
> Best Regards,
> Kirill
>

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
Thanks for explaining. Is it documented somewhere that TableRow contains
Map<String, Object>?
I don't construct the TableRow myself; I fetch it from the Google Analytics
export to a BigQuery table.

On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jk...@mozilla.com> wrote:

> I would expect the following line to fail:
>
>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>
> The top-level bigQueryRow will be a TableRow, but
> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
> class that implements `Map`. So that line needs to become:
>
>         List<Map<String, Object>> h = (List<Map<String, Object>>)
> bigQueryRow.get("hits");
>
> And then your constructor for Hit must accept a Map<String, Object> rather
> than a TableRow.
>
> I imagine that TableRow is only intended to be used as a top-level object.
> Each row you get from a BQ result is a TableRow, but objects nested inside
> it are not logically table rows; they're BQ structs that are modeled as
> maps in JSON and Map<String, Object> in Java.
>
> Are you manually constructing TableRow objects with nested TableRows? I
> would expect that a result from BigQueryIO.read() would give a TableRow
> with some other map type for nested structs, so I'm surprised that this
> cast works in some contexts.
>
> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>>    I changed code a little bit not to use lambdas.
>>
>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>         List<Hit> hits = new ArrayList<>();
>>
>>         for (TableRow tableRow : h) { <-- breaks here
>>             hits.add(new Hit(tableRow));
>>         }
>>         ...
>>     }
>>
>>
>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> Does the stack trace tell you where specifically in the code the cast is
>>> happening? I'm guessing there may be assumptions inside your
>>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>>> objects.
>>>
>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kz...@gmail.com>
>>> wrote:
>>>
>>>> Interesting. All my code does is following:
>>>>
>>>> public static void main(String[] args) {
>>>>     PCollection<TableRow> bqResult =
>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and string
>>>> params);
>>>>     // store results
>>>> }
>>>>
>>>> and
>>>>
>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>> bqResult,
>>>>
>>>>     ...) {
>>>>         return bqResult
>>>>                 // In this step I convert TableRow into my custom class
>>>> object
>>>>                 .apply("Create metrics based on sessions",
>>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>>> string params)))
>>>>                // few more transformations
>>>>
>>>> }
>>>>
>>>> This is basically similar to examples you can find here
>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>
>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>>>>
>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>> kzhdanovich@gmail.com> wrote:
>>>>>
>>>>>> So from what I understand, it works like this by design and it's not
>>>>>> possible to test my code with the current coder implementation. Is that
>>>>>> correct?
>>>>>>
>>>>>
>>>>> I would argue that this test failure is indicating an area of
>>>>> potential failure in your code that should be addressed. It may be that
>>>>> your current production pipeline relies on fusion which is not guaranteed
>>>>> by the Beam model, and so the pipeline could fail if the runner makes an
>>>>> internal change that affects fusion (in practice this is unlikely).
>>>>>
>>>>> Is it possible to update your code such that it does not need to make
>>>>> assumptions about the concrete Map type returned by TableRow objects?
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
I would expect the following line to fail:

        List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));

The top-level bigQueryRow will be a TableRow, but `bigQueryRow.get("hits")`
is only guaranteed to be an instance of some class that implements `Map`.
So that line needs to become:

        List<Map<String, Object>> h = ((List<Map<String, Object>>)
bigQueryRow.get("hits"));

And then your constructor for Hit must accept a Map<String, Object> rather
than a TableRow.

I imagine that TableRow is only intended to be used as a top-level object.
Each row you get from a BQ result is a TableRow, but objects nested inside
it are not logically table rows; they're BQ structs that are modeled as
maps in JSON and Map<String, Object> in Java.
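
The constructor from earlier in the thread can be reworked along these lines. This is a minimal, self-contained sketch rather than the poster's actual code: the Hit class here is a hypothetical stand-in that accepts the generic Map interface instead of TableRow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SessionSketch {

    // Hypothetical Hit that accepts the Map interface instead of TableRow.
    static class Hit {
        final Map<String, Object> fields;

        Hit(Map<String, Object> fields) {
            this.fields = fields;
        }
    }

    // Sketch of the Session constructor, rewritten so it only assumes the
    // Map interface for nested structs.
    static class Session {
        final List<Hit> hits = new ArrayList<>();

        @SuppressWarnings("unchecked")
        Session(Map<String, Object> bigQueryRow) {
            // Nested "hits" elements are some Map implementation
            // (e.g. LinkedHashMap after decoding), not necessarily TableRow.
            List<Map<String, Object>> h =
                    (List<Map<String, Object>>) bigQueryRow.get("hits");
            for (Map<String, Object> hitRow : h) {
                hits.add(new Hit(hitRow));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("hitNumber", 1);
        List<Map<String, Object>> hitList = new ArrayList<>();
        hitList.add(hit);
        row.put("hits", hitList);

        Session session = new Session(row);
        System.out.println(session.hits.size()); // prints 1
    }
}
```

Because the constructor parameter is the Map interface, the same code accepts a hand-built TableRow in a test and a decoded LinkedHashMap at runtime.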

Are you manually constructing TableRow objects with nested TableRows? I
would expect that a result from BigQueryIO.read() would give a TableRow
with some other map type for nested structs, so I'm surprised that this
cast works in some contexts.

On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kz...@gmail.com>
wrote:

>    I changed the code a little bit to not use lambdas.
>
>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>         List<Hit> hits = new ArrayList<>();
>
>         for (TableRow tableRow : h) { <-- breaks here
>             hits.add(new Hit(tableRow));
>         }
>         ...
>     }
>
> Stack trace
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
> at
> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> at
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
> cannot be cast to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.TableRow is in unnamed module of
> loader 'app')
> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
> at
> com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>
> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:
>
>> Does the stack trace tell you where specifically in the code the cast is
>> happening? I'm guessing there may be assumptions inside your
>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>> objects.
>>
>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kz...@gmail.com>
>> wrote:
>>
>>> Interesting. All my code does is the following:
>>>
>>> public static void main(String[] args) {
>>>     PCollection<TableRow> bqResult =
>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and string
>>> params);
>>>     // store results
>>> }
>>>
>>> and
>>>
>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>> bqResult,
>>>
>>>     ...) {
>>>         return bqResult
>>>                 // In this step I convert TableRow into my custom class
>>> object
>>>                 .apply("Create metrics based on sessions",
>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>> string params)))
>>>                // few more transformations
>>>
>>> }
>>>
>>> This is basically similar to examples you can find here
>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>
>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kz...@gmail.com>
>>>> wrote:
>>>>
>>>>> So from what I understand, it works like this by design and it's not
>>>>> possible to test my code with the current coder implementation. Is that
>>>>> correct?
>>>>>
>>>>
>>>> I would argue that this test failure is indicating an area of potential
>>>> failure in your code that should be addressed. It may be that your current
>>>> production pipeline relies on fusion which is not guaranteed by the Beam
>>>> model, and so the pipeline could fail if the runner makes an internal
>>>> change that affects fusion (in practice this is unlikely).
>>>>
>>>> Is it possible to update your code such that it does not need to make
>>>> assumptions about the concrete Map type returned by TableRow objects?
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>
> --
> Best Regards,
> Kirill
>

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
   I changed the code a little bit to not use lambdas.

   Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
        List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
        List<Hit> hits = new ArrayList<>();

        for (TableRow tableRow : h) { <-- breaks here
            hits.add(new Hit(tableRow));
        }
        ...
    }

Stack trace

java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
to class com.google.api.services.bigquery.model.TableRow
(java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
com.google.api.services.bigquery.model.TableRow is in unnamed module of
loader 'app')
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
to class com.google.api.services.bigquery.model.TableRow
(java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
com.google.api.services.bigquery.model.TableRow is in unnamed module of
loader 'app')
at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
at
com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
at
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
at
org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at
org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at
org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
cannot be cast to class com.google.api.services.bigquery.model.TableRow
(java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
com.google.api.services.bigquery.model.TableRow is in unnamed module of
loader 'app')
at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
at
com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)

On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jk...@mozilla.com> wrote:

> Does the stack trace tell you where specifically in the code the cast is
> happening? I'm guessing there may be assumptions inside your
> CreateSessionMetrics class if that's where you're manipulating the TableRow
> objects.
>
> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> Interesting. All my code does is the following:
>>
>> public static void main(String[] args) {
>>     PCollection<TableRow> bqResult =
>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>     PCollection<SomeClass> result = runJob(bqResult, boolean and string
>> params);
>>     // store results
>> }
>>
>> and
>>
>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>> bqResult,
>>
>>   ...) {
>>         return bqResult
>>                 // In this step I convert TableRow into my custom class
>> object
>>                 .apply("Create metrics based on sessions",
>>                         ParDo.of(new CreateSessionMetrics(boolean and
>> string params)))
>>                // few more transformations
>>
>> }
>>
>> This is basically similar to examples you can find here
>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>
>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kz...@gmail.com>
>>> wrote:
>>>
>>>> So from what I understand, it works like this by design and it's not
>>>> possible to test my code with the current coder implementation. Is that
>>>> correct?
>>>>
>>>
>>> I would argue that this test failure is indicating an area of potential
>>> failure in your code that should be addressed. It may be that your current
>>> production pipeline relies on fusion which is not guaranteed by the Beam
>>> model, and so the pipeline could fail if the runner makes an internal
>>> change that affects fusion (in practice this is unlikely).
>>>
>>> Is it possible to update your code such that it does not need to make
>>> assumptions about the concrete Map type returned by TableRow objects?
>>>
>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
Does the stack trace tell you where specifically in the code the cast is
happening? I'm guessing there may be assumptions inside your
CreateSessionMetrics class if that's where you're manipulating the TableRow
objects.

On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> Interesting. All my code does is the following:
>
> public static void main(String[] args) {
>     PCollection<TableRow> bqResult =
> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>     PCollection<SomeClass> result = runJob(bqResult, boolean and string
> params);
>     // store results
> }
>
> and
>
> private static PCollection<SomeClass> runJob(PCollection<TableRow>
> bqResult,
>
>   ...) {
>         return bqResult
>                 // In this step I convert TableRow into my custom class
> object
>                 .apply("Create metrics based on sessions",
>                         ParDo.of(new CreateSessionMetrics(boolean and
> string params)))
>                // few more transformations
>
> }
>
> This is basically similar to examples you can find here
> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>
> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:
>
>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kz...@gmail.com>
>> wrote:
>>
>>> So from what I understand, it works like this by design and it's not
>>> possible to test my code with the current coder implementation. Is that
>>> correct?
>>>
>>
>> I would argue that this test failure is indicating an area of potential
>> failure in your code that should be addressed. It may be that your current
>> production pipeline relies on fusion which is not guaranteed by the Beam
>> model, and so the pipeline could fail if the runner makes an internal
>> change that affects fusion (in practice this is unlikely).
>>
>> Is it possible to update your code such that it does not need to make
>> assumptions about the concrete Map type returned by TableRow objects?
>>
>
>
> --
> Best Regards,
> Kirill
>

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
Interesting. All my code does is the following:

public static void main(String[] args) {
    PCollection<TableRow> bqResult =
p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
    PCollection<SomeClass> result = runJob(bqResult, boolean and string
params);
    // store results
}

and

private static PCollection<SomeClass> runJob(PCollection<TableRow> bqResult,

...) {
        return bqResult
                // In this step I convert TableRow into my custom class
object
                .apply("Create metrics based on sessions",
                        ParDo.of(new CreateSessionMetrics(boolean and
string params)))
               // few more transformations

}

This is basically similar to examples you can find here
https://beam.apache.org/documentation/io/built-in/google-bigquery/

On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jk...@mozilla.com> wrote:

> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> So from what I understand, it works like this by design and it's not
>> possible to test my code with the current coder implementation. Is that
>> correct?
>>
>
> I would argue that this test failure is indicating an area of potential
> failure in your code that should be addressed. It may be that your current
> production pipeline relies on fusion which is not guaranteed by the Beam
> model, and so the pipeline could fail if the runner makes an internal
> change that affects fusion (in practice this is unlikely).
>
> Is it possible to update your code such that it does not need to make
> assumptions about the concrete Map type returned by TableRow objects?
>


-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> So from what I understand, it works like this by design and it's not
> possible to test my code with the current coder implementation. Is that
> correct?
>

I would argue that this test failure is indicating an area of potential
failure in your code that should be addressed. It may be that your current
production pipeline relies on fusion which is not guaranteed by the Beam
model, and so the pipeline could fail if the runner makes an internal
change that affects fusion (in practice this is unlikely).

Is it possible to update your code such that it does not need to make
assumptions about the concrete Map type returned by TableRow objects?
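
One way to drop that assumption is to cast nested structs to the Map interface rather than the concrete TableRow class. A self-contained sketch (the row is hand-built here as a stand-in for a decoded TableRow; the field names follow the GA export schema discussed in the thread):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedFieldAccess {
    public static void main(String[] args) {
        // Stand-in for a decoded row: after deserialization, nested structs
        // arrive as some Map implementation (e.g. LinkedHashMap).
        Map<String, Object> row = new LinkedHashMap<>();
        Map<String, Object> device = new LinkedHashMap<>();
        device.put("isMobile", true);
        row.put("device", device);

        // Cast to the Map interface instead of the concrete TableRow class;
        // this works whatever Map implementation the coder produced.
        @SuppressWarnings("unchecked")
        Map<String, Object> deviceStruct = (Map<String, Object>) row.get("device");
        Boolean isMobile = (Boolean) deviceStruct.get("isMobile");
        System.out.println(isMobile); // prints true
    }
}
```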

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
So from what I understand, it works like this by design and it's not
possible to test my code with the current coder implementation. Is that
correct?

On Wed, 8 Jul 2020 at 21:41, Jeff Klukas <jk...@mozilla.com> wrote:

> On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> So it's a correct implementation of TableRow that encode(decode(a)) != a?
>>
>
> A TableRow can contain fields of any map implementation. It makes sense to
> me that once a TableRow object is serialized and deserialized, the
> coder must make a choice about a concrete Map implementation to use.
>
> So, no I would not expect that a decoded TableRow would contain exactly
> the same objects as before encoding. But I _would_ expect that
> encode(decode(a)).equals(a) in the sense that Map.equals() can determine
> two maps of different types to be equal as long as both maps contain
> entries that are equal to one another.
>
>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> So it's a correct implementation of TableRow that encode(decode(a)) != a?
>

A TableRow can contain fields of any map implementation. It makes sense to
me that once a TableRow object is serialized and deserialized, the
coder must make a choice about a concrete Map implementation to use.

So, no I would not expect that a decoded TableRow would contain exactly the
same objects as before encoding. But I _would_ expect that
encode(decode(a)).equals(a) in the sense that Map.equals() can determine
two maps of different types to be equal as long as both maps contain
entries that are equal to one another.
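
That expectation follows from the Map.equals contract, which compares entry sets regardless of the concrete implementation. A quick illustration with plain JDK maps (TableRow itself implements Map, so the same reasoning applies):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapEqualsDemo {
    public static void main(String[] args) {
        // Three different Map implementations with identical entries.
        Map<String, Object> a = new HashMap<>();
        Map<String, Object> b = new LinkedHashMap<>();
        Map<String, Object> c = new TreeMap<>();
        for (Map<String, Object> m : List.of(a, b, c)) {
            m.put("device", "mobile");
            m.put("hitNumber", 1);
        }

        // Map.equals is defined on entry sets, not on concrete classes,
        // so all pairs compare equal even though getClass() differs.
        System.out.println(a.equals(b)); // prints true
        System.out.println(b.equals(c)); // prints true
        System.out.println(a.getClass() == b.getClass()); // prints false
    }
}
```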

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
So it's a correct implementation of TableRow that encode(decode(a)) != a?

On Wed, 8 Jul 2020 at 19:03, Jeff Klukas <jk...@mozilla.com> wrote:

> The test runner intentionally does some ugly things in order to expose
> problems which might otherwise be missed. In particular, I believe the test
> runner enforces coding between each transform and scrambles order of
> elements whereas production pipelines will often have many transforms fused
> together without serializing data.
>
> > I've tried TableRowJsonCoder, but seems like it converts all objects
> inside TableRow to LinkedHashMaps
>
> This is likely intended. I would expect only the top-level container to be
> a TableRow and that nested maps would be some other map type.
>
> On Wed, Jul 8, 2020 at 10:52 AM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> Hi Jeff,
>> It's a simple pipeline that takes PCollection of TableRow which is
>> selected from Google Analytics export to BigQuery. So each TableRow follows
>> this scheme https://support.google.com/analytics/answer/3437719?hl=en
>> I have part of the code doing casting to TableRow like this:
>>
>> Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));
>>
>> or
>>
>> List<Hit> hits = ((List<TableRow>) row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());
>>
>> I don't have issues running this pipeline in production. I have this
>> issue only when I try to write an end-to-end test.
>> Do you know if there are existing coders for TableRow that I can
>> use? I've tried TableRowJsonCoder, but seems like it converts all objects
>> inside TableRow to LinkedHashMaps
>>
>> On Wed, 8 Jul 2020 at 17:30, Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> Kirill - Can you tell us more about what Job.runJob is doing? I would
>>> not expect the Beam SDK itself to do any casting to TableRow, so is there a
>>> line in your code where you're explicitly casting to TableRow? There may be
>>> a point where you need to explicitly set the coder on a PCollection to
>>> deserialize back to TableRow objects.
>>>
>>> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich <kz...@gmail.com>
>>> wrote:
>>>
>>>> Here is a code example:
>>>>
>>>> List<TableRow> ss = Arrays.asList(session1, session2);
>>>> PCollection<TableRow> sessions = p.apply(Create.of(ss));
>>>> PCollection<MetricsWithDimension> res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
>>>> p.run();
>>>>
>>>>
>>>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich <kz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I want to test pipeline and the input for it is PCollection of
>>>>> TableRows. I've created a test, and when I run it I get an error:
>>>>>
>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>>>
>>>>> Is it a known issue? Thank you in advance
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Kirill
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
The test runner intentionally does some ugly things in order to expose
problems which might otherwise be missed. In particular, I believe the test
runner enforces coding between each transform and scrambles order of
elements whereas production pipelines will often have many transforms fused
together without serializing data.

> I've tried TableRowJsonCoder, but seems like it converts all objects
inside TableRow to LinkedHashMaps

This is likely intended. I would expect only the top-level container to be
a TableRow and that nested maps would be some other map type.

On Wed, Jul 8, 2020 at 10:52 AM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> Hi Jeff,
> It's a simple pipeline that takes PCollection of TableRow which is
> selected from Google Analytics export to BigQuery. So each TableRow follows
> this scheme https://support.google.com/analytics/answer/3437719?hl=en
> I have part of the code doing casting to TableRow like this:
>
> Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));
>
> or
>
> List<Hit> hits = ((List<TableRow>) row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());
>
> I don't have issues running this pipeline in production. I have this
> issue only when I try to write an end-to-end test.
> Do you know if there are existing coders for TableRow that I can use? I've
> tried TableRowJsonCoder, but seems like it converts all objects inside
> TableRow to LinkedHashMaps
>
> On Wed, 8 Jul 2020 at 17:30, Jeff Klukas <jk...@mozilla.com> wrote:
>
>> Kirill - Can you tell us more about what Job.runJob is doing? I would not
>> expect the Beam SDK itself to do any casting to TableRow, so is there a
>> line in your code where you're explicitly casting to TableRow? There may be
>> a point where you need to explicitly set the coder on a PCollection to
>> deserialize back to TableRow objects.
>>
>> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich <kz...@gmail.com>
>> wrote:
>>
>>> Here is a code example:
>>>
>>> List<TableRow> ss = Arrays.asList(session1, session2);
>>> PCollection<TableRow> sessions = p.apply(Create.of(ss));
>>> PCollection<MetricsWithDimension> res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
>>> p.run();
>>>
>>>
>>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich <kz...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I want to test pipeline and the input for it is PCollection of
>>>> TableRows. I've created a test, and when I run it I get an error:
>>>>
>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>>
>>>> Is it a known issue? Thank you in advance
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>
> --
> Best Regards,
> Kirill
>

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
Hi Jeff,
It's a simple pipeline that takes a PCollection of TableRow selected from
the Google Analytics export to BigQuery, so each TableRow follows this
schema: https://support.google.com/analytics/answer/3437719?hl=en
I have code that casts nested fields to TableRow like this:

Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));

or

List<Hit> hits = ((List<TableRow>) row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());

I don't have issues running this pipeline in production; I only hit this
error when I try to write an end-to-end test.
Do you know if there are existing coders for TableRow that I can use? I've
tried TableRowJsonCoder, but it seems to convert all objects inside
TableRow to LinkedHashMaps
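
For a REPEATED RECORD field like hits, the same idea applies: after a coder
round trip the list elements are plain Maps rather than TableRows, so
declaring the element type as Map<String, Object> avoids the
ClassCastException. A stdlib-only sketch (class and field names are
hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HitsAccess {

    // Simulates a decoded row whose repeated "hits" field came back as a
    // List of plain Maps, which is what TableRowJsonCoder produces for
    // nested records.
    static Map<String, Object> decodedRow() {
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("hitNumber", 1);
        List<Map<String, Object>> hits = new ArrayList<>();
        hits.add(hit);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("hits", hits);
        return row;
    }

    // Casting to List<Map<String, Object>> (instead of List<TableRow>)
    // works for both the in-memory and the decoded representation.
    @SuppressWarnings("unchecked")
    static int firstHitNumber(Map<String, Object> row) {
        List<Map<String, Object>> hits = (List<Map<String, Object>>) row.get("hits");
        return (Integer) hits.get(0).get("hitNumber");
    }

    public static void main(String[] args) {
        System.out.println(firstHitNumber(decodedRow())); // prints "1"
    }
}
```

A constructor like Hit(Map<String, Object> raw) built on this pattern would
then accept either representation.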

On Wed, 8 Jul 2020 at 17:30, Jeff Klukas <jk...@mozilla.com> wrote:

> Kirill - Can you tell us more about what Job.runJob is doing? I would not
> expect the Beam SDK itself to do any casting to TableRow, so is there a
> line in your code where you're explicitly casting to TableRow? There may be
> a point where you need to explicitly set the coder on a PCollection to
> deserialize back to TableRow objects.
>
> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> Here is a code example:
>>
>> List<TableRow> ss = Arrays.asList(session1, session2);
>> PCollection<TableRow> sessions = p.apply(Create.of(ss));
>> PCollection<MetricsWithDimension> res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
>> p.run();
>>
>>
>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich <kz...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I want to test pipeline and the input for it is PCollection of
>>> TableRows. I've created a test, and when I run it I get an error:
>>>
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>
>>> Is it a known issue? Thank you in advance
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill

Re: TableRow class is not the same after serialization

Posted by Jeff Klukas <jk...@mozilla.com>.
Kirill - Can you tell us more about what Job.runJob is doing? I would not
expect the Beam SDK itself to do any casting to TableRow, so is there a
line in your code where you're explicitly casting to TableRow? There may be
a point where you need to explicitly set the coder on a PCollection to
deserialize back to TableRow objects.

On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich <kz...@gmail.com>
wrote:

> Here is a code example:
>
> List<TableRow> ss = Arrays.asList(session1, session2);
> PCollection<TableRow> sessions = p.apply(Create.of(ss));
> PCollection<MetricsWithDimension> res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
> p.run();
>
>
> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich <kz...@gmail.com>
> wrote:
>
>> Hi,
>> I want to test pipeline and the input for it is PCollection of TableRows.
>> I've created a test, and when I run it I get an error:
>>
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>> cast to class com.google.api.services.bigquery.model.TableRow
>>
>> Is it a known issue? Thank you in advance
>>
>> --
>> Best Regards,
>> Kirill
>>
>
>
> --
> Best Regards,
> Kirill
>

Re: TableRow class is not the same after serialization

Posted by Kirill Zhdanovich <kz...@gmail.com>.
Here is a code example:

List<TableRow> ss = Arrays.asList(session1, session2);
PCollection<TableRow> sessions = p.apply(Create.of(ss));
PCollection<MetricsWithDimension> res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
p.run();


On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich <kz...@gmail.com>
wrote:

> Hi,
> I want to test pipeline and the input for it is PCollection of TableRows.
> I've created a test, and when I run it I get an error:
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
>
> Is it a known issue? Thank you in advance
>
> --
> Best Regards,
> Kirill
>


-- 
Best Regards,
Kirill