Posted to dev@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2020/05/01 13:04:38 UTC

Re: possible bug in AvroUtils

I dug deeper and found that this global static change has been present
since the beginning of the Avro / Beam Schema support (Beam 2.15.0):
https://github.com/apache/beam/commit/2a40c576cfb



On Thu, Apr 30, 2020 at 8:52 PM Ismaël Mejía <ie...@gmail.com> wrote:
>
> Created https://issues.apache.org/jira/browse/BEAM-9863 to track this.
> Any taker?
>
> On Thu, Apr 30, 2020 at 5:54 PM Reuven Lax <re...@google.com> wrote:
> >
> > I'm not sure who added that, but it's been there for a while. Making global static changes like that in our module seems like poor form - I wonder if there's a better approach.
> >
> > On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bh...@google.com> wrote:
> >>
> >> It seems likely this is a side effect of some static initialization in AvroUtils: https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
> >>
> >> On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote:
> >>>
> >>> I've copied this failing test into my client, and it passes for me. I can't reproduce the failure.
> >>>
> >>> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> +dev +Brian Hulette +Reuven Lax
> >>>>
> >>>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p....@gmail.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
> >>>>> I found that if you invoke it and then run a pipeline of GenericRecords containing a timestamp (I tried with logical-type timestamp-millis), Beam converts the timestamp from long to org.joda.time.DateTime, even if you don't apply any transformation to the pipeline.
> >>>>> Do you think it's a bug?
> >>>>>
> >>>>> Below is a simple test class I wrote to reproduce the problem.
> >>>>> The first test passes while the second fails.
> >>>>>
> >>>>>
> >>>>> import org.apache.avro.Schema;
> >>>>> import org.apache.avro.SchemaBuilder;
> >>>>> import org.apache.avro.generic.GenericRecord;
> >>>>> import org.apache.avro.generic.GenericRecordBuilder;
> >>>>> import org.apache.beam.sdk.coders.AvroCoder;
> >>>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> >>>>> import org.apache.beam.sdk.testing.TestPipeline;
> >>>>> import org.apache.beam.sdk.transforms.Combine;
> >>>>> import org.apache.beam.sdk.transforms.Create;
> >>>>> import org.apache.beam.sdk.transforms.SerializableFunction;
> >>>>> import org.junit.Rule;
> >>>>>
> >>>>> import java.sql.Timestamp;
> >>>>>
> >>>>> import static org.junit.Assert.assertEquals;
> >>>>>
> >>>>> public class AvroUtilsSideEffect {
> >>>>>
> >>>>>     @Rule
> >>>>>     public final transient TestPipeline pipeline = TestPipeline.create();
> >>>>>     @Rule
> >>>>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
> >>>>>     public final Schema testSchema = SchemaBuilder
> >>>>>             .record("record").namespace("test")
> >>>>>             .fields()
> >>>>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
> >>>>>             .endRecord();
> >>>>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
> >>>>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
> >>>>>             .build();
> >>>>>
> >>>>>
> >>>>>     @org.junit.Test
> >>>>>     public void test() {
> >>>>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>                 .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>         pipeline.run().waitUntilFinish();
> >>>>>     }
> >>>>>     @org.junit.Test
> >>>>>     public void test2() {
> >>>>>
> >>>>>         AvroUtils.toBeamSchema(testSchema);
> >>>>>
> >>>>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>                 .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>         pipeline2.run().waitUntilFinish();
> >>>>>     }
> >>>>>
> >>>>>     public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
> >>>>>
> >>>>>         @Override
> >>>>>         public GenericRecord apply(Iterable<GenericRecord> input) {
> >>>>>             for (GenericRecord item : input) {
> >>>>>                 if (item != null) {
> >>>>>                     assertEquals(Long.class, item.get("timestamp").getClass());
> >>>>>                     assertEquals(1563926400000L, item.get("timestamp"));
> >>>>>                 }
> >>>>>                 return item;
> >>>>>             }
> >>>>>             return null;
> >>>>>         }
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> Thanks,
> >>>>> Paolo
> >>>>>
> >>>>> --
> >>>>> Paolo Tomeo, PhD
> >>>>>
> >>>>> Big Data and Machine Learning Engineer
> >>>>>
> >>>>> linkedin.com/in/ptomeo

Re: possible bug in AvroUtils

Posted by Brian Hulette <bh...@google.com>.
Let's discuss details on the jira. I could maybe take it, but could use
advice on the right course of action.
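
For readers following along, here is a minimal, dependency-free sketch of the
kind of side effect discussed in this thread: a utility class whose static
initializer registers a conversion on a process-wide singleton, so that
unrelated code reading through that singleton starts seeing a different type.
All names here (GlobalModel, SchemaUtils, describe) are hypothetical stand-ins
and not Beam or Avro APIs, and java.time.Instant is used only as a placeholder
for the converted type. It illustrates the pattern under those assumptions, not
the actual AvroUtils code.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaticSideEffectSketch {

    /** Hypothetical stand-in for a process-wide singleton data model. */
    static final class GlobalModel {
        private static final GlobalModel INSTANCE = new GlobalModel();
        private final Map<String, Function<Long, Object>> conversions = new HashMap<>();

        static GlobalModel get() {
            return INSTANCE;
        }

        void addConversion(String logicalType, Function<Long, Object> conversion) {
            conversions.put(logicalType, conversion);
        }

        /** Returns the raw long unless a conversion is registered for the logical type. */
        Object read(String logicalType, long raw) {
            Function<Long, Object> conversion = conversions.get(logicalType);
            return conversion == null ? (Object) raw : conversion.apply(raw);
        }
    }

    /** Hypothetical utility class whose static initializer mutates the shared singleton. */
    static final class SchemaUtils {
        static {
            // Runs once, the first time any static member of this class is used.
            GlobalModel.get().addConversion("timestamp-millis", Instant::ofEpochMilli);
        }

        static String describe(String logicalType) {
            return "field with logical type " + logicalType;
        }
    }

    public static void main(String[] args) {
        // SchemaUtils has not been initialized yet, so the value is read as a plain Long.
        Object before = GlobalModel.get().read("timestamp-millis", 1563926400000L);
        System.out.println(before.getClass().getSimpleName()); // prints "Long"

        // Any call into the utility class triggers its static initializer.
        SchemaUtils.describe("timestamp-millis");

        // The same read now goes through the globally registered conversion.
        Object after = GlobalModel.get().read("timestamp-millis", 1563926400000L);
        System.out.println(after.getClass().getSimpleName()); // prints "Instant"
    }
}
```

In this sketch the change is permanent for the lifetime of the JVM and depends
only on whether the utility class has been loaded, which is why a scoped
registry (passed explicitly to the code that needs it) is often easier to
reason about than a registration on a shared singleton.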
