Posted to user@beam.apache.org by Josh <jo...@gmail.com> on 2020/08/06 14:16:34 UTC

Accumulator with Map field in CombineFn not serializing correctly

Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running
locally is no problem, but when I run the job on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.<init>()
java.lang.Class.getConstructor0(Class.java:3082)
java.lang.Class.getDeclaredConstructor(Class.java:2178)
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
accumulator class. Is there anything special I need to do because one of
the fields in my accumulator class is a Map? I have pasted an outline of my
CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn<Event, MyCombineFn.Accum, Out> {

    private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return this.size() > 10;
        }
    }

    @DefaultCoder(AvroCoder.class)
    private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map<String, Event> recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
    }

    @DefaultCoder(AvroCoder.class)
    private static class Accum implements Serializable {
        Map<UUID, PartialEventUpdate> eventIdToUpdate = new HashMap<>();

        Accum() {}
    }

    @Override
    public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
    }

    ...

}
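The stack trace above shows what is going wrong: Avro's `SpecificData.newInstance` calls `Class.getDeclaredConstructor()` on the declared field type, and an interface such as `java.util.Map` has no constructor at all. The following is a plain-reflection sketch (no Beam or Avro dependencies) that reproduces the same `NoSuchMethodException` path from the stack trace; the class and method names here are illustrative, not part of any library.

```java
import java.util.HashMap;
import java.util.Map;

public class ReflectionDemo {
    // Mirrors the reflective path in the stack trace: Avro's
    // SpecificData.newInstance asks the declared type for a no-arg constructor.
    static boolean hasNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            // This is the java.util.Map.<init>() failure from the exception above.
            return false;
        }
    }

    public static void main(String[] args) {
        // An interface has no constructors, so instantiation via reflection fails.
        System.out.println(hasNoArgConstructor(Map.class));      // false
        // A concrete class with a default constructor is fine.
        System.out.println(hasNoArgConstructor(HashMap.class));  // true
    }
}
```

This is why the job only fails once a coder actually has to materialize the accumulator: the field's declared type, not its runtime type, is what the reflection-based instantiation sees.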

Re: Accumulator with Map field in CombineFn not serializing correctly

Posted by Brian Hulette <bh...@google.com>.
Interesting, thanks for following up with the fix. Were you able to find a
way to reproduce this locally, or did it only occur on Dataflow?

Did you have to make a similar change for the HashMap in Accum, or just the
ExpiringLinkedHashMap?

Brian

On Fri, Aug 7, 2020 at 9:58 AM Josh <jo...@gmail.com> wrote:

> I have resolved this issue now. In case anyone else runs into this problem
> in the future, the resolution was simply to use the concrete type for the
> field in the accumulator, rather than the Map interface:
>
> ExpiringLinkedHashMap<String, Event> recentEvents = new ExpiringLinkedHashMap<>()

Re: Accumulator with Map field in CombineFn not serializing correctly

Posted by Josh <jo...@gmail.com>.
I have resolved this issue now. In case anyone else runs into this problem
in the future, the resolution was simply to use the concrete type for the
field in the accumulator, rather than the Map interface:

ExpiringLinkedHashMap<String, Event> recentEvents = new ExpiringLinkedHashMap<>()
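The fix above can be sketched as a standalone class. This is a minimal, runnable version of the accumulator field with the concrete type declared, with the Beam/Avro annotations omitted and `Event` stood in by `String` so it compiles without external dependencies; it also exercises the eviction behavior of the `removeEldestEntry` override from the original post.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class FixedAccumDemo {
    // Same bounded map as in the thread: keeps only the 10 most recent entries.
    static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return this.size() > 10;
        }
    }

    // The fix: declare the field with the concrete type rather than the Map
    // interface, so a reflection-based coder sees a class it can instantiate.
    static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        ExpiringLinkedHashMap<String, String> recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;
    }

    public static void main(String[] args) {
        PartialEventUpdate update = new PartialEventUpdate();
        for (int i = 0; i < 25; i++) {
            update.recentEvents.put("event-" + i, "payload-" + i);
        }
        // Eviction keeps the map bounded at 10 entries, oldest removed first.
        System.out.println(update.recentEvents.size());                  // 10
        System.out.println(update.recentEvents.containsKey("event-24")); // true
        System.out.println(update.recentEvents.containsKey("event-0"));  // false
    }
}
```

Note that `HashMap` (used for `eventIdToUpdate` in `Accum`) is already a concrete class with a no-arg constructor, so only fields declared with the `Map` interface type need this change.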

