Posted to user@beam.apache.org by Ananthi <ku...@gmail.com> on 2022/02/01 13:49:15 UTC

[Question] Writing test for Zeta SQL Transform JOIN

Hi Team,

I am trying to write test cases for a ZetaSQL SqlTransform. I started with JUnit
and a very simple PCollection of INT64 values, and I am getting the error
below. Am I missing anything here? I am using JUnit 4 and Beam 2.35.
Please let me know if any other details are needed.

-----------

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

at org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574)
at org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185)
at org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416)
at org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163)
at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855)
at com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)


----------
Code -

private static final Schema RESULT_ROW_TYPE =
        Schema.builder()
                .addNullableField("order_id", Schema.FieldType.INT64)
                .addNullableField("site_id", Schema.FieldType.INT64)
                .addNullableField("price", Schema.FieldType.INT64)
                .addNullableField("order_id0", Schema.FieldType.INT64)
                .addNullableField("site_id0", Schema.FieldType.INT64)
                .addNullableField("price0", Schema.FieldType.INT64)
                .build();

private static final Schema SOURCE_ROW_TYPE =
        Schema.builder()
                .addNullableField("order_id", Schema.FieldType.INT64)
                .addNullableField("site_id", Schema.FieldType.INT64)
                .addNullableField("price", Schema.FieldType.INT64)
                .build();
@Test
public void testInnerJoin() throws Exception {
    OrderHistoryReorderOptions options = PipelineOptionsFactory.as(OrderHistoryReorderOptions.class);
    options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");

    TestPipeline pipeline = TestPipeline.fromOptions(options);

    Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
    Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues(2, 3, 3).build();
    Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues(3, 4, 5).build();
    Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
    Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues(2, 3, 3).build();
    Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues(3, 4, 5).build();
    Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues(2, 3, 3, 1, 2, 3).build();
    final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3);
    final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6);
    final List<Row> outputRowsToTransform = Arrays.asList(row7);
    PCollection<Row> inputPcoll1 =
            pipeline.apply("col1", Create.of(inputRowsToTransform))
                    .setRowSchema(SOURCE_ROW_TYPE);
    PCollection<Row> inputPcoll2 =
            pipeline.apply("col2", Create.of(inputRowsToTransform1))
                    .setRowSchema(SOURCE_ROW_TYPE);
    String sql =
            "SELECT * "
                    + "FROM ORDER_DETAILS1 o1"
                    + " JOIN ORDER_DETAILS2 o2"
                    + " ON "
                    + " o1.order_id = o2.site_id AND o2.price = o1.site_id";

    PAssert.that(tuple(
            "ORDER_DETAILS1",
            inputPcoll1,
            "ORDER_DETAILS2",
            inputPcoll2)
            .apply("join", SqlTransform.query(sql))
            .setRowSchema(RESULT_ROW_TYPE))
            .containsInAnyOrder(outputRowsToTransform);
    pipeline.run();
}

Thanks in advance,

Regards,

Ana

Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Kyle Weaver <kc...@google.com>.
The ZetaSQL DATE type does not correspond to Schema.FieldType.DATETIME. The
field type corresponding to ZetaSQL DATE
is FieldType.logicalType(SqlTypes.DATE).

It looks like this test exercises casting string to date, although it is a
bit obfuscated.
https://github.com/apache/beam/blob/0701655bede7e0610291ebe34026abbd839a3a51/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java#L2771-L2798
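
As a minimal sketch of that change (reusing the testdate field from your test;
note that a DATE logical-type field carries java.time.LocalDate values):

import java.time.LocalDate;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;

// Use the DATE logical type instead of FieldType.DATETIME, which is
// Joda-Instant-based and is what produces the LocalDate -> Instant cast error.
Schema destRowType =
        Schema.builder()
                .addNullableField("testdate", Schema.FieldType.logicalType(SqlTypes.DATE))
                .build();

// Rows with a DATE logical-type field hold java.time.LocalDate values.
Row expected = Row.withSchema(destRowType).addValues(LocalDate.of(2017, 1, 1)).build();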


Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Ananthi <ku...@gmail.com>.
Thank you. I used INT64 since INT32 is not supported in ZetaSQL. With INT64
and the samples passed as long, I am able to test the flow.

Now I am facing an issue with a DATETIME field. I am trying to cast a
field in a row from STRING to DATE using ZetaSQL, and I am getting the
error below (the code is attached after the error). Do I need to use a
field type other than DATETIME?

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.time.LocalDate cannot be cast to org.joda.time.Instant

at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:399)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at com.test.PipelineTest.testInnerJoin(PipelineTest.java:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast to org.joda.time.Instant
at org.apache.beam.sdk.coders.InstantCoder.encode(InstantCoder.java:34)
at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:337)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown Source)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:330)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:325)
at org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:300)
at org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:283)
at org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.outputRow(BeamZetaSqlCalcRel.java:323)
at org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.finishBundle(BeamZetaSqlCalcRel.java:276)

----------

@Rule
public final TestPipeline pipeline = getPipeLine();

@Test
public void testZetaSQLCastStringToDate() throws Exception {
    final Schema SOURCE_ROW_TYPE =
            Schema.builder()
                    .addNullableField("testdate", Schema.FieldType.STRING)
                    .build();
    final Schema DEST_ROW_TYPE =
            Schema.builder()
                    .addNullableField("testdate", Schema.FieldType.DATETIME)
                    .build();
    final TestBoundedTable DATE_COLL =
            TestBoundedTable.of(SOURCE_ROW_TYPE)
                    .addRows("2017-01-01");
    PCollection<Row> rowPCollection = tuple(
            "DATECOLL",
            DATE_COLL.buildIOReader(pipeline.begin()).setRowSchema(SOURCE_ROW_TYPE))
            .apply("join", SqlTransform.query("SELECT safe_cast(testdate as date) "
                    + "FROM DATECOLL o1"))
            .setRowSchema(DEST_ROW_TYPE);
    pipeline.run();
}

public TestPipeline getPipeLine() {
    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
    options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
    return TestPipeline.fromOptions(options);
}





Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Brian Hulette <bh...@google.com>.
If you use FieldType.INT32, you can use Java's Integer type rather than
Long.

Also, note that you can infer a schema from common Java class types (POJOs,
AutoValues, etc) [1] instead of directly building Rows. That will
automatically map Integer fields to INT32, Long to INT64, etc, so you don't
need to worry about it.

Brian

[1]
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
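
For instance, here is a minimal sketch of the POJO route (the Order class and
its field names are hypothetical, chosen to mirror SOURCE_ROW_TYPE in the test):

import java.util.Arrays;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Beam infers the schema from the public fields: Long maps to INT64,
// Integer would map to INT32, and so on.
@DefaultSchema(JavaFieldSchema.class)
public class Order {
  public Long orderId;
  public Long siteId;
  public Long price;

  public Order() {}

  public Order(Long orderId, Long siteId, Long price) {
    this.orderId = orderId;
    this.siteId = siteId;
    this.price = price;
  }
}

// No setRowSchema call needed; the schema comes from the class.
PCollection<Order> orders =
        pipeline.apply(Create.of(Arrays.asList(
                new Order(1L, 2L, 3L), new Order(2L, 3L, 3L), new Order(3L, 4L, 5L))));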


Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Ananthi U <ku...@gmail.com>.
Hi Kerry,

Casting the samples to long works. Is there a way to handle int fields as-is in a Row in a PCollection?


Regards,
Ana


Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Kyle Weaver <kc...@google.com>.
The simplest way is to suffix literals with L, for example 1L, 2L, 3L.
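
Applied to the rows in the test, that looks like:

// Long literals (1L) match the INT64 fields in SOURCE_ROW_TYPE.
Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1L, 2L, 3L).build();
Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues(2L, 3L, 3L).build();
Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues(3L, 4L, 5L).build();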


Re: [Question] Writing test for Zeta SQL Transform JOIN

Posted by Kerry Donny-Clark <ke...@google.com>.
I believe the issue is that your sample values are boxed as Integer, while the
schema's INT64 fields expect Long. You can explicitly cast your samples to Long.
If you search for that, you should find some good examples.
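
As a quick sketch against the test's SOURCE_ROW_TYPE, either form works:

// Explicit casts (or Long.valueOf) turn the int literals into Longs.
Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues((long) 1, (long) 2, (long) 3).build();
Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues(Long.valueOf(2), Long.valueOf(3), Long.valueOf(3)).build();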
