Posted to dev@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2020/05/01 13:04:38 UTC
Re: possible bug in AvroUtils
I dug deeper and found that this global static change has been there
since the beginning of the Avro / Beam Schema support (Beam 2.15.0):
https://github.com/apache/beam/commit/2a40c576cfb
On Thu, Apr 30, 2020 at 8:52 PM Ismaël Mejía <ie...@gmail.com> wrote:
>
> Created https://issues.apache.org/jira/browse/BEAM-9863 to track this.
> Any taker?
>
> On Thu, Apr 30, 2020 at 5:54 PM Reuven Lax <re...@google.com> wrote:
> >
> > I'm not sure who added that, but it's been there for a while. Making global static changes like that in our module seems like poor form; I wonder if there's a better approach.
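The thread does not spell out what a "better approach" would be. One common alternative to mutating shared static state is to give each component its own registry instance. The pure-JDK sketch below models that idea under stated assumptions: `ConversionRegistry`, `IsolatedSchemaUtils` and `IsolationDemo` are hypothetical names, not Beam or Avro APIs, and `java.time.Instant` stands in for `org.joda.time.DateTime`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry that is created and owned by the code that uses it,
// rather than a process-wide singleton mutated from a static initializer.
final class ConversionRegistry {
  private final Map<String, Function<Long, Object>> conversions = new HashMap<>();

  // Registers a conversion on this instance only and returns it for chaining.
  ConversionRegistry with(String logicalType, Function<Long, Object> conversion) {
    conversions.put(logicalType, conversion);
    return this;
  }

  // Returns the raw Long unless this instance has a conversion for the type.
  Object read(String logicalType, long raw) {
    Function<Long, Object> conversion = conversions.get(logicalType);
    return conversion == null ? (Object) raw : conversion.apply(raw);
  }
}

// Hypothetical schema utility that keeps its conversions private, so using it
// cannot change how any other component decodes timestamps.
final class IsolatedSchemaUtils {
  private static final ConversionRegistry CONVERSIONS =
      new ConversionRegistry().with("timestamp-millis", Instant::ofEpochMilli);

  static Object readTimestamp(long raw) {
    return CONVERSIONS.read("timestamp-millis", raw);
  }
}

class IsolationDemo {
  public static void main(String[] args) {
    long millis = 1563926400000L;
    ConversionRegistry callerRegistry = new ConversionRegistry(); // nothing registered
    Object converted = IsolatedSchemaUtils.readTimestamp(millis);
    Object untouched = callerRegistry.read("timestamp-millis", millis);
    System.out.println(converted.getClass().getSimpleName()); // Instant
    System.out.println(untouched.getClass().getSimpleName()); // Long
  }
}
```

Because `IsolatedSchemaUtils` never touches a registry it does not own, calling it cannot change how unrelated code decodes records. The trade-off in this model is that every reader must be handed the registry it should use explicitly.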
> >
> > On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bh...@google.com> wrote:
> >>
> >> It seems likely this is a side effect of some static initialization in AvroUtils: https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
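The mechanism Brian suspects can be modeled without Beam or Avro. In this pure-JDK sketch (hypothetical names throughout; it is not AvroUtils' actual code), `SharedConversions` plays the role of a process-wide singleton, `ToyAvroUtils` has a static initializer that registers a conversion there, and `java.time.Instant` stands in for `org.joda.time.DateTime`. The JVM runs a class's static initializer the first time one of its static methods is invoked, so merely calling `ToyAvroUtils.toSchemaName` changes what every later reader of `SharedConversions` sees.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical process-wide registry of logical-type conversions that every
// reader consults (a stand-in for a shared singleton data model).
final class SharedConversions {
  private static final Map<String, Function<Long, Object>> CONVERSIONS =
      new ConcurrentHashMap<>();

  static void register(String logicalType, Function<Long, Object> conversion) {
    CONVERSIONS.put(logicalType, conversion);
  }

  // Returns the raw Long unless some code has registered a conversion.
  static Object read(String logicalType, long raw) {
    Function<Long, Object> conversion = CONVERSIONS.get(logicalType);
    return conversion == null ? (Object) raw : conversion.apply(raw);
  }
}

// Hypothetical utility class: its static initializer mutates the shared registry.
final class ToyAvroUtils {
  static {
    SharedConversions.register("timestamp-millis", Instant::ofEpochMilli);
  }

  // Unrelated to timestamps; calling it is enough to run the static block above.
  static String toSchemaName(String avroName) {
    return "schema:" + avroName;
  }
}

class StaticInitDemo {
  public static void main(String[] args) {
    long millis = 1563926400000L;
    Object before = SharedConversions.read("timestamp-millis", millis);
    ToyAvroUtils.toSchemaName("record"); // first use runs ToyAvroUtils' static initializer
    Object after = SharedConversions.read("timestamp-millis", millis);
    System.out.println(before.getClass().getSimpleName()); // Long
    System.out.println(after.getClass().getSimpleName());  // Instant
  }
}
```

In this model the change persists for the lifetime of the JVM once the initializer has run, so whether a given test observes it can depend on what ran earlier in the same JVM.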
> >>
> >> On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote:
> >>>
> >>> I've copied this failing test into my client, and it passes for me. I can't reproduce the failure.
> >>>
> >>> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> +dev +Brian Hulette +Reuven Lax
> >>>>
> >>>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p....@gmail.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
> >>>>> I found that if you invoke it and then run a pipeline of GenericRecords containing a timestamp (I tried with the logical type timestamp-millis), Beam converts that timestamp from a long to an org.joda.time.DateTime, even if you don't apply any transformation to the pipeline.
> >>>>> Do you think it's a bug?
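For context, Avro's `timestamp-millis` logical type annotates a `long` that counts milliseconds since the Unix epoch (UTC). The small pure-JDK sketch below contrasts the two representations Paolo's test distinguishes: the raw `Long` it expects and a converted date-time object. `java.time.Instant` stands in for `org.joda.time.DateTime`, and `TimestampMillisDemo` with its helpers is hypothetical.

```java
import java.time.Instant;

final class TimestampMillisDemo {
  // Hypothetical helper: what a logical-type conversion does with the raw value.
  static Instant toInstant(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis);
  }

  // Mirrors the check in the test: without a conversion the field is a plain Long.
  static boolean isRawLong(Object fieldValue) {
    return fieldValue instanceof Long;
  }

  public static void main(String[] args) {
    long raw = 1563926400000L; // the value used in the test record
    Object asStored = raw;     // autoboxed to Long
    Object asConverted = toInstant(raw);
    System.out.println(isRawLong(asStored));    // true
    System.out.println(isRawLong(asConverted)); // false
    System.out.println(asConverted.equals(Instant.parse("2019-07-24T00:00:00Z"))); // true
  }
}
```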
> >>>>>
> >>>>> Below is a simple test class I wrote to reproduce the problem.
> >>>>> The first test passes while the second fails.
> >>>>>
> >>>>>
> >>>>> import org.apache.avro.Schema;
> >>>>> import org.apache.avro.SchemaBuilder;
> >>>>> import org.apache.avro.generic.GenericRecord;
> >>>>> import org.apache.avro.generic.GenericRecordBuilder;
> >>>>> import org.apache.beam.sdk.coders.AvroCoder;
> >>>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> >>>>> import org.apache.beam.sdk.testing.TestPipeline;
> >>>>> import org.apache.beam.sdk.transforms.Combine;
> >>>>> import org.apache.beam.sdk.transforms.Create;
> >>>>> import org.apache.beam.sdk.transforms.SerializableFunction;
> >>>>> import org.junit.Rule;
> >>>>> import org.junit.Test;
> >>>>>
> >>>>> import java.sql.Timestamp;
> >>>>>
> >>>>> import static org.junit.Assert.assertEquals;
> >>>>>
> >>>>> public class AvroUtilsSideEffect {
> >>>>>
> >>>>>   @Rule
> >>>>>   public final transient TestPipeline pipeline = TestPipeline.create();
> >>>>>   @Rule
> >>>>>   public final transient TestPipeline pipeline2 = TestPipeline.create();
> >>>>>
> >>>>>   // A single long field annotated with the timestamp-millis logical type.
> >>>>>   public final Schema testSchema = SchemaBuilder
> >>>>>       .record("record").namespace("test")
> >>>>>       .fields()
> >>>>>       .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
> >>>>>       .endRecord();
> >>>>>   public final GenericRecord record = new GenericRecordBuilder(testSchema)
> >>>>>       .set("timestamp", new Timestamp(1563926400000L).getTime())
> >>>>>       .build();
> >>>>>
> >>>>>   // Passes: the timestamp field reaches TestFn as a plain Long.
> >>>>>   @Test
> >>>>>   public void test() {
> >>>>>     pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>         .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>     pipeline.run().waitUntilFinish();
> >>>>>   }
> >>>>>
> >>>>>   // Fails in the reported setup; the only difference from test() is the
> >>>>>   // preceding call to AvroUtils.toBeamSchema.
> >>>>>   @Test
> >>>>>   public void test2() {
> >>>>>     AvroUtils.toBeamSchema(testSchema);
> >>>>>
> >>>>>     pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> >>>>>         .apply(Combine.globally(new TestFn()));
> >>>>>
> >>>>>     pipeline2.run().waitUntilFinish();
> >>>>>   }
> >>>>>
> >>>>>   // Checks that every record's timestamp is still the original Long value.
> >>>>>   public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
> >>>>>
> >>>>>     @Override
> >>>>>     public GenericRecord apply(Iterable<GenericRecord> input) {
> >>>>>       GenericRecord last = null;
> >>>>>       for (GenericRecord item : input) {
> >>>>>         if (item != null) {
> >>>>>           assertEquals(Long.class, item.get("timestamp").getClass());
> >>>>>           assertEquals(1563926400000L, item.get("timestamp"));
> >>>>>           last = item;
> >>>>>         }
> >>>>>       }
> >>>>>       return last;
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>> Thanks,
> >>>>> Paolo
> >>>>>
> >>>>> --
> >>>>> Paolo Tomeo, PhD
> >>>>>
> >>>>> Big Data and Machine Learning Engineer
> >>>>>
> >>>>> linkedin.com/in/ptomeo
Re: possible bug in AvroUtils
Posted by Brian Hulette <bh...@google.com>.
Let's discuss the details on the JIRA. I could maybe take it, but I could use
advice on the right course of action.