You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/02/03 10:01:56 UTC
[avro] branch branch-1.9 updated: AVRO-2247 - improved java reading
performance with new reader (#391)
This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new cbc6e50 AVRO-2247 - improved java reading performance with new reader (#391)
cbc6e50 is described below
commit cbc6e500710864545a1b2f9ffa28edef532e26af
Author: Martin Jubelgas <un...@wolke7.net>
AuthorDate: Mon Feb 3 09:37:02 2020 +0100
AVRO-2247 - improved java reading performance with new reader (#391)
* AVRO-2247 - Add FastDatumReaderBuilder and dependencies (rebased)
* Addressed comments to pull request
---
lang/java/avro/pom.xml | 12 +
.../src/main/java/org/apache/avro/Resolver.java | 21 +-
.../java/org/apache/avro/generic/GenericData.java | 70 ++-
.../apache/avro/generic/GenericDatumReader.java | 24 +-
.../java/org/apache/avro/io/FastReaderBuilder.java | 613 +++++++++++++++++++++
.../java/org/apache/avro/io/ReflectionUtils.java | 73 +++
.../avro/io/parsing/ResolvingGrammarGenerator.java | 2 +-
.../org/apache/avro/specific/SpecificData.java | 30 +-
.../src/main/java/org/apache/avro/util/Utf8.java | 14 +-
9 files changed, 833 insertions(+), 26 deletions(-)
diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml
index 4953fce..dddead4 100644
--- a/lang/java/avro/pom.xml
+++ b/lang/java/avro/pom.xml
@@ -86,6 +86,18 @@
</systemPropertyVariables>
</configuration>
</execution>
+ <!-- Additional test execution with the fast reader switched on: GenericData
+ reads the org.apache.avro.fastread system property (FAST_READER_PROP). -->
+ <execution>
+ <id>test-with-fast-reader</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <systemPropertyVariables>
+ <org.apache.avro.fastread>true</org.apache.avro.fastread>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
diff --git a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
index fead519..45f2a89 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java
@@ -221,6 +221,7 @@ public class Resolver {
this.error = e;
}
+ @Override
public String toString() {
switch (this.error) {
case INCOMPATIBLE_SCHEMA_TYPES:
@@ -365,9 +366,10 @@ public class Resolver {
*/
public static class EnumAdjust extends Action {
public final int[] adjustments;
+ /**
+ * Reader-side enum value for each writer symbol ordinal (indexed like
+ * adjustments), or null where the writer symbol has no match in the reader
+ * schema and the reader declares no enum default.
+ */
+ public final Object[] values;
public final boolean noAdjustmentsNeeded;
- private EnumAdjust(Schema w, Schema r, GenericData d, int[] adj) {
+ private EnumAdjust(Schema w, Schema r, GenericData d, int[] adj, Object[] values) {
super(w, r, d, Action.Type.ENUM);
this.adjustments = adj;
boolean noAdj;
@@ -378,6 +380,7 @@ public class Resolver {
noAdj &= (i == adj[i]);
}
this.noAdjustmentsNeeded = noAdj;
+ this.values = values;
}
/**
@@ -393,11 +396,17 @@ public class Resolver {
final List<String> rsymbols = r.getEnumSymbols();
final int defaultIndex = (r.getEnumDefault() == null ? -1 : rsymbols.indexOf(r.getEnumDefault()));
int[] adjustments = new int[wsymbols.size()];
+ Object[] values = new Object[wsymbols.size()];
+ Object defaultValue = (defaultIndex == -1) ? null : d.createEnum(r.getEnumDefault(), r);
for (int i = 0; i < adjustments.length; i++) {
int j = rsymbols.indexOf(wsymbols.get(i));
- adjustments[i] = (0 <= j ? j : defaultIndex);
+ if (j < 0) {
+ j = defaultIndex;
+ }
+ adjustments[i] = j;
+ values[i] = (j == defaultIndex) ? defaultValue : d.createEnum(rsymbols.get(j), r);
}
- return new EnumAdjust(w, r, d, adjustments);
+ return new EnumAdjust(w, r, d, adjustments, values);
}
}
@@ -455,6 +464,11 @@ public class Resolver {
public final Object[] defaults;
/**
+ * Supplier that offers an optimized alternative to data.newRecord(). It is
+ * obtained once, from GenericData.getNewRecordSupplier, when this action is
+ * constructed, and creates (or recycles) instances of the reader's record type.
+ */
+ public final GenericData.InstanceSupplier instanceSupplier;
+
+ /**
* Returns true iff <code>i == readerOrder[i].pos()</code> for all
* indices <code>i</code>. Which is to say: the order of the reader's fields is
* the same in both the reader's and writer's schema.
@@ -473,6 +487,7 @@ public class Resolver {
this.readerOrder = ro;
this.firstDefault = firstD;
this.defaults = defaults;
+ this.instanceSupplier = d.getNewRecordSupplier(r);
}
/**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
index 1aa50ac..8a3b437 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
@@ -17,9 +17,9 @@
*/
package org.apache.avro.generic;
-import java.nio.ByteBuffer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractList;
import java.util.Arrays;
@@ -27,8 +27,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
@@ -47,10 +47,11 @@ import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.FastReaderBuilder;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.internal.Accessor;
@@ -178,6 +179,26 @@ public class GenericData {
return (Conversion<Object>) conversions.get(logicalType.getName());
}
+  /**
+   * System property that, when set to {@code "true"}, enables the fast reader for
+   * {@link GenericData} instances created afterwards.
+   */
+  public static final String FAST_READER_PROP = "org.apache.avro.fastread";
+  private boolean fastReaderEnabled = "true".equalsIgnoreCase(System.getProperty(FAST_READER_PROP));
+  // volatile + double-checked locking: GenericData instances (e.g. GenericData.get())
+  // are commonly shared between threads, and the lazily created builder must be
+  // published safely and created only once so that its reader cache is shared.
+  private volatile FastReaderBuilder fastReaderBuilder = null;
+
+  /**
+   * Enables or disables the fast reader for datum readers backed by this
+   * instance.
+   *
+   * @param flag whether the fast reader should be used
+   * @return this instance, for chaining
+   */
+  public GenericData setFastReaderEnabled(boolean flag) {
+    this.fastReaderEnabled = flag;
+    return this;
+  }
+
+  /**
+   * Returns true if the fast reader has been enabled and this data class is
+   * supported by {@link FastReaderBuilder#isSupportedData(GenericData)}.
+   */
+  public boolean isFastReaderEnabled() {
+    return fastReaderEnabled && FastReaderBuilder.isSupportedData(this);
+  }
+
+  /**
+   * Returns the {@link FastReaderBuilder} bound to this instance, creating it on
+   * first use. Every call returns the same builder, so its reader cache is
+   * shared by all callers.
+   */
+  public FastReaderBuilder getFastReaderBuilder() {
+    FastReaderBuilder builder = fastReaderBuilder;
+    if (builder == null) {
+      synchronized (this) {
+        builder = fastReaderBuilder;
+        if (builder == null) {
+          builder = new FastReaderBuilder(this);
+          fastReaderBuilder = builder;
+        }
+      }
+    }
+    return builder;
+  }
+
/**
* Default implementation of {@link GenericRecord}. Note that this
* implementation does not fill in default values for fields if they are not
@@ -520,7 +541,7 @@ public class GenericData {
/** Returns a {@link DatumReader} for this kind of data. */
public DatumReader createDatumReader(Schema schema) {
- return new GenericDatumReader(schema, schema, this);
+ // Writer and reader schema are the same; delegate to the two-schema overload
+ // so that a subclass override of that overload also applies to this one.
+ return createDatumReader(schema, schema);
}
/** Returns a {@link DatumReader} for this kind of data. */
@@ -1329,4 +1350,43 @@ public class GenericData {
return new GenericData.Record(schema);
}
+  /**
+   * Called to create new array instances. Subclasses may override to use a
+   * different array implementation. By default, this returns a
+   * {@link GenericData.Array}.
+   *
+   * <p>
+   * A previous instance is recycled when possible: a {@link GenericArray} is
+   * {@code reset()}, and any other {@link Collection} is cleared.
+   *
+   * @param old    a previously returned instance to reuse, or {@code null}
+   * @param size   expected number of elements, used to size a new array
+   * @param schema the array schema, used when a new array is created
+   * @return {@code old} (reset or cleared) if it could be recycled, otherwise a
+   *         new array
+   */
+  public Object newArray(Object old, int size, Schema schema) {
+    if (old instanceof GenericArray) {
+      ((GenericArray<?>) old).reset();
+      return old;
+    } else if (old instanceof Collection) {
+      ((Collection<?>) old).clear();
+      return old;
+    } else {
+      return new GenericData.Array<Object>(size, schema);
+    }
+  }
+
+  /**
+   * Called to create new map instances. Subclasses may override to use a
+   * different map implementation. By default, this returns a {@link HashMap}.
+   *
+   * @param old  a previously returned instance to reuse (it is cleared), or
+   *             {@code null}
+   * @param size expected number of entries, used as the initial capacity of a
+   *             new map
+   * @return {@code old} (cleared) if it is a {@link Map}, otherwise a new map
+   */
+  public Object newMap(Object old, int size) {
+    if (old instanceof Map) {
+      ((Map<?, ?>) old).clear();
+      return old;
+    } else {
+      return new HashMap<>(size);
+    }
+  }
+
+  /**
+   * Returns a supplier of record instances for {@code schema}. The default
+   * implementation delegates to {@code newRecord(Object, Schema)}; subclasses may
+   * return an optimized, schema-specific supplier.
+   */
+  public InstanceSupplier getNewRecordSupplier(Schema schema) {
+    return this::newRecord;
+  }
+
+  /** Creates (or recycles) an instance of a record type. */
+  @FunctionalInterface
+  public interface InstanceSupplier {
+    /**
+     * @param oldInstance an instance that may be reused, or {@code null}
+     * @param schema      the record schema to instantiate
+     * @return a record instance for {@code schema}
+     */
+    public Object newInstance(Object oldInstance, Schema schema);
+  }
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
index 89df601..acf1ae9 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
@@ -45,6 +45,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
private final GenericData data;
private Schema actual;
private Schema expected;
+ private DatumReader<D> fastDatumReader = null;
private ResolvingDecoder creatorResolver = null;
private final Thread creator;
@@ -91,6 +92,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
expected = actual;
}
creatorResolver = null;
+ fastDatumReader = null;
}
/** Get the reader's schema. */
@@ -140,6 +142,13 @@ public class GenericDatumReader<D> implements DatumReader<D> {
@Override
@SuppressWarnings("unchecked")
public D read(D reuse, Decoder in) throws IOException {
+ if (data.isFastReaderEnabled()) {
+ if (this.fastDatumReader == null) {
+ this.fastDatumReader = data.getFastReaderBuilder().createDatumReader(actual, expected);
+ }
+ return fastDatumReader.read(reuse, in);
+ }
+
ResolvingDecoder resolver = getResolver(actual, expected);
resolver.configure(in);
D result = (D) read(reuse, expected, resolver);
@@ -432,14 +441,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
*/
@SuppressWarnings("unchecked")
protected Object newArray(Object old, int size, Schema schema) {
- if (old instanceof GenericArray) {
- ((GenericArray) old).reset();
- return old;
- } else if (old instanceof Collection) {
- ((Collection) old).clear();
- return old;
- } else
- return new GenericData.Array(size, schema);
+ // Delegate to GenericData so this reader and FastReaderBuilder share one
+ // array reuse/creation policy, which GenericData subclasses may override.
+ // NOTE(review): with the casts gone, @SuppressWarnings above looks unneeded.
+ return data.newArray(old, size, schema);
}
/**
@@ -448,11 +450,7 @@ public class GenericDatumReader<D> implements DatumReader<D> {
*/
@SuppressWarnings("unchecked")
protected Object newMap(Object old, int size) {
- if (old instanceof Map) {
- ((Map) old).clear();
- return old;
- } else
- return new HashMap<>(size);
+ // Delegate to GenericData so this reader and FastReaderBuilder share one
+ // map reuse/creation policy, which GenericData subclasses may override.
+ return data.newMap(old, size);
}
/**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java
new file mode 100644
index 0000000..40a612c
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Resolver;
+import org.apache.avro.Resolver.Action;
+import org.apache.avro.Resolver.Container;
+import org.apache.avro.Resolver.EnumAdjust;
+import org.apache.avro.Resolver.Promote;
+import org.apache.avro.Resolver.ReaderUnion;
+import org.apache.avro.Resolver.RecordAdjust;
+import org.apache.avro.Resolver.Skip;
+import org.apache.avro.Resolver.WriterUnion;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.InstanceSupplier;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.FastReaderBuilder.RecordReader.Stage;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.util.WeakIdentityHashMap;
+import org.apache.avro.util.internal.Accessor;
+
+/**
+ * Builds {@link DatumReader}s from a resolved writer/reader schema pair. The
+ * resolution is compiled once into a tree of {@link FieldReader}s, so that
+ * reading does not have to walk a parsing grammar for every datum.
+ */
+public class FastReaderBuilder {
+
+  /**
+   * Generic/SpecificData instance that contains basic functionalities like
+   * instantiation of objects.
+   */
+  private final GenericData data;
+
+  /**
+   * Record reader cache: first keyed by reader schema, then by writer schema.
+   * Both levels are synchronized. The outer map's lock is released once
+   * computeIfAbsent returns the inner map, so an unsynchronized inner map could
+   * be mutated concurrently by threads building readers for the same reader
+   * schema.
+   */
+  private final Map<Schema, Map<Schema, RecordReader>> readerCache = Collections
+      .synchronizedMap(new WeakIdentityHashMap<>());
+
+  /** Whether map keys honour the {@link SpecificData#KEY_CLASS_PROP} property. */
+  private boolean keyClassEnabled = true;
+
+  /** Whether strings honour the {@link SpecificData#CLASS_PROP} property. */
+  private boolean classPropEnabled = true;
+
+  /** Returns a new builder backed by {@link GenericData#get()}. */
+  public static FastReaderBuilder get() {
+    return new FastReaderBuilder(GenericData.get());
+  }
+
+  /** Returns a new builder backed by {@link SpecificData#get()}. */
+  public static FastReaderBuilder getSpecific() {
+    return new FastReaderBuilder(SpecificData.get());
+  }
+
+  /**
+   * Returns true only if the class of {@code data} is exactly
+   * {@link GenericData} or {@link SpecificData}; subclasses are not supported.
+   */
+  public static boolean isSupportedData(GenericData data) {
+    return data.getClass() == GenericData.class || data.getClass() == SpecificData.class;
+  }
+
+  /** Creates a builder whose readers use {@code parentData} to create values. */
+  public FastReaderBuilder(GenericData parentData) {
+    this.data = parentData;
+  }
+
+  public FastReaderBuilder withKeyClassEnabled(boolean enabled) {
+    this.keyClassEnabled = enabled;
+    return this;
+  }
+
+  public boolean isKeyClassEnabled() {
+    return this.keyClassEnabled;
+  }
+
+  public FastReaderBuilder withClassPropEnabled(boolean enabled) {
+    this.classPropEnabled = enabled;
+    return this;
+  }
+
+  public boolean isClassPropEnabled() {
+    return this.classPropEnabled;
+  }
+
+  /** Creates a reader for data written and read with the same schema. */
+  public <D> DatumReader<D> createDatumReader(Schema schema) throws IOException {
+    return createDatumReader(schema, schema);
+  }
+
+  /**
+   * Creates a reader that decodes data written with {@code writerSchema} into
+   * values shaped by {@code readerSchema}. The reader's aliases are applied to
+   * the writer schema before resolution.
+   */
+  @SuppressWarnings("unchecked")
+  public <D> DatumReader<D> createDatumReader(Schema writerSchema, Schema readerSchema) throws IOException {
+    Schema resolvedWriterSchema = Schema.applyAliases(writerSchema, readerSchema);
+    return (DatumReader<D>) getReaderFor(readerSchema, resolvedWriterSchema);
+  }
+
+  private FieldReader getReaderFor(Schema readerSchema, Schema writerSchema) throws IOException {
+    Action resolvedAction = Resolver.resolve(writerSchema, readerSchema, data);
+    return getReaderFor(resolvedAction, null);
+  }
+
+  private FieldReader getReaderFor(Action action, Conversion<?> explicitConversion) throws IOException {
+    final FieldReader baseReader = getNonConvertedReader(action);
+    return applyConversions(action.reader, baseReader, explicitConversion);
+  }
+
+  private RecordReader createRecordReader(RecordAdjust action) throws IOException {
+    // Record readers are created in two steps: first registered in the cache,
+    // then initialized. A recursive type therefore finds its own (still
+    // initializing) reader in the cache instead of recursing forever; the
+    // monitor is reentrant, and the stage is no longer NEW at that point.
+    RecordReader recordReader = getRecordReaderFromCache(action.reader, action.writer);
+    synchronized (recordReader) {
+      // only need to initialize once
+      if (recordReader.getInitializationStage() == Stage.NEW) {
+        initializeRecordReader(recordReader, action);
+      }
+    }
+    return recordReader;
+  }
+
+  private RecordReader initializeRecordReader(RecordReader recordReader, RecordAdjust action) throws IOException {
+    recordReader.startInitialization();
+
+    // A sample instance is created only to look up per-field conversions
+    // (specific records expose them through getConversion).
+    Object testInstance = action.instanceSupplier.newInstance(null, action.reader);
+    IntFunction<Conversion<?>> conversionSupplier = getConversionSupplier(testInstance);
+
+    // One step per writer field, followed by one step per reader field that
+    // must be filled from its default value.
+    ExecutionStep[] readSteps = new ExecutionStep[action.fieldActions.length + action.readerOrder.length
+        - action.firstDefault];
+
+    int i = 0;
+    int fieldCounter = 0;
+    // compute what to do with writer's fields
+    for (; i < action.fieldActions.length; i++) {
+      Action fieldAction = action.fieldActions[i];
+      if (fieldAction instanceof Skip) {
+        readSteps[i] = (r, decoder) -> GenericDatumReader.skip(fieldAction.writer, decoder);
+      } else {
+        Field readerField = action.readerOrder[fieldCounter++];
+        Conversion<?> conversion = conversionSupplier.apply(readerField.pos());
+        FieldReader reader = getReaderFor(fieldAction, conversion);
+        readSteps[i] = createFieldSetter(readerField, reader);
+      }
+    }
+
+    // add defaulting if required
+    for (; i < readSteps.length; i++) {
+      readSteps[i] = getDefaultingStep(action.readerOrder[fieldCounter++]);
+    }
+
+    recordReader.finishInitialization(readSteps, action.reader, action.instanceSupplier);
+    return recordReader;
+  }
+
+  /**
+   * Returns a step that reads one value and stores it at the field's position.
+   * The field's current value is passed as the reuse candidate only when the
+   * reader declares that it can reuse.
+   */
+  private ExecutionStep createFieldSetter(Field field, FieldReader reader) {
+    int pos = field.pos();
+    if (reader.canReuse()) {
+      return (object, decoder) -> {
+        IndexedRecord record = (IndexedRecord) object;
+        record.put(pos, reader.read(record.get(pos), decoder));
+      };
+    } else {
+      return (object, decoder) -> {
+        IndexedRecord record = (IndexedRecord) object;
+        record.put(pos, reader.read(null, decoder));
+      };
+    }
+  }
+
+  /**
+   * Returns a step that fills a reader-only field with its default value.
+   * Immutable defaults are shared. Utf8 values and empty collections are
+   * recycled. Anything else is re-decoded from its binary encoding on every
+   * read, so callers never share a mutable default.
+   */
+  private ExecutionStep getDefaultingStep(Schema.Field field) throws IOException {
+    Object defaultValue = data.getDefaultValue(field);
+
+    if (isObjectImmutable(defaultValue)) {
+      return createFieldSetter(field, (old, d) -> defaultValue);
+    } else if (defaultValue instanceof Utf8) {
+      return createFieldSetter(field, reusingReader((old, d) -> readUtf8(old, (Utf8) defaultValue)));
+    } else if (defaultValue instanceof List && ((List<?>) defaultValue).isEmpty()) {
+      return createFieldSetter(field, reusingReader((old, d) -> data.newArray(old, 0, field.schema())));
+    } else if (defaultValue instanceof Map && ((Map<?, ?>) defaultValue).isEmpty()) {
+      return createFieldSetter(field, reusingReader((old, d) -> data.newMap(old, 0)));
+    } else {
+      DatumReader<Object> datumReader = createDatumReader(field.schema());
+      byte[] encoded = getEncodedValue(field);
+      FieldReader fieldReader = reusingReader(
+          (old, decoder) -> datumReader.read(old, DecoderFactory.get().binaryDecoder(encoded, null)));
+      return createFieldSetter(field, fieldReader);
+    }
+  }
+
+  /** Returns true for default values that can safely be shared between records. */
+  private boolean isObjectImmutable(Object object) {
+    return object == null || object instanceof Number || object instanceof String || object instanceof Boolean
+        || object instanceof GenericEnumSymbol || object.getClass().isEnum();
+  }
+
+  private Utf8 readUtf8(Object reuse, Utf8 newValue) {
+    if (reuse instanceof Utf8) {
+      Utf8 oldUtf8 = (Utf8) reuse;
+      oldUtf8.set(newValue);
+      return oldUtf8;
+    } else {
+      return new Utf8(newValue);
+    }
+  }
+
+  /** Binary-encodes the field's default value according to the field's schema. */
+  private byte[] getEncodedValue(Field field) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+
+    ResolvingGrammarGenerator.encode(encoder, field.schema(), Accessor.defaultValue(field));
+    encoder.flush();
+
+    return out.toByteArray();
+  }
+
+  private IntFunction<Conversion<?>> getConversionSupplier(Object record) {
+    if (record instanceof SpecificRecordBase) {
+      return ((SpecificRecordBase) record)::getConversion;
+    } else {
+      return index -> null;
+    }
+  }
+
+  private RecordReader getRecordReaderFromCache(Schema readerSchema, Schema writerSchema) {
+    Map<Schema, RecordReader> writerCache = readerCache.computeIfAbsent(readerSchema,
+        k -> Collections.synchronizedMap(new WeakIdentityHashMap<>()));
+    return writerCache.computeIfAbsent(writerSchema, k -> new RecordReader());
+  }
+
+  /**
+   * Wraps {@code reader} so that its output is converted to a logical type. The
+   * explicit conversion wins; otherwise the conversion registered in
+   * {@code data} for the schema's logical type is used, if any.
+   */
+  private FieldReader applyConversions(Schema readerSchema, FieldReader reader, Conversion<?> explicitConversion) {
+    Conversion<?> conversion = explicitConversion;
+
+    if (conversion == null) {
+      if (readerSchema.getLogicalType() == null) {
+        return reader;
+      }
+      conversion = data.getConversionFor(readerSchema.getLogicalType());
+      if (conversion == null) {
+        return reader;
+      }
+    }
+
+    Conversion<?> finalConversion = conversion;
+    return (old, decoder) -> Conversions.convertToLogicalType(reader.read(old, decoder), readerSchema,
+        readerSchema.getLogicalType(), finalConversion);
+  }
+
+  private FieldReader getNonConvertedReader(Action action) throws IOException {
+    switch (action.type) {
+      case CONTAINER:
+        switch (action.reader.getType()) {
+          case MAP:
+            return createMapReader(action.reader, (Container) action);
+          case ARRAY:
+            return createArrayReader(action.reader, (Container) action);
+          default:
+            throw new IllegalStateException("Error getting reader for action type " + action.getClass());
+        }
+      case DO_NOTHING:
+        return getReaderForBaseType(action.reader, action.writer);
+      case RECORD:
+        return createRecordReader((RecordAdjust) action);
+      case ENUM:
+        return createEnumReader((EnumAdjust) action);
+      case PROMOTE:
+        return createPromotingReader((Promote) action);
+      case WRITER_UNION:
+        return createUnionReader((WriterUnion) action);
+      case READER_UNION:
+        return getReaderFor(((ReaderUnion) action).actualAction, null);
+      case ERROR:
+        return (old, decoder) -> {
+          throw new AvroTypeException(action.toString());
+        };
+      default:
+        throw new IllegalStateException("Error getting reader for action type " + action.getClass());
+    }
+  }
+
+  private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchema) throws IOException {
+    switch (readerSchema.getType()) {
+      case NULL:
+        return (old, decoder) -> {
+          decoder.readNull();
+          return null;
+        };
+      case BOOLEAN:
+        return (old, decoder) -> decoder.readBoolean();
+      case STRING:
+        return createStringReader(readerSchema, writerSchema);
+      case INT:
+        return (old, decoder) -> decoder.readInt();
+      case LONG:
+        return (old, decoder) -> decoder.readLong();
+      case FLOAT:
+        return (old, decoder) -> decoder.readFloat();
+      case DOUBLE:
+        return (old, decoder) -> decoder.readDouble();
+      case BYTES:
+        return createBytesReader();
+      case FIXED:
+        return createFixedReader(readerSchema, writerSchema);
+      case RECORD: // covered by action type
+      case UNION: // covered by action type
+      case ENUM: // covered by action type
+      case MAP: // covered by action type
+      case ARRAY: // covered by action type
+      default:
+        throw new IllegalStateException("Error getting reader for type " + readerSchema.getFullName());
+    }
+  }
+
+  private FieldReader createPromotingReader(Promote promote) throws IOException {
+    switch (promote.reader.getType()) {
+      case BYTES:
+        // The writer's value is read as a string. Utf8.getBytes() is only valid
+        // up to getByteLength(), so bound the buffer explicitly.
+        return (reuse, decoder) -> {
+          Utf8 utf8 = decoder.readString(null);
+          return ByteBuffer.wrap(utf8.getBytes(), 0, utf8.getByteLength());
+        };
+      case STRING:
+        return createBytesPromotingToStringReader(promote.reader);
+      case LONG:
+        return (reuse, decoder) -> (long) decoder.readInt();
+      case FLOAT:
+        switch (promote.writer.getType()) {
+          case INT:
+            return (reuse, decoder) -> (float) decoder.readInt();
+          case LONG:
+            return (reuse, decoder) -> (float) decoder.readLong();
+          default:
+        }
+        break;
+      case DOUBLE:
+        switch (promote.writer.getType()) {
+          case INT:
+            return (reuse, decoder) -> (double) decoder.readInt();
+          case LONG:
+            return (reuse, decoder) -> (double) decoder.readLong();
+          case FLOAT:
+            return (reuse, decoder) -> (double) decoder.readFloat();
+          default:
+        }
+        break;
+      default:
+    }
+    throw new IllegalStateException(
+        "No promotion possible for type " + promote.writer.getType() + " to " + promote.reader.getType());
+  }
+
+  private FieldReader createStringReader(Schema readerSchema, Schema writerSchema) {
+    FieldReader stringReader = createSimpleStringReader(readerSchema);
+    if (isClassPropEnabled()) {
+      return getTransformingStringReader(readerSchema.getProp(SpecificData.CLASS_PROP), stringReader);
+    } else {
+      return stringReader;
+    }
+  }
+
+  /** Reads java.lang.String if the schema asks for it, otherwise (reused) Utf8. */
+  private FieldReader createSimpleStringReader(Schema readerSchema) {
+    String stringProperty = readerSchema.getProp(GenericData.STRING_PROP);
+    if (GenericData.StringType.String.name().equals(stringProperty)) {
+      return (old, decoder) -> decoder.readString();
+    } else {
+      return (old, decoder) -> decoder.readString(old instanceof Utf8 ? (Utf8) old : null);
+    }
+  }
+
+  private FieldReader createBytesPromotingToStringReader(Schema readerSchema) {
+    String stringProperty = readerSchema.getProp(GenericData.STRING_PROP);
+    if (GenericData.StringType.String.name().equals(stringProperty)) {
+      return (old, decoder) -> getStringFromByteBuffer(decoder.readBytes(null));
+    } else {
+      return (old, decoder) -> getUtf8FromByteBuffer(old, decoder.readBytes(null));
+    }
+  }
+
+  private String getStringFromByteBuffer(ByteBuffer buffer) {
+    return new String(getRemainingBytes(buffer), StandardCharsets.UTF_8);
+  }
+
+  private Utf8 getUtf8FromByteBuffer(Object old, ByteBuffer buffer) {
+    Utf8 value = new Utf8(getRemainingBytes(buffer));
+    return (old instanceof Utf8) ? ((Utf8) old).set(value) : value;
+  }
+
+  /**
+   * Returns exactly the bytes between the buffer's position and limit, without
+   * changing the buffer's position. It avoids a copy when the buffer wraps the
+   * whole of its backing array.
+   */
+  private static byte[] getRemainingBytes(ByteBuffer buffer) {
+    if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0
+        && buffer.remaining() == buffer.array().length) {
+      return buffer.array();
+    }
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    return bytes;
+  }
+
+  private FieldReader createUnionReader(WriterUnion action) throws IOException {
+    FieldReader[] unionReaders = new FieldReader[action.actions.length];
+    for (int i = 0; i < action.actions.length; i++) {
+      unionReaders[i] = getReaderFor(action.actions[i], null);
+    }
+    return createUnionReader(unionReaders);
+  }
+
+  /** Reads the union branch index written by the writer and dispatches to that branch's reader. */
+  private FieldReader createUnionReader(FieldReader[] unionReaders) {
+    return reusingReader((reuse, decoder) -> {
+      final int selection = decoder.readIndex();
+      return unionReaders[selection].read(null, decoder);
+    });
+  }
+
+  private FieldReader createMapReader(Schema readerSchema, Container action) throws IOException {
+    FieldReader keyReader = createMapKeyReader(readerSchema);
+    FieldReader valueReader = getReaderFor(action.elementAction, null);
+    return new MapReader(keyReader, valueReader);
+  }
+
+  private FieldReader createMapKeyReader(Schema readerSchema) {
+    FieldReader stringReader = createSimpleStringReader(readerSchema);
+    if (isKeyClassEnabled()) {
+      return getTransformingStringReader(readerSchema.getProp(SpecificData.KEY_CLASS_PROP), stringReader);
+    } else {
+      return stringReader;
+    }
+  }
+
+  /**
+   * If {@code valueClass} names a loadable class with a String constructor,
+   * returns a reader that builds instances of it from the decoded string.
+   * Otherwise returns {@code stringReader} unchanged.
+   */
+  private FieldReader getTransformingStringReader(String valueClass, FieldReader stringReader) {
+    if (valueClass == null) {
+      return stringReader;
+    } else {
+      Function<String, ?> transformer = findClass(valueClass)
+          .map(clazz -> ReflectionUtils.getConstructorAsFunction(String.class, clazz)).orElse(null);
+      if (transformer != null) {
+        return (old, decoder) -> transformer.apply((String) stringReader.read(null, decoder));
+      }
+    }
+
+    return stringReader;
+  }
+
+  private Optional<Class<?>> findClass(String clazz) {
+    try {
+      return Optional.of(data.getClassLoader().loadClass(clazz));
+    } catch (ReflectiveOperationException e) {
+      return Optional.empty();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private FieldReader createArrayReader(Schema readerSchema, Container action) throws IOException {
+    FieldReader elementReader = getReaderFor(action.elementAction, null);
+
+    return reusingReader((reuse, decoder) -> {
+      if (reuse instanceof GenericArray) {
+        // GenericArray keeps cleared elements around; peek() offers them for reuse.
+        GenericArray<Object> reuseArray = (GenericArray<Object>) reuse;
+        long l = decoder.readArrayStart();
+        reuseArray.clear();
+
+        while (l > 0) {
+          for (long i = 0; i < l; i++) {
+            reuseArray.add(elementReader.read(reuseArray.peek(), decoder));
+          }
+          l = decoder.arrayNext();
+        }
+        return reuseArray;
+      } else {
+        long l = decoder.readArrayStart();
+        List<Object> array = (reuse instanceof List) ? (List<Object>) reuse
+            : new GenericData.Array<>((int) l, readerSchema);
+        array.clear();
+        while (l > 0) {
+          for (long i = 0; i < l; i++) {
+            array.add(elementReader.read(null, decoder));
+          }
+          l = decoder.arrayNext();
+        }
+        return array;
+      }
+    });
+  }
+
+  /** Maps the writer's enum ordinal to the value precomputed by the resolver. */
+  private FieldReader createEnumReader(EnumAdjust action) {
+    return reusingReader((reuse, decoder) -> {
+      int index = decoder.readEnum();
+      Object resultObject = action.values[index];
+      if (resultObject == null) {
+        throw new AvroTypeException("No match for " + action.writer.getEnumSymbols().get(index));
+      }
+      return resultObject;
+    });
+  }
+
+  private FieldReader createFixedReader(Schema readerSchema, Schema writerSchema) {
+    return reusingReader((reuse, decoder) -> {
+      GenericFixed fixed = (GenericFixed) data.createFixed(reuse, readerSchema);
+      decoder.readFixed(fixed.bytes(), 0, readerSchema.getFixedSize());
+      return fixed;
+    });
+  }
+
+  private FieldReader createBytesReader() {
+    return reusingReader(
+        (reuse, decoder) -> decoder.readBytes(reuse instanceof ByteBuffer ? (ByteBuffer) reuse : null));
+  }
+
+  /** Marks a lambda as a reader that accepts a reuse candidate (see {@link FieldReader#canReuse()}). */
+  public static FieldReader reusingReader(ReusingFieldReader reader) {
+    return reader;
+  }
+
+  /**
+   * Reads one value from a decoder. {@link #canReuse()} tells callers whether
+   * passing the previous value as {@code reuse} can be useful.
+   */
+  public interface FieldReader extends DatumReader<Object> {
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException;
+
+    public default boolean canReuse() {
+      return false;
+    }
+
+    @Override
+    default void setSchema(Schema schema) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** A {@link FieldReader} that declares it can reuse the passed-in value. */
+  public interface ReusingFieldReader extends FieldReader {
+    @Override
+    public default boolean canReuse() {
+      return true;
+    }
+  }
+
+  /**
+   * Reads a record by running precomputed execution steps against a (possibly
+   * recycled) record instance. It is registered in the cache in stage NEW and
+   * filled in later, so that recursive schemas can refer to it.
+   */
+  public static class RecordReader implements FieldReader {
+    public enum Stage {
+      NEW, INITIALIZING, INITIALIZED
+    }
+
+    private ExecutionStep[] readSteps;
+    private InstanceSupplier supplier;
+    private Schema schema;
+    private Stage stage = Stage.NEW;
+
+    public Stage getInitializationStage() {
+      return this.stage;
+    }
+
+    public void reset() {
+      this.stage = Stage.NEW;
+    }
+
+    public void startInitialization() {
+      this.stage = Stage.INITIALIZING;
+    }
+
+    public void finishInitialization(ExecutionStep[] readSteps, Schema schema, InstanceSupplier supp) {
+      this.readSteps = readSteps;
+      this.schema = schema;
+      this.supplier = supp;
+      this.stage = Stage.INITIALIZED;
+    }
+
+    @Override
+    public boolean canReuse() {
+      return true;
+    }
+
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException {
+      Object object = supplier.newInstance(reuse, schema);
+      for (ExecutionStep thisStep : readSteps) {
+        thisStep.execute(object, decoder);
+      }
+      return object;
+    }
+  }
+
+  /** Reads a map into a new {@link HashMap}; the {@code reuse} argument is ignored. */
+  public static class MapReader implements FieldReader {
+
+    private final FieldReader keyReader;
+    private final FieldReader valueReader;
+
+    public MapReader(FieldReader keyReader, FieldReader valueReader) {
+      this.keyReader = keyReader;
+      this.valueReader = valueReader;
+    }
+
+    @Override
+    public Object read(Object reuse, Decoder decoder) throws IOException {
+      long l = decoder.readMapStart();
+      Map<Object, Object> targetMap = new HashMap<>();
+
+      while (l > 0) {
+        // long counter: block sizes are longs, matching the array reader
+        for (long i = 0; i < l; i++) {
+          Object key = keyReader.read(null, decoder);
+          Object value = valueReader.read(null, decoder);
+          targetMap.put(key, value);
+        }
+        l = decoder.mapNext();
+      }
+
+      return targetMap;
+    }
+  }
+
+  /** One step of reading a record: consumes input and/or sets a field on {@code record}. */
+  public interface ExecutionStep {
+    public void execute(Object record, Decoder decoder) throws IOException;
+  }
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java b/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java
new file mode 100644
index 0000000..53faec0
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/ReflectionUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.lang.invoke.CallSite;
+import java.lang.invoke.LambdaMetafactory;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Helpers that turn constructors into functional interfaces ({@link Supplier},
+ * {@link Function}) via {@link LambdaMetafactory}, so instances can be created
+ * through a plain interface call.
+ *
+ * <p>All factory methods are best-effort: if the constructor cannot be found,
+ * accessed or linked, they return {@code null}, and callers must handle that.
+ * Fatal JVM errors ({@link VirtualMachineError}) are never swallowed.
+ */
+public final class ReflectionUtils {
+
+  private ReflectionUtils() {
+    // static helper class, don't instantiate
+  }
+
+  /**
+   * Creates a {@link Supplier} that invokes the no-argument constructor of
+   * {@code clazz} (as visible to this class's lookup).
+   *
+   * @param clazz class to instantiate
+   * @param <D>   type of the created instances
+   * @return a supplier creating a new instance per call, or {@code null} if no
+   *         suitable constructor exists or the lambda could not be created
+   */
+  @SuppressWarnings("unchecked")
+  public static <D> Supplier<D> getConstructorAsSupplier(Class<D> clazz) {
+    try {
+      MethodHandles.Lookup lookup = MethodHandles.lookup();
+      MethodHandle constructorHandle = lookup.findConstructor(clazz, MethodType.methodType(void.class));
+
+      CallSite site = LambdaMetafactory.metafactory(lookup, "get", MethodType.methodType(Supplier.class),
+          constructorHandle.type().generic(), constructorHandle, constructorHandle.type());
+
+      // The cast fixes the symbolic return type of invokeExact to Supplier,
+      // matching the call site's ()Supplier type exactly.
+      return (Supplier<D>) site.getTarget().invokeExact();
+    } catch (VirtualMachineError e) {
+      // never mask fatal JVM conditions such as OutOfMemoryError
+      throw e;
+    } catch (Throwable t) {
+      // best-effort: any lookup/linkage failure means "no Supplier available"
+      return null;
+    }
+  }
+
+  /**
+   * Creates a {@link Supplier} that invokes the one-argument constructor of
+   * {@code clazz}, always passing the same {@code argument}.
+   *
+   * @param clazz         class to instantiate
+   * @param argumentClass declared parameter type of the constructor
+   * @param argument      value passed to the constructor on every call
+   * @param <V>           constructor parameter type
+   * @param <R>           type of the created instances
+   * @return the supplier, or {@code null} if
+   *         {@link #getConstructorAsFunction(Class, Class)} could not build a
+   *         function for this constructor
+   */
+  public static <V, R> Supplier<R> getOneArgConstructorAsSupplier(Class<R> clazz, Class<V> argumentClass, V argument) {
+    Function<V, R> supplierFunction = getConstructorAsFunction(argumentClass, clazz);
+    if (supplierFunction == null) {
+      return null;
+    }
+    return () -> supplierFunction.apply(argument);
+  }
+
+  /**
+   * Creates a {@link Function} that invokes the constructor of {@code clazz}
+   * taking exactly one parameter of type {@code parameterClass}.
+   *
+   * @param parameterClass declared parameter type of the constructor
+   * @param clazz          class to instantiate
+   * @param <V>            constructor parameter type
+   * @param <R>            type of the created instances
+   * @return the function, or {@code null} if no suitable constructor exists or
+   *         the lambda could not be created
+   */
+  @SuppressWarnings("unchecked")
+  public static <V, R> Function<V, R> getConstructorAsFunction(Class<V> parameterClass, Class<R> clazz) {
+    try {
+      MethodHandles.Lookup lookup = MethodHandles.lookup();
+      MethodHandle constructorHandle = lookup.findConstructor(clazz, MethodType.methodType(void.class, parameterClass));
+
+      CallSite site = LambdaMetafactory.metafactory(lookup, "apply", MethodType.methodType(Function.class),
+          constructorHandle.type().generic(), constructorHandle, constructorHandle.type());
+
+      // The cast fixes the symbolic return type of invokeExact to Function.
+      return (Function<V, R>) site.getTarget().invokeExact();
+    } catch (VirtualMachineError e) {
+      // never mask fatal JVM conditions such as OutOfMemoryError
+      throw e;
+    } catch (Throwable t) {
+      // best-effort: any lookup/linkage failure means "no Function available"
+      return null;
+    }
+  }
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
index fd02b46..1e2fba4 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
@@ -251,7 +251,7 @@ public class ResolvingGrammarGenerator extends ValidatingGrammarGenerator {
* @param n The Json node to encode.
* @throws IOException
*/
- static void encode(Encoder e, Schema s, JsonNode n) throws IOException {
+ public static void encode(Encoder e, Schema s, JsonNode n) throws IOException {
switch (s.getType()) {
case RECORD:
for (Field f : s.getFields()) {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
index 98667cd..bebdfc6 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
@@ -117,7 +117,7 @@ public class SpecificData extends GenericData {
@Override
public DatumReader createDatumReader(Schema schema) {
- return new SpecificDatumReader(schema, schema, this);
+ // Delegate to the two-schema overload, passing this schema as both arguments.
+ return createDatumReader(schema, schema);
}
@Override
@@ -491,10 +491,36 @@ public class SpecificData extends GenericData {
return (c.isInstance(old) ? old : newInstance(c, schema));
}
+  /**
+   * Create an InstanceSupplier that caches all information required for the
+   * creation of a schema record instance rather than having to look them up for
+   * each call (as newRecord would).
+   *
+   * <p>If no class is known for {@code schema}, the superclass supplier is
+   * returned. Otherwise the returned supplier hands back {@code old} when it
+   * already is an instance of the resolved class. If not, it invokes the cached
+   * constructor. For {@link SchemaConstructable} classes it passes the schema
+   * captured here; the schema given at call time is ignored.
+   *
+   * @param schema the record schema to create instances for
+   * @return a supplier of record instances for {@code schema}
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public InstanceSupplier getNewRecordSupplier(Schema schema) {
+    Class c = getClass(schema);
+    if (c == null) {
+      return super.getNewRecordSupplier(schema);
+    }
+
+    boolean useSchema = SchemaConstructable.class.isAssignableFrom(c);
+    // Resolve the constructor and its arguments once instead of on every call.
+    Constructor meth = (Constructor) CTOR_CACHE.get(c);
+    Object[] params = useSchema ? new Object[] { schema } : (Object[]) null;
+
+    return (old, sch) -> {
+      try {
+        return c.isInstance(old) ? old : meth.newInstance(params);
+      } catch (ReflectiveOperationException e) {
+        // surface reflective failures unchecked, keeping the original cause
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
/**
* Tag interface that indicates that a class has a one-argument constructor that
* accepts a Schema.
- *
+ *
* @see #newInstance
*/
public interface SchemaConstructable {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
index e0ec85c..883452c 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
@@ -80,7 +80,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
/**
* Return length in bytes.
- *
+ *
* @deprecated call {@link #getByteLength()} instead.
*/
@Deprecated
@@ -96,7 +96,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
/**
* Set length in bytes. Should called whenever byte content changes, even if the
* length does not change, as this also clears the cached String.
- *
+ *
* @deprecated call {@link #setByteLength(int)} instead.
*/
@Deprecated
@@ -130,6 +130,16 @@ public class Utf8 implements Comparable<Utf8>, CharSequence {
return this;
}
+  /**
+   * Makes this instance a copy of {@code other}. The internal byte buffer is
+   * only reallocated when it is too small to hold {@code other}'s bytes, and
+   * {@code other}'s cached String (if any) is shared.
+   *
+   * @param other the Utf8 to copy from
+   * @return this instance
+   */
+  public Utf8 set(Utf8 other) {
+    final int newLength = other.length;
+    if (newLength > bytes.length) {
+      bytes = new byte[newLength];
+    }
+    length = newLength;
+    System.arraycopy(other.bytes, 0, bytes, 0, length);
+    string = other.string;
+    return this;
+  }
+
@Override
public String toString() {
if (this.length == 0)