You are viewing a plain-text version of this content. The canonical (HTML) version is in the original mailing-list archive; its hyperlink was lost in this plain-text rendering.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/11 00:19:05 UTC

svn commit: r984240 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/generic/ lang/java/src/java/org/apache/avro/io/ lang/java/src/java/org/apache/avro/util/ lang/java/src/test/java/org/apache/avro/io/

Author: cutting
Date: Tue Aug 10 22:19:05 2010
New Revision: 984240

URL: http://svn.apache.org/viewvc?rev=984240&view=rev
Log:
AVRO-557. Java: Cache ResolvingDecoder instances in GenericDatumReader.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Aug 10 22:19:05 2010
@@ -103,6 +103,9 @@ Avro 1.4.0 (unreleased)
     accepts both reader's and writer's schema.  Also improve javadoc
     for related constructors and setters.  (Stu Hood via cutting)
 
+    AVRO-557. Java: Cache ResolvingDecoder instances, speeding
+    DatumReader benchmarks by 5x to 9x.  (cutting & scotcarey)
+
   BUG FIXES
 
     AVRO-502. Memory leak from parsing JSON schema.

Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericDatumReader.java Tue Aug 10 22:19:05 2010
@@ -29,12 +29,13 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
+import org.apache.avro.util.WeakIdentityHashMap;
 
 /** {@link DatumReader} for generic Java objects. */
 public class GenericDatumReader<D> implements DatumReader<D> {
   private Schema actual;
   private Schema expected;
-  private Object resolver;
+  private ResolvingDecoder resolver;
 
   public GenericDatumReader() {}
 
@@ -64,14 +65,36 @@ public class GenericDatumReader<D> imple
     this.expected = reader;
   }
 
-  @SuppressWarnings("unchecked")
-  public D read(D reuse, Decoder in) throws IOException {
+  private static final ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>
+    RESOLVER_CACHE =
+    new ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>() {
+    protected Map<Schema,Map<Schema,ResolvingDecoder>> initialValue() {
+      return new WeakIdentityHashMap<Schema,Map<Schema,ResolvingDecoder>>();
+    }
+  };
+
+  private static ResolvingDecoder getResolver(Schema actual, Schema expected)
+    throws IOException {
+    Map<Schema,ResolvingDecoder> cache = RESOLVER_CACHE.get().get(actual);
+    if (cache == null) {
+      cache = new WeakIdentityHashMap<Schema,ResolvingDecoder>();
+      RESOLVER_CACHE.get().put(actual, cache);
+    }
+    ResolvingDecoder resolver = cache.get(expected);
     if (resolver == null) {
-      resolver = ResolvingDecoder.resolve(actual, expected);
+      resolver = new ResolvingDecoder(actual, expected, null);
+      cache.put(expected, resolver);
     }
-    ResolvingDecoder r = new ResolvingDecoder(resolver, in);
-    D result = (D) read(reuse, expected, r);
-    r.drain();
+    return resolver;
+  }
+
+  @SuppressWarnings("unchecked")
+  public D read(D reuse, Decoder in) throws IOException {
+    if (resolver == null)
+      resolver = getResolver(actual, expected);
+    resolver.init(in);
+    D result = (D) read(reuse, expected, resolver);
+    resolver.drain();
     return result;
   }
   

Modified: avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/io/ValidatingDecoder.java Tue Aug 10 22:19:05 2010
@@ -45,6 +45,12 @@ public class ValidatingDecoder extends P
     this(new ValidatingGrammarGenerator().generate(schema), in);
   }
 
+  /** Re-initialize, reading from a new underlying Decoder. */
+  public void init(Decoder in) throws IOException {
+    parser.reset();
+    this.in = in;
+  }
+
   @Override
   public void init(InputStream in) throws IOException {
     parser.reset();

Added: avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java?rev=984240&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/util/WeakIdentityHashMap.java Tue Aug 10 22:19:05 2010
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implements a combination of WeakHashMap and IdentityHashMap.
+ * Useful for caches that need to key off of a == comparison
+ * instead of a .equals.
+ *
+ * <p><b>This class is not a general-purpose Map implementation!</b>
+ * While this class implements the Map interface, it intentionally
+ * violates Map's general contract, which mandates the use of the
+ * equals method when comparing objects. This class is designed for
+ * use only in the rare cases wherein reference-equality semantics
+ * are required.
+ *
+ * <p>Keys are held through weak references: once a key is no longer
+ * strongly reachable elsewhere and has been collected, its entry is
+ * removed on the next access to the map. A {@code null} key is
+ * supported. {@link #keySet()} and {@link #entrySet()} return
+ * unmodifiable snapshots; {@link #values()} is a live view.
+ *
+ * <p><b>Note that this implementation is not synchronized.</b>
+ */
+public class WeakIdentityHashMap<K, V> implements Map<K, V> {
+  /** Stored in place of a {@code null} key; never garbage collected. */
+  private static final Object NULL_KEY = new Object();
+
+  /** Receives key references once their referents have been collected. */
+  private final ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
+  private final Map<IdentityWeakReference, V> backingStore
+    = new HashMap<IdentityWeakReference, V>();
+
+  public WeakIdentityHashMap() {}
+
+  @Override
+  public void clear() {
+    backingStore.clear();
+    reap();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    reap();
+    return backingStore.containsKey(lookup(key));
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    reap();
+    return backingStore.containsValue(value);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the entries whose keys are still
+   * live.
+   */
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    reap();
+    Set<Map.Entry<K, V>> ret = new HashSet<Map.Entry<K, V>>();
+    for (Map.Entry<IdentityWeakReference, V> e : backingStore.entrySet()) {
+      Object referent = e.getKey().get();
+      if (referent != null) { // skip keys cleared but not yet enqueued
+        ret.add(new IdentityEntry<K, V>(unmask(referent), e.getValue()));
+      }
+    }
+    return Collections.unmodifiableSet(ret);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the live keys. The snapshot
+   * compares keys by identity, so distinct keys that are equal
+   * according to {@code equals} are all retained.
+   */
+  @Override
+  public Set<K> keySet() {
+    reap();
+    Set<K> ret = Collections.newSetFromMap(new IdentityHashMap<K, Boolean>());
+    for (IdentityWeakReference ref : backingStore.keySet()) {
+      Object referent = ref.get();
+      if (referent != null) { // skip keys cleared but not yet enqueued
+        ret.add(unmask(referent));
+      }
+    }
+    return Collections.unmodifiableSet(ret);
+  }
+
+  /**
+   * Equal only to another WeakIdentityHashMap holding the same key
+   * objects (by identity) mapped to equal values.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof WeakIdentityHashMap)) {
+      return false;
+    }
+    WeakIdentityHashMap<?, ?> other = (WeakIdentityHashMap<?, ?>) o;
+    reap();
+    other.reap();
+    return backingStore.equals(other.backingStore);
+  }
+
+  @Override
+  public V get(Object key) {
+    reap();
+    return backingStore.get(lookup(key));
+  }
+
+  @Override
+  public V put(K key, V value) {
+    reap();
+    return backingStore.put(new IdentityWeakReference(mask(key), queue), value);
+  }
+
+  @Override
+  public int hashCode() {
+    reap();
+    return backingStore.hashCode();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    reap();
+    return backingStore.isEmpty();
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> m) {
+    for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
+      put(e.getKey(), e.getValue());
+    }
+  }
+
+  @Override
+  public V remove(Object key) {
+    reap();
+    return backingStore.remove(lookup(key));
+  }
+
+  @Override
+  public int size() {
+    reap();
+    return backingStore.size();
+  }
+
+  /** Returns a live view; it may briefly hold values of collected keys. */
+  @Override
+  public Collection<V> values() {
+    reap();
+    return backingStore.values();
+  }
+
+  /** Removes every entry whose key has been garbage collected. */
+  private void reap() {
+    for (Reference<?> zombie = queue.poll(); zombie != null;
+         zombie = queue.poll()) {
+      backingStore.remove(zombie);
+    }
+  }
+
+  /** Maps a caller-supplied key to the referent actually stored. */
+  private static Object mask(Object key) {
+    return key == null ? NULL_KEY : key;
+  }
+
+  /** Inverse of {@link #mask(Object)}. */
+  @SuppressWarnings("unchecked")
+  private K unmask(Object referent) {
+    return referent == NULL_KEY ? null : (K) referent;
+  }
+
+  /**
+   * Builds a probe for lookups. It is not registered with the queue, so
+   * it can never be mistaken for the reference of a collected entry.
+   */
+  private static IdentityWeakReference lookup(Object key) {
+    return new IdentityWeakReference(mask(key), null);
+  }
+
+  /** Weak reference whose equality and hash use referent identity. */
+  private static final class IdentityWeakReference
+      extends WeakReference<Object> {
+    private final int hash;
+
+    IdentityWeakReference(Object referent, ReferenceQueue<Object> q) {
+      super(referent, q);
+      hash = System.identityHashCode(referent);
+    }
+
+    @Override
+    public int hashCode() {
+      return hash;
+    }
+
+    /**
+     * A cleared reference equals only itself, so references to collected
+     * keys never match each other or a live key.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof IdentityWeakReference)) {
+        return false;
+      }
+      Object referent = get();
+      return referent != null && referent == ((IdentityWeakReference) o).get();
+    }
+  }
+
+  /** Immutable snapshot entry whose key is compared by identity. */
+  private static final class IdentityEntry<K, V> implements Map.Entry<K, V> {
+    private final K key;
+    private final V value;
+
+    IdentityEntry(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    @Override
+    public K getKey() {
+      return key;
+    }
+
+    @Override
+    public V getValue() {
+      return value;
+    }
+
+    @Override
+    public V setValue(V newValue) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Map.Entry)) {
+        return false;
+      }
+      Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
+      return key == e.getKey()
+          && (value == null ? e.getValue() == null : value.equals(e.getValue()));
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(key)
+          ^ (value == null ? 0 : value.hashCode());
+    }
+  }
+}

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java?rev=984240&r1=984239&r2=984240&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java Tue Aug 10 22:19:05 2010
@@ -18,7 +18,6 @@
 package org.apache.avro.io;
 
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -61,7 +60,7 @@ public class Perf {
       } else if (a.equals("-S")) {
         tests.add(new ResolverTest());
       } else if (a.equals("-M")) {
-        tests.add(new MigrationTest());
+        tests.add(new MigrationWithDefaultTest());
       } else if (a.equals("-G")) {
         tests.add(new GenericReaderTest());
       } else if (a.equals("-Gd")) {
@@ -70,6 +69,11 @@ public class Perf {
         tests.add(new GenericReaderWithOutOfOrderTest());
       } else if (a.equals("-Gp")) {
         tests.add(new GenericReaderWithPromotionTest());
+      } else if (a.equals("-GoneTimeUse")) {
+        tests.add(new GenericReaderTest());
+        tests.add(new GenericReaderOneTimeUseReaderTest());
+        tests.add(new GenericReaderOneTimeUseDecoderTest());
+        tests.add(new GenericReaderOneTimeUseTest());
       } else {
         usage();
         System.exit(1);
@@ -85,8 +89,12 @@ public class Perf {
           new ReadDouble(),
           new ReadBoolean(),
           new RepeaterTest(), new NestedRecordTest(),
-          new ResolverTest(), new MigrationTest(),
-          new GenericReaderTest(), new GenericReaderWithDefaultTest(),
+          new ResolverTest(), new MigrationWithDefaultTest(),
+          new GenericReaderTest(), 
+          new GenericReaderOneTimeUseReaderTest(),
+          new GenericReaderOneTimeUseDecoderTest(),
+          new GenericReaderOneTimeUseTest(),
+          new GenericReaderWithDefaultTest(),
           new GenericReaderWithOutOfOrderTest(),
           new GenericReaderWithPromotionTest()
       }));
@@ -144,10 +152,15 @@ public class Perf {
       this.count = count;
     }
     
-    protected void generateRepeaterData(Encoder e) throws IOException {
+    protected final void generateRepeaterDataArray(Encoder e) throws IOException {
       e.writeArrayStart();
       e.setItemCount(count);
       Random r = newRandom();
+      generateRepeatData(e, r, count);
+      e.writeArrayEnd();
+    }
+    
+    protected final void generateRepeatData(Encoder e, Random r, int count) throws IOException {
       for (int i = 0; i < count; i++) {
         e.writeDouble(r.nextDouble());
         e.writeDouble(r.nextDouble());
@@ -155,8 +168,7 @@ public class Perf {
         e.writeInt(r.nextInt());
         e.writeInt(r.nextInt());
         e.writeInt(r.nextInt());
-  }
-      e.writeArrayEnd();
+      }
     }
   }
   
@@ -339,16 +351,19 @@ public class Perf {
     }
   }
   
-  private static final String REPEATER_SCHEMA =
-    "{ \"type\": \"array\", \"items\":\n"
-    + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+  private static final String SCHEMA = 
+    "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
     + "{ \"name\": \"f1\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f2\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f3\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f4\", \"type\": \"int\" },\n"
     + "{ \"name\": \"f5\", \"type\": \"int\" },\n"
     + "{ \"name\": \"f6\", \"type\": \"int\" }\n"
-    + "] } }";
+    + "] }";
+  
+  private static final String REPEATER_SCHEMA =
+    "{ \"type\": \"array\", \"items\":\n"
+    + SCHEMA + " }";
 
   private static class RepeaterTest extends DecoderTest {
     public RepeaterTest() throws IOException {
@@ -361,7 +376,8 @@ public class Perf {
     
     @Override
     protected void genData(Encoder e) throws IOException {
-      generateRepeaterData(e);
+      generateRepeaterDataArray(e);
+      e.flush();
     }
     
     @Override
@@ -395,8 +411,7 @@ public class Perf {
   }
 
   private static final String MIGRATION_SCHEMA_WITH_DEFAULT =
-    "{ \"type\": \"array\", \"items\":\n"
-    + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+    "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
     + "{ \"name\": \"f1\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f2\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f3\", \"type\": \"double\" },\n"
@@ -407,39 +422,41 @@ public class Perf {
       + "\"default\": \"undefined\" },\n"
     + "{ \"name\": \"f8\", \"type\": \"string\","
       + "\"default\": \"undefined\" }\n"
-    + "] } }";
+    + "] }";
+  
+  private static final String MIGRATION_SCHEMA_WITH_DEFAULT_REPEATER = 
+    "{ \"type\": \"array\", \"items\":\n"
+    + MIGRATION_SCHEMA_WITH_DEFAULT + " }";
 
   private static final String MIGRATION_SCHEMA_WITH_OUT_OF_ORDER =
-    "{ \"type\": \"array\", \"items\":\n"
-    + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+    "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
     + "{ \"name\": \"f1\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f3\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f5\", \"type\": \"int\" },\n"
     + "{ \"name\": \"f2\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f4\", \"type\": \"int\" },\n"
     + "{ \"name\": \"f6\", \"type\": \"int\" }\n"
-    + "] } }";
+    + "] }";
 
   private static final String MIGRATION_SCHEMA_WITH_PROMOTION =
-    "{ \"type\": \"array\", \"items\":\n"
-    + "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
+    "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
     + "{ \"name\": \"f1\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f2\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f3\", \"type\": \"double\" },\n"
     + "{ \"name\": \"f4\", \"type\": \"long\" },\n"
     + "{ \"name\": \"f5\", \"type\": \"long\" },\n"
     + "{ \"name\": \"f6\", \"type\": \"long\" }\n"
-    + "] } }";
+    + "] }";
 
 
   /**
    * Tests the performance of introducing default values.
    */
-  private static class MigrationTest extends RepeaterTest {
+  private static class MigrationWithDefaultTest extends RepeaterTest {
     private final Schema readerSchema;
-    public MigrationTest() throws IOException {
-      super("MigrationTest");
-      readerSchema = Schema.parse(MIGRATION_SCHEMA_WITH_DEFAULT);
+    public MigrationWithDefaultTest() throws IOException {
+      super("MigrationWithDefaultTest");
+      readerSchema = Schema.parse(MIGRATION_SCHEMA_WITH_DEFAULT_REPEATER);
     }
     
     @Override
@@ -465,6 +482,8 @@ public class Perf {
   
   private static class GenericReaderTest extends Test {
     public final Schema writerSchema;
+    protected final GenericDatumReader<Object> r;
+    protected BinaryDecoder d;
 
     public GenericReaderTest() throws IOException {
       this("GenericReaderTest");
@@ -472,36 +491,97 @@ public class Perf {
 
     public GenericReaderTest(String name) throws IOException {
       super(name, CYCLES, COUNT/12);
-      this.writerSchema = Schema.parse(REPEATER_SCHEMA);
+      this.writerSchema = Schema.parse(SCHEMA);
       ByteArrayOutputStream bao = new ByteArrayOutputStream();
       Encoder e = new BinaryEncoder(bao);
-      generateRepeaterData(e);
+      genData(e);
       data = bao.toByteArray();
+      r = createReader();
+      d = null;
     }
 
     @Override
-    public final long read() throws IOException {
-      GenericDatumReader<Object> r = getReader();
+    public long read() throws IOException {
       long t = System.nanoTime();
-      Decoder d =
-        DecoderFactory.defaultFactory().createBinaryDecoder(data, null);
+      d = initDecoder(d);
       Object reuse = null;
-      for (; ;) {
-        try {
-          reuse = r.read(reuse, d);
-        } catch (EOFException e) {
-          break;
-        }
+        for (int i = 0; i < this.count; i++) {
+          try {
+                    GenericDatumReader<Object> reader = getReader();
+          Decoder decoder = getDecoder();
+          reuse = reader.read(reuse, decoder);
+          } catch (Exception e) {
+            System.out.println(i + " out of " + count + " " + name);
+            e.printStackTrace();
+            break;
+            }
       }
-      
       return (System.nanoTime() - t);
     }
     
-    protected GenericDatumReader<Object> getReader() throws IOException {
+    protected void genData(Encoder e) throws IOException {
+      generateRepeatData(e, new Random(), count);
+      e.flush();
+    }
+    
+    protected GenericDatumReader<Object> createReader() throws IOException {
       return new GenericDatumReader<Object>(writerSchema);
     }
+    
+    protected GenericDatumReader<Object> getReader() throws IOException {
+      return r;
+    }
+    
+    protected BinaryDecoder initDecoder(BinaryDecoder reuse) {
+      return DecoderFactory.defaultFactory().createBinaryDecoder(data, reuse);
+    }
+    
+    protected Decoder getDecoder() {
+      return d;
+    }
+  }
+
+  private static class GenericReaderOneTimeUseReaderTest extends GenericReaderTest {
+    
+    public GenericReaderOneTimeUseReaderTest() throws IOException {
+      super("GenericReaderOneTimeUseReaderTest");
+    }
+
+    @Override
+    protected GenericDatumReader<Object> getReader() throws IOException {
+      return createReader();
+    }
+  }
+  
+
+  private static class GenericReaderOneTimeUseDecoderTest extends GenericReaderTest {
+    
+    public GenericReaderOneTimeUseDecoderTest() throws IOException {
+      super("GenericReaderOneTimeUseDecoderTest");
+    }
+
+    @Override
+    protected Decoder getDecoder() {
+      return initDecoder(null);
+    }
   }
+  
+  private static class GenericReaderOneTimeUseTest extends GenericReaderTest {
+    public GenericReaderOneTimeUseTest() throws IOException {
+      super("GenericReaderOneTimeUseTest");
+    }
 
+    @Override
+    protected GenericDatumReader<Object> getReader() throws IOException {
+      return createReader();
+    }
+    
+    @Override
+    protected Decoder getDecoder() {
+      return initDecoder(null);
+    }
+  }
+  
   private static class GenericReaderWithMigrationTest extends GenericReaderTest {
     private final Schema readerSchema;
     protected GenericReaderWithMigrationTest(String name, String readerSchema)
@@ -553,6 +633,7 @@ public class Perf {
     }
   }
 
+
   private static void usage() {
     System.out.println("Usage: Perf { -i | -ls | -l | -f | -d | -b | -R | -N " +
       "| -S | -M | -G | -Gd | -Go | Gp }");