You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@avro.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/07/01 17:07:00 UTC

[jira] [Work logged] (AVRO-3531) GenericDatumReader in multithread lead to infinite loop cause misused of IdentityHashMap

     [ https://issues.apache.org/jira/browse/AVRO-3531?focusedWorklogId=787168&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-787168 ]

ASF GitHub Bot logged work on AVRO-3531:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jul/22 17:06
            Start Date: 01/Jul/22 17:06
    Worklog Time Spent: 10m 
      Work Description: RyanSkraba commented on code in PR #1719:
URL: https://github.com/apache/avro/pull/1719#discussion_r912121582


##########
lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java:
##########
@@ -498,34 +498,86 @@ protected Class findStringClass(Schema schema) {
     }
   }
 
-  private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
+  /**
+   * This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
+   * code.
+   */
+  private static final class IdentitySchemaKey {
+    private final Schema schema;
+
+    private final int hashcode;
+
+    public IdentitySchemaKey(Schema schema) {
+      this.schema = schema;
+      this.hashcode = System.identityHashCode(schema);
+    }
 
-  private Class getStringClass(Schema s) {
-    Class c = stringClassCache.get(s);
-    if (c == null) {
-      c = findStringClass(s);
-      stringClassCache.put(s, c);
+    @Override
+    public int hashCode() {
+      return this.hashcode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
+        return false;
+      }
+      IdentitySchemaKey key = (IdentitySchemaKey) obj;
+      return this == key || this.schema == key.schema;
     }
-    return c;
   }
 
-  private final Map<Class, Constructor> stringCtorCache = new HashMap<>();
+  protected static class ReaderCache {
+    private final Map<IdentitySchemaKey, Class> stringClassCache = new ConcurrentHashMap<>();
 
-  @SuppressWarnings("unchecked")
-  protected Object newInstanceFromString(Class c, String s) {
-    try {
-      Constructor ctor = stringCtorCache.get(c);
-      if (ctor == null) {
+    private final Map<Class, Function<String, Object>> stringCtorCache = new ConcurrentHashMap<>();
+
+    private final Function<Schema, Class> findStringClass;
+
+    public ReaderCache(Function<Schema, Class> findStringClass) {
+      this.findStringClass = findStringClass;
+    }
+
+    public Object newInstanceFromString(Class c, String s) {
+      final Function<String, Object> ctor = stringCtorCache.computeIfAbsent(c, this::buildFunction);
+      return ctor.apply(s);
+    }
+
+    private Function<String, Object> buildFunction(Class c) {
+      final Constructor ctor;
+      try {
         ctor = c.getDeclaredConstructor(String.class);
-        ctor.setAccessible(true);
-        stringCtorCache.put(c, ctor);
+      } catch (NoSuchMethodException e) {
+        throw new AvroRuntimeException(e);
       }
-      return ctor.newInstance(s);
-    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) {
-      throw new AvroRuntimeException(e);
+      ctor.setAccessible(true);
+
+      return (String s) -> {
+        try {
+          return ctor.newInstance(s);
+        } catch (ReflectiveOperationException e) {
+          throw new AvroRuntimeException(e);
+        }
+      };
+    }
+
+    public Class getStringClass(final Schema s) {
+      final IdentitySchemaKey key = new IdentitySchemaKey(s);
+      return this.stringClassCache.computeIfAbsent(key, (IdentitySchemaKey k) -> this.findStringClass.apply(k.schema));
     }
   }
 
+  private final ReaderCache readerCache = new ReaderCache(this::findStringClass);
+
+  protected ReaderCache getReaderCache() {

Review Comment:
   ```suggestion
     // VisibleForTesting
     ReaderCache getReaderCache() {
   ```



##########
lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java:
##########
@@ -267,8 +267,8 @@ protected void readField(Object record, Field field, Object oldDatum, ResolvingD
         if (accessor.isStringable()) {
           try {
             String asString = (String) read(null, field.schema(), in);
-            accessor.set(record,
-                asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
+            accessor.set(record, asString == null ? null

Review Comment:
   ```suggestion
               accessor.set(record,
                   asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
   ```



##########
lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java:
##########
@@ -498,34 +498,86 @@ protected Class findStringClass(Schema schema) {
     }
   }
 
-  private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
+  /**
+   * This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
+   * code.
+   */
+  private static final class IdentitySchemaKey {
+    private final Schema schema;
+
+    private final int hashcode;
+
+    public IdentitySchemaKey(Schema schema) {
+      this.schema = schema;
+      this.hashcode = System.identityHashCode(schema);
+    }
 
-  private Class getStringClass(Schema s) {
-    Class c = stringClassCache.get(s);
-    if (c == null) {
-      c = findStringClass(s);
-      stringClassCache.put(s, c);
+    @Override
+    public int hashCode() {
+      return this.hashcode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
+        return false;
+      }
+      IdentitySchemaKey key = (IdentitySchemaKey) obj;
+      return this == key || this.schema == key.schema;
     }
-    return c;
   }
 
-  private final Map<Class, Constructor> stringCtorCache = new HashMap<>();
+  protected static class ReaderCache {

Review Comment:
   ```suggestion
     // VisibleForTesting
     static class ReaderCache {
   ```



##########
lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java:
##########
@@ -267,8 +267,8 @@ protected void readField(Object record, Field field, Object oldDatum, ResolvingD
         if (accessor.isStringable()) {
           try {
             String asString = (String) read(null, field.schema(), in);
-            accessor.set(record,
-                asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
+            accessor.set(record, asString == null ? null

Review Comment:
   This change is no longer necessary, thanks to restoring the original method.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 787168)
    Time Spent: 1.5h  (was: 1h 20m)

> GenericDatumReader in multithread lead to infinite loop cause misused of IdentityHashMap
> ----------------------------------------------------------------------------------------
>
>                 Key: AVRO-3531
>                 URL: https://issues.apache.org/jira/browse/AVRO-3531
>             Project: Apache Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.11.0
>            Reporter: tansion
>            Assignee: Christophe Le Saec
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.11.1
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Hi, 
> I am working on a java project that uses Kafka with Avro serialization/deserialization in an messaging platform.
> In production enrionment, we meet a serious issue on the deserialization processs. The GenericDatumReader process some how get into a infinite loop status, and it is happened accationally.
> When the issue happens, The thread stack is like this:
>  
> {code:java}
> "DmqFixedRateConsumer-Thread-17" #453 daemon prio=5 os_prio=0 tid=0x00007f2ae1832800 nid=0xef49 runnable [0x00007f2a743fc000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.IdentityHashMap.get(IdentityHashMap.java:337)
>     at org.apache.avro.generic.GenericDatumReader.getStringClass(GenericDatumReader.java:503)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:454)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at com.xxx.xxx.xxx.xxx.xxx.XXX.deserialize(XXX.java:252)
>     at com.xxx.xxx.xxx.xxx.xxx.ZZZ.deserialize(ZZZ.java:216)
>     at com.xxx.xxx.xxx.xxx.xxx.SSS.processMessage(SSS.java:152)
>     at com.xxx.xxx.xxx.xxx.xxx.SSS.loopProcess(SSS.java:127)
>     at com.xxx.xxx.xxx.xxx.xxx.SSS$$Lambda$172/367082698.run(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748) {code}
> We create 30 threads, and all the threads are the same as above! They all get stuck in the IdentityHashMap.get() method.
>  
> Accroding to this mail [1.7.6 Slow Deserialization|https://www.mail-archive.com/user@avro.apache.org/msg02902.html], the Reader is thread-safe,  But actually, it seems not.
> Why?
> org.apache.avro.generic.GenericDatumReader#getStringClass
>  
> {code:java}
> /**
>  * Called to read strings. Subclasses may override to use a different string
>  * representation. By default, this calls {@link #readString(Object,Decoder)}.
>  */
> protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
>   Class stringClass = getStringClass(expected);
>   if (stringClass == String.class) {
>     return in.readString();
>   }
>   if (stringClass == CharSequence.class) {
>     return readString(old, in);
>   }
>   return newInstanceFromString(stringClass, in.readString());
> }
> private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
> private Class getStringClass(Schema s) {
>   Class c = stringClassCache.get(s);
>   if (c == null) {
>     c = findStringClass(s);
>     stringClassCache.put(s, c);
>   }
>   return c;
> }
>  {code}
> The IdentityHashMap is not thread-safe, which is addressed by javadoc clearly! Like Hashmap infinite loop issue in multithread using, same issue happen to IdentityHashMap,too.
> My question is: Can the class GenericDatumReader fix this issue and act like  real thread-safe? Or we need to avoid use the single instance of GenericDatumReader in multithread?
> Thanks a lot,
>  Xtsong.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)