You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@avro.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/07/01 17:07:00 UTC
[jira] [Work logged] (AVRO-3531) GenericDatumReader in multithread lead to infinite loop cause misused of IdentityHashMap
[ https://issues.apache.org/jira/browse/AVRO-3531?focusedWorklogId=787168&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-787168 ]
ASF GitHub Bot logged work on AVRO-3531:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Jul/22 17:06
Start Date: 01/Jul/22 17:06
Worklog Time Spent: 10m
Work Description: RyanSkraba commented on code in PR #1719:
URL: https://github.com/apache/avro/pull/1719#discussion_r912121582
##########
lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java:
##########
@@ -498,34 +498,86 @@ protected Class findStringClass(Schema schema) {
}
}
- private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
+ /**
+ * This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
+ * code.
+ */
+ private static final class IdentitySchemaKey {
+ private final Schema schema;
+
+ private final int hashcode;
+
+ public IdentitySchemaKey(Schema schema) {
+ this.schema = schema;
+ this.hashcode = System.identityHashCode(schema);
+ }
- private Class getStringClass(Schema s) {
- Class c = stringClassCache.get(s);
- if (c == null) {
- c = findStringClass(s);
- stringClassCache.put(s, c);
+ @Override
+ public int hashCode() {
+ return this.hashcode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
+ return false;
+ }
+ IdentitySchemaKey key = (IdentitySchemaKey) obj;
+ return this == key || this.schema == key.schema;
}
- return c;
}
- private final Map<Class, Constructor> stringCtorCache = new HashMap<>();
+ protected static class ReaderCache {
+ private final Map<IdentitySchemaKey, Class> stringClassCache = new ConcurrentHashMap<>();
- @SuppressWarnings("unchecked")
- protected Object newInstanceFromString(Class c, String s) {
- try {
- Constructor ctor = stringCtorCache.get(c);
- if (ctor == null) {
+ private final Map<Class, Function<String, Object>> stringCtorCache = new ConcurrentHashMap<>();
+
+ private final Function<Schema, Class> findStringClass;
+
+ public ReaderCache(Function<Schema, Class> findStringClass) {
+ this.findStringClass = findStringClass;
+ }
+
+ public Object newInstanceFromString(Class c, String s) {
+ final Function<String, Object> ctor = stringCtorCache.computeIfAbsent(c, this::buildFunction);
+ return ctor.apply(s);
+ }
+
+ private Function<String, Object> buildFunction(Class c) {
+ final Constructor ctor;
+ try {
ctor = c.getDeclaredConstructor(String.class);
- ctor.setAccessible(true);
- stringCtorCache.put(c, ctor);
+ } catch (NoSuchMethodException e) {
+ throw new AvroRuntimeException(e);
}
- return ctor.newInstance(s);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) {
- throw new AvroRuntimeException(e);
+ ctor.setAccessible(true);
+
+ return (String s) -> {
+ try {
+ return ctor.newInstance(s);
+ } catch (ReflectiveOperationException e) {
+ throw new AvroRuntimeException(e);
+ }
+ };
+ }
+
+ public Class getStringClass(final Schema s) {
+ final IdentitySchemaKey key = new IdentitySchemaKey(s);
+ return this.stringClassCache.computeIfAbsent(key, (IdentitySchemaKey k) -> this.findStringClass.apply(k.schema));
}
}
+ private final ReaderCache readerCache = new ReaderCache(this::findStringClass);
+
+ protected ReaderCache getReaderCache() {
Review Comment:
```suggestion
// VisibleForTesting
ReaderCache getReaderCache() {
```
##########
lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java:
##########
@@ -267,8 +267,8 @@ protected void readField(Object record, Field field, Object oldDatum, ResolvingD
if (accessor.isStringable()) {
try {
String asString = (String) read(null, field.schema(), in);
- accessor.set(record,
- asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
+ accessor.set(record, asString == null ? null
Review Comment:
```suggestion
accessor.set(record,
asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
```
##########
lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java:
##########
@@ -498,34 +498,86 @@ protected Class findStringClass(Schema schema) {
}
}
- private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
+ /**
+ * This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
+ * code.
+ */
+ private static final class IdentitySchemaKey {
+ private final Schema schema;
+
+ private final int hashcode;
+
+ public IdentitySchemaKey(Schema schema) {
+ this.schema = schema;
+ this.hashcode = System.identityHashCode(schema);
+ }
- private Class getStringClass(Schema s) {
- Class c = stringClassCache.get(s);
- if (c == null) {
- c = findStringClass(s);
- stringClassCache.put(s, c);
+ @Override
+ public int hashCode() {
+ return this.hashcode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
+ return false;
+ }
+ IdentitySchemaKey key = (IdentitySchemaKey) obj;
+ return this == key || this.schema == key.schema;
}
- return c;
}
- private final Map<Class, Constructor> stringCtorCache = new HashMap<>();
+ protected static class ReaderCache {
Review Comment:
```suggestion
// VisibleForTesting
static class ReaderCache {
```
##########
lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java:
##########
@@ -267,8 +267,8 @@ protected void readField(Object record, Field field, Object oldDatum, ResolvingD
if (accessor.isStringable()) {
try {
String asString = (String) read(null, field.schema(), in);
- accessor.set(record,
- asString == null ? null : newInstanceFromString(accessor.getField().getType(), asString));
+ accessor.set(record, asString == null ? null
Review Comment:
This change is no longer necessary, thanks to restoring the original method.
Issue Time Tracking
-------------------
Worklog Id: (was: 787168)
Time Spent: 1.5h (was: 1h 20m)
> GenericDatumReader in multithread lead to infinite loop cause misused of IdentityHashMap
> ----------------------------------------------------------------------------------------
>
> Key: AVRO-3531
> URL: https://issues.apache.org/jira/browse/AVRO-3531
> Project: Apache Avro
> Issue Type: Bug
> Components: java
> Affects Versions: 1.11.0
> Reporter: tansion
> Assignee: Christophe Le Saec
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.11.1
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Hi,
> I am working on a Java project that uses Kafka with Avro serialization/deserialization in a messaging platform.
> In the production environment, we hit a serious issue in the deserialization process. The GenericDatumReader somehow gets into an infinite loop, and it happens occasionally.
> When the issue happens, the thread stack looks like this:
>
> {code:java}
> "DmqFixedRateConsumer-Thread-17" #453 daemon prio=5 os_prio=0 tid=0x00007f2ae1832800 nid=0xef49 runnable [0x00007f2a743fc000]
> java.lang.Thread.State: RUNNABLE
> at java.util.IdentityHashMap.get(IdentityHashMap.java:337)
> at org.apache.avro.generic.GenericDatumReader.getStringClass(GenericDatumReader.java:503)
> at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:454)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at com.xxx.xxx.xxx.xxx.xxx.XXX.deserialize(XXX.java:252)
> at com.xxx.xxx.xxx.xxx.xxx.ZZZ.deserialize(ZZZ.java:216)
> at com.xxx.xxx.xxx.xxx.xxx.SSS.processMessage(SSS.java:152)
> at com.xxx.xxx.xxx.xxx.xxx.SSS.loopProcess(SSS.java:127)
> at com.xxx.xxx.xxx.xxx.xxx.SSS$$Lambda$172/367082698.run(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) {code}
> We create 30 threads, and all the threads are the same as above! They all get stuck in the IdentityHashMap.get() method.
>
> According to this mail [1.7.6 Slow Deserialization|https://www.mail-archive.com/user@avro.apache.org/msg02902.html], the Reader is thread-safe, but in practice it does not seem to be.
> Why?
> org.apache.avro.generic.GenericDatumReader#getStringClass
>
> {code:java}
> /**
> * Called to read strings. Subclasses may override to use a different string
> * representation. By default, this calls {@link #readString(Object,Decoder)}.
> */
> protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
> Class stringClass = getStringClass(expected);
> if (stringClass == String.class) {
> return in.readString();
> }
> if (stringClass == CharSequence.class) {
> return readString(old, in);
> }
> return newInstanceFromString(stringClass, in.readString());
> }
> private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();
> private Class getStringClass(Schema s) {
> Class c = stringClassCache.get(s);
> if (c == null) {
> c = findStringClass(s);
> stringClassCache.put(s, c);
> }
> return c;
> }
> {code}
> IdentityHashMap is not thread-safe, as its javadoc clearly states! Like the well-known HashMap infinite loop issue under multithreaded use, the same issue happens to IdentityHashMap, too.
> My question is: can the GenericDatumReader class be fixed so that it is truly thread-safe? Or do we need to avoid sharing a single instance of GenericDatumReader across multiple threads?
> Thanks a lot,
> Xtsong.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)