You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/11 00:19:05 UTC
svn commit: r984240 - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/generic/
lang/java/src/java/org/apache/avro/io/
lang/java/src/java/org/apache/avro/util/
lang/java/src/test/java/org/apache/avro/io/
Author: cutting
Date: Tue Aug 10 22:19:05 2010
New Revision: 984240
URL: http://svn.apache.org/viewvc?rev=984240&view=rev
Log:
AVRO-557. Java: Cache ResolvingDecoder instances in GenericDatumReader.
Added:
avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java
avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java
avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Aug 10 22:19:05 2010
@@ -103,6 +103,9 @@ Avro 1.4.0 (unreleased)
accepts both reader's and writer's schema. Also improve javadoc
for related constructors and setters. (Stu Hood via cutting)
+ AVRO-557. Java: Cache ResolvingDecoder instances, speeding
+ DatumReader benchmarks by 5x to 9x. (cutting & scotcarey)
+
BUG FIXES
AVRO-502. Memory leak from parsing JSON schema.
Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java Tue Aug 10 22:19:05 2010
@@ -29,12 +29,13 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
+import org.apache.avro.util.WeakIdentityHashMap;
/** {@link DatumReader} for generic Java objects. */
public class GenericDatumReader<D> implements DatumReader<D> {
private Schema actual;
private Schema expected;
- private Object resolver;
+ private ResolvingDecoder resolver;
public GenericDatumReader() {}
@@ -64,14 +65,36 @@ public class GenericDatumReader<D> imple
this.expected = reader;
}
- @SuppressWarnings("unchecked")
- public D read(D reuse, Decoder in) throws IOException {
+ /**
+  * Per-thread cache of ResolvingDecoders, keyed first by the actual
+  * schema and then by the expected schema (see getResolver). Both levels
+  * are WeakIdentityHashMaps, so schemas are matched by identity (==).
+  * NOTE(review): cached ResolvingDecoders are held strongly as values;
+  * if a decoder retains references to its schemas, those weak keys can
+  * never be cleared and entries would accumulate - confirm.
+  */
+ private static final ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>
+ RESOLVER_CACHE =
+ new ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>() {
+ protected Map<Schema,Map<Schema,ResolvingDecoder>> initialValue() {
+ return new WeakIdentityHashMap<Schema,Map<Schema,ResolvingDecoder>>();
+ }
+ };
+
+ /**
+  * Returns this thread's cached ResolvingDecoder for the given pair of
+  * schemas, constructing and caching one on first use. The returned
+  * instance is shared by every caller on this thread that passes the
+  * same two Schema objects.
+  */
+ private static ResolvingDecoder getResolver(Schema actual, Schema expected)
+ throws IOException {
+ Map<Schema,ResolvingDecoder> cache = RESOLVER_CACHE.get().get(actual);
+ if (cache == null) {
+ cache = new WeakIdentityHashMap<Schema,ResolvingDecoder>();
+ RESOLVER_CACHE.get().put(actual, cache);
+ }
+ ResolvingDecoder resolver = cache.get(expected);
if (resolver == null) {
- resolver = ResolvingDecoder.resolve(actual, expected);
+ // presumably null = no underlying Decoder yet; read() calls init(in)
+ resolver = new ResolvingDecoder(actual, expected, null);
+ cache.put(expected, resolver);
}
- ResolvingDecoder r = new ResolvingDecoder(resolver, in);
- D result = (D) read(reuse, expected, r);
- r.drain();
+ return resolver;
+ }
+
+  /**
+   * Reads one datum from {@code in}, resolving the actual schema against
+   * the expected one.
+   *
+   * <p>The ResolvingDecoder is fetched from the per-thread cache on every
+   * call instead of being remembered in a field. A remembered decoder
+   * came from whichever thread first called read(), so a reader shared
+   * between threads would let several threads drive the same decoder at
+   * once. It would also keep using the old schemas if actual/expected
+   * were reassigned after the first read.
+   */
+  @SuppressWarnings("unchecked")
+  public D read(D reuse, Decoder in) throws IOException {
+    ResolvingDecoder r = getResolver(actual, expected);
+    r.init(in);
+    D result = (D) read(reuse, expected, r);
+    r.drain();
return result;
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java Tue Aug 10 22:19:05 2010
@@ -45,6 +45,12 @@ public class ValidatingDecoder extends P
this(new ValidatingGrammarGenerator().generate(schema), in);
}
+ /**
+  * Re-initialize, reading from a new underlying Decoder.
+  * Resets the parser state and replaces the wrapped decoder with
+  * {@code in}; an overload of init(InputStream), which also resets
+  * the parser.
+  */
+ public void init(Decoder in) throws IOException {
+ parser.reset();
+ this.in = in;
+ }
+
@Override
public void init(InputStream in) throws IOException {
parser.reset();
Added: avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java?rev=984240&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java Tue Aug 10 22:19:05 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.util;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implements a combination of WeakHashMap and IdentityHashMap.
+ * Useful for caches that need to key off of a == comparison
+ * instead of a .equals.
+ *
+ * <b>
+ * This class is not a general-purpose Map implementation! While
+ * this class implements the Map interface, it intentionally violates
+ * Map's general contract, which mandates the use of the equals method
+ * when comparing objects. This class is designed for use only in the
+ * rare cases wherein reference-equality semantics are required.
+ *
+ * Note that this implementation is not synchronized.
+ * </b>
+ *
+ * <p>Keys are held through weak references; entries whose keys have been
+ * garbage collected are purged on the next access. Values are held
+ * strongly, so a value that (directly or indirectly) references its own
+ * key keeps that entry alive indefinitely.
+ *
+ * <p>{@link #keySet()} and {@link #entrySet()} return unmodifiable
+ * snapshots whose membership is also identity-based;
+ * {@link #values()} returns a live view of the backing values.
+ * A {@code null} key is supported and is never purged.
+ */
+public class WeakIdentityHashMap<K, V> implements Map<K, V> {
+  /**
+   * Stand-in stored in place of a {@code null} key, so that a reference
+   * to a null key can be told apart from a reference that has been
+   * cleared by the garbage collector. Held strongly, so never cleared.
+   */
+  private static final Object NULL_KEY = new Object();
+
+  /** Receives key references once their referents are collected. */
+  private final ReferenceQueue<K> queue = new ReferenceQueue<K>();
+  private Map<IdentityWeakReference, V> backingStore
+    = new HashMap<IdentityWeakReference, V>();
+
+  public WeakIdentityHashMap() {}
+
+  public void clear() {
+    backingStore.clear();
+    reap();   // discard any references that were already enqueued
+  }
+
+  public boolean containsKey(Object key) {
+    reap();
+    return backingStore.containsKey(probe(key));
+  }
+
+  public boolean containsValue(Object value) {
+    reap();
+    return backingStore.containsValue(value);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the current mappings. Entries
+   * compare by identity, so keys that are equal but not identical each
+   * keep their own entry.
+   */
+  public Set<Map.Entry<K, V>> entrySet() {
+    reap();
+    Set<Map.Entry<K, V>> ret = new HashSet<Map.Entry<K, V>>();
+    for (Map.Entry<IdentityWeakReference, V> ref : backingStore.entrySet()) {
+      Object referent = ref.getKey().get();
+      if (referent == null) {
+        continue;   // key collected after reap(); not a live mapping
+      }
+      final K key = unmaskNull(referent);
+      final V value = ref.getValue();
+      ret.add(new Map.Entry<K, V>() {
+        public K getKey() {
+          return key;
+        }
+        public V getValue() {
+          return value;
+        }
+        public V setValue(V newValue) {
+          throw new UnsupportedOperationException();
+        }
+      });
+    }
+    return Collections.unmodifiableSet(ret);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the live keys. The snapshot uses
+   * identity comparison, matching this map, so keys that are equal but
+   * not identical are all retained.
+   */
+  public Set<K> keySet() {
+    reap();
+    Set<K> ret = Collections.newSetFromMap(new IdentityHashMap<K, Boolean>());
+    for (IdentityWeakReference ref : backingStore.keySet()) {
+      Object referent = ref.get();
+      if (referent != null) {   // skip keys collected after reap()
+        ret.add(unmaskNull(referent));
+      }
+    }
+    return Collections.unmodifiableSet(ret);
+  }
+
+  /**
+   * Two WeakIdentityHashMaps are equal when they map the same key objects
+   * (by identity) to equal values. Any other object is unequal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof WeakIdentityHashMap)) {
+      return false;
+    }
+    WeakIdentityHashMap<?, ?> other = (WeakIdentityHashMap<?, ?>) o;
+    reap();
+    other.reap();
+    return backingStore.equals(other.backingStore);
+  }
+
+  public V get(Object key) {
+    reap();
+    return backingStore.get(probe(key));
+  }
+
+  public V put(K key, V value) {
+    reap();
+    return backingStore.put(new IdentityWeakReference(key), value);
+  }
+
+  @Override
+  public int hashCode() {
+    reap();
+    return backingStore.hashCode();
+  }
+
+  public boolean isEmpty() {
+    reap();
+    return backingStore.isEmpty();
+  }
+
+  /** Copies every mapping of {@code m} into this map, keyed by identity. */
+  public void putAll(Map<? extends K, ? extends V> m) {
+    reap();
+    for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
+      backingStore.put(new IdentityWeakReference(e.getKey()), e.getValue());
+    }
+  }
+
+  public V remove(Object key) {
+    reap();
+    return backingStore.remove(probe(key));
+  }
+
+  public int size() {
+    reap();
+    return backingStore.size();
+  }
+
+  /**
+   * Returns a live view of the values. It may briefly include values
+   * whose keys were collected since the last purge.
+   */
+  public Collection<V> values() {
+    reap();
+    return backingStore.values();
+  }
+
+  /** Removes every entry whose key reference has been enqueued. */
+  private synchronized void reap() {
+    for (Object zombie; (zombie = queue.poll()) != null; ) {
+      backingStore.remove(zombie);
+    }
+  }
+
+  /** Builds a lookup-only key; it is not registered with the queue. */
+  private IdentityWeakReference probe(Object key) {
+    return new IdentityWeakReference(key, null);
+  }
+
+  private static Object maskNull(Object key) {
+    return key == null ? NULL_KEY : key;
+  }
+
+  @SuppressWarnings("unchecked")
+  private K unmaskNull(Object key) {
+    return key == NULL_KEY ? null : (K) key;
+  }
+
+  /**
+   * Weak reference to a key that hashes and compares by the identity of
+   * its referent. A reference whose referent has been cleared is equal
+   * only to itself, so purging one dead entry can never remove another.
+   */
+  class IdentityWeakReference extends WeakReference<K> {
+    private final int hash;
+
+    /** Creates a reference registered with this map's queue. */
+    IdentityWeakReference(Object obj) {
+      this(obj, queue);
+    }
+
+    @SuppressWarnings("unchecked")
+    IdentityWeakReference(Object obj, ReferenceQueue<? super K> q) {
+      super((K) maskNull(obj), q);
+      hash = System.identityHashCode(maskNull(obj));
+    }
+
+    @Override
+    public int hashCode() {
+      return hash;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof WeakIdentityHashMap.IdentityWeakReference)) {
+        return false;
+      }
+      Object referent = get();
+      return referent != null && referent == ((WeakReference<?>) o).get();
+    }
+  }
+}
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java Tue Aug 10 22:19:05 2010
@@ -18,7 +18,6 @@
package org.apache.avro.io;
import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -61,7 +60,7 @@ public class Perf {
} else if (a.equals("-S")) {
tests.add(new ResolverTest());
} else if (a.equals("-M")) {
- tests.add(new MigrationTest());
+ tests.add(new MigrationWithDefaultTest());
} else if (a.equals("-G")) {
tests.add(new GenericReaderTest());
} else if (a.equals("-Gd")) {
@@ -70,6 +69,11 @@ public class Perf {
tests.add(new GenericReaderWithOutOfOrderTest());
} else if (a.equals("-Gp")) {
tests.add(new GenericReaderWithPromotionTest());
+ } else if (a.equals("-GoneTimeUse")) {
+ tests.add(new GenericReaderTest());
+ tests.add(new GenericReaderOneTimeUseReaderTest());
+ tests.add(new GenericReaderOneTimeUseDecoderTest());
+ tests.add(new GenericReaderOneTimeUseTest());
} else {
usage();
System.exit(1);
@@ -85,8 +89,12 @@ public class Perf {
new ReadDouble(),
new ReadBoolean(),
new RepeaterTest(), new NestedRecordTest(),
- new ResolverTest(), new MigrationTest(),
- new GenericReaderTest(), new GenericReaderWithDefaultTest(),
+ new ResolverTest(), new MigrationWithDefaultTest(),
+ new GenericReaderTest(),
+ new GenericReaderOneTimeUseReaderTest(),
+ new GenericReaderOneTimeUseDecoderTest(),
+ new GenericReaderOneTimeUseTest(),
+ new GenericReaderWithDefaultTest(),
new GenericReaderWithOutOfOrderTest(),
new GenericReaderWithPromotionTest()
}));
@@ -144,10 +152,15 @@ public class Perf {
this.count = count;
}
- protected void generateRepeaterData(Encoder e) throws IOException {
+ protected final void generateRepeaterDataArray(Encoder e) throws IOException {
e.writeArrayStart();
e.setItemCount(count);
Random r = newRandom();
+ generateRepeatData(e, r, count);
+ e.writeArrayEnd();
+ }
+
+ protected final void generateRepeatData(Encoder e, Random r, int count) throws IOException {
for (int i = 0; i < count; i++) {
e.writeDouble(r.nextDouble());
e.writeDouble(r.nextDouble());
@@ -155,8 +168,7 @@ public class Perf {
e.writeInt(r.nextInt());
e.writeInt(r.nextInt());
e.writeInt(r.nextInt());
- }
- e.writeArrayEnd();
+ }
}
}
@@ -339,16 +351,19 @@ public class Perf {
}
}
- private static final String REPEATER_SCHEMA =
- "{ \"type\": \"array\", \"items\":\n"
- + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ private static final String SCHEMA =
+ "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"name\": \"f1\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f2\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f3\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f4\", \"type\": \"int\" },\n"
+ "{ \"name\": \"f5\", \"type\": \"int\" },\n"
+ "{ \"name\": \"f6\", \"type\": \"int\" }\n"
- + "] } }";
+ + "] }";
+
+ private static final String REPEATER_SCHEMA =
+ "{ \"type\": \"array\", \"items\":\n"
+ + SCHEMA + " }";
private static class RepeaterTest extends DecoderTest {
public RepeaterTest() throws IOException {
@@ -361,7 +376,8 @@ public class Perf {
@Override
protected void genData(Encoder e) throws IOException {
- generateRepeaterData(e);
+ generateRepeaterDataArray(e);
+ e.flush();
}
@Override
@@ -395,8 +411,7 @@ public class Perf {
}
private static final String MIGRATION_SCHEMA_WITH_DEFAULT =
- "{ \"type\": \"array\", \"items\":\n"
- + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"name\": \"f1\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f2\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f3\", \"type\": \"double\" },\n"
@@ -407,39 +422,41 @@ public class Perf {
+ "\"default\": \"undefined\" },\n"
+ "{ \"name\": \"f8\", \"type\": \"string\","
+ "\"default\": \"undefined\" }\n"
- + "] } }";
+ + "] }";
+
+ private static final String MIGRATION_SCHEMA_WITH_DEFAULT_REPEATER =
+ "{ \"type\": \"array\", \"items\":\n"
+ + MIGRATION_SCHEMA_WITH_DEFAULT + " }";
private static final String MIGRATION_SCHEMA_WITH_OUT_OF_ORDER =
- "{ \"type\": \"array\", \"items\":\n"
- + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"name\": \"f1\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f3\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f5\", \"type\": \"int\" },\n"
+ "{ \"name\": \"f2\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f4\", \"type\": \"int\" },\n"
+ "{ \"name\": \"f6\", \"type\": \"int\" }\n"
- + "] } }";
+ + "] }";
private static final String MIGRATION_SCHEMA_WITH_PROMOTION =
- "{ \"type\": \"array\", \"items\":\n"
- + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+ "{ \"name\": \"f1\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f2\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f3\", \"type\": \"double\" },\n"
+ "{ \"name\": \"f4\", \"type\": \"long\" },\n"
+ "{ \"name\": \"f5\", \"type\": \"long\" },\n"
+ "{ \"name\": \"f6\", \"type\": \"long\" }\n"
- + "] } }";
+ + "] }";
/**
* Tests the performance of introducing default values.
*/
- private static class MigrationTest extends RepeaterTest {
+ private static class MigrationWithDefaultTest extends RepeaterTest {
private final Schema readerSchema;
- public MigrationTest() throws IOException {
- super("MigrationTest");
- readerSchema = Schema.parse(MIGRATION_SCHEMA_WITH_DEFAULT);
+ public MigrationWithDefaultTest() throws IOException {
+ super("MigrationWithDefaultTest");
+ readerSchema = Schema.parse(MIGRATION_SCHEMA_WITH_DEFAULT_REPEATER);
}
@Override
@@ -465,6 +482,8 @@ public class Perf {
private static class GenericReaderTest extends Test {
public final Schema writerSchema;
+ protected final GenericDatumReader<Object> r;
+ protected BinaryDecoder d;
public GenericReaderTest() throws IOException {
this("GenericReaderTest");
@@ -472,36 +491,97 @@ public class Perf {
public GenericReaderTest(String name) throws IOException {
super(name, CYCLES, COUNT/12);
- this.writerSchema = Schema.parse(REPEATER_SCHEMA);
+ this.writerSchema = Schema.parse(SCHEMA);
ByteArrayOutputStream bao = new ByteArrayOutputStream();
Encoder e = new BinaryEncoder(bao);
- generateRepeaterData(e);
+ genData(e);
data = bao.toByteArray();
+ r = createReader();
+ d = null;
}
@Override
- public final long read() throws IOException {
- GenericDatumReader<Object> r = getReader();
+ public long read() throws IOException {
long t = System.nanoTime();
- Decoder d =
- DecoderFactory.defaultFactory().createBinaryDecoder(data, null);
+ d = initDecoder(d);
Object reuse = null;
- for (; ;) {
- try {
- reuse = r.read(reuse, d);
- } catch (EOFException e) {
- break;
- }
+ for (int i = 0; i < this.count; i++) {
+ try {
+ GenericDatumReader<Object> reader = getReader();
+ Decoder decoder = getDecoder();
+ reuse = reader.read(reuse, decoder);
+ } catch (Exception e) {
+ System.out.println(i + " out of " + count + " " + name);
+ e.printStackTrace();
+ break;
+ }
}
-
return (System.nanoTime() - t);
}
- protected GenericDatumReader<Object> getReader() throws IOException {
+ protected void genData(Encoder e) throws IOException {
+ generateRepeatData(e, new Random(), count);
+ e.flush();
+ }
+
+ protected GenericDatumReader<Object> createReader() throws IOException {
return new GenericDatumReader<Object>(writerSchema);
}
+
+ protected GenericDatumReader<Object> getReader() throws IOException {
+ return r;
+ }
+
+ protected BinaryDecoder initDecoder(BinaryDecoder reuse) {
+ return DecoderFactory.defaultFactory().createBinaryDecoder(data, reuse);
+ }
+
+ protected Decoder getDecoder() {
+ return d;
+ }
+ }
+
+ /**
+  * Variant of GenericReaderTest whose getReader() returns a freshly
+  * created GenericDatumReader, so read() uses a new reader for every
+  * datum, measuring the cost of one-time-use readers.
+  */
+ private static class GenericReaderOneTimeUseReaderTest extends GenericReaderTest {
+
+ public GenericReaderOneTimeUseReaderTest() throws IOException {
+ super("GenericReaderOneTimeUseReaderTest");
+ }
+
+ @Override
+ protected GenericDatumReader<Object> getReader() throws IOException {
+ return createReader();
+ }
+ }
+
+
+ /**
+  * Variant of GenericReaderTest whose getDecoder() builds a new
+  * BinaryDecoder for every datum read.
+  * NOTE(review): each decoder is created over the whole data array via
+  * initDecoder(null), so presumably every iteration re-reads the first
+  * record rather than advancing through the data - confirm intended.
+  */
+ private static class GenericReaderOneTimeUseDecoderTest extends GenericReaderTest {
+
+ public GenericReaderOneTimeUseDecoderTest() throws IOException {
+ super("GenericReaderOneTimeUseDecoderTest");
+ }
+
+ @Override
+ protected Decoder getDecoder() {
+ return initDecoder(null);
+ }
}
+
+ /**
+  * Variant of GenericReaderTest that creates both a new GenericDatumReader
+  * and a new BinaryDecoder for every datum read.
+  * NOTE(review): as with GenericReaderOneTimeUseDecoderTest, each decoder
+  * is created over the whole data array, so presumably every iteration
+  * reads the first record - confirm intended.
+  */
+ private static class GenericReaderOneTimeUseTest extends GenericReaderTest {
+ public GenericReaderOneTimeUseTest() throws IOException {
+ super("GenericReaderOneTimeUseTest");
+ }
+ @Override
+ protected GenericDatumReader<Object> getReader() throws IOException {
+ return createReader();
+ }
+
+ @Override
+ protected Decoder getDecoder() {
+ return initDecoder(null);
+ }
+ }
+
private static class GenericReaderWithMigrationTest extends GenericReaderTest {
private final Schema readerSchema;
protected GenericReaderWithMigrationTest(String name, String readerSchema)
@@ -553,6 +633,7 @@ public class Perf {
}
}
+
private static void usage() {
System.out.println("Usage: Perf { -i | -ls | -l | -f | -d | -b | -R | -N " +
"| -S | -M | -G | -Gd | -Go | Gp }");