You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by fe...@apache.org on 2012/02/19 00:10:29 UTC

svn commit: r1290902 - in /gora/trunk: ./ gora-hbase/src/main/java/org/apache/gora/hbase/util/ gora-hbase/src/test/java/org/apache/gora/hbase/util/

Author: ferdy
Date: Sat Feb 18 23:10:28 2012
New Revision: 1290902

URL: http://svn.apache.org/viewvc?rev=1290902&view=rev
Log:
GORA-88 HBaseByteInterface not thread safe

Added:
    gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/
    gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java   (with props)
Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1290902&r1=1290901&r2=1290902&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Sat Feb 18 23:10:28 2012
@@ -2,6 +2,8 @@ Gora Change Log
 
 Trunk (unreleased changes):
 
+* GORA-88 HBaseByteInterface not thread safe (ferdy)
+
 * GORA-93 [gora-cassandra] Add implementation of CassandraStore.get(key) (Sujit Pal via lewismc)
 
 * GORA-58 Upgrade Gora-Cassandra to use Cassandra 1.0.2 (lewismc)

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1290902&r1=1290901&r2=1290902&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Sat Feb 18 23:10:28 2012
@@ -18,15 +18,18 @@
 
 package org.apache.gora.hbase.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
@@ -38,18 +41,53 @@ import org.apache.hadoop.hbase.util.Byte
  * conversions.
  */
 public class HBaseByteInterface {
+  /**
+   * Threadlocals maintaining reusable binary decoders and encoders.
+   */
+  public static final ThreadLocal<BinaryDecoder> decoders =
+      new ThreadLocal<BinaryDecoder>();
+  public static final ThreadLocal<BinaryEncoderWithStream> encoders =
+      new ThreadLocal<BinaryEncoderWithStream>();
+  
+  /**
+   * A BinaryEncoder that exposes the outputstream so that it can be reset
+   * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
+   * normally provided by EncoderFactory, but this class does not exist yet 
+   * in the current Avro version).
+   */
+  public static final class BinaryEncoderWithStream extends BinaryEncoder {
+    public BinaryEncoderWithStream(OutputStream out) {
+      super(out);
+    }
+    
+    protected OutputStream getOut() {
+      return out;
+    }
+  }
+  
+  /*
+   * Create a threadlocal map for the datum readers and writers, because
+   * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   */
+  
+  public static final ThreadLocal<Map<String, SpecificDatumReader<?>>> 
+    readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
+      protected Map<String,SpecificDatumReader<?>> initialValue() {
+        return new HashMap<String, SpecificDatumReader<?>>();
+      };
+  };
+  
+  public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>> 
+    writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
+      protected Map<String,SpecificDatumWriter<?>> initialValue() {
+        return new HashMap<String, SpecificDatumWriter<?>>();
+      };
+  };
 
-  public static final byte[] EMPTY_BYTES = new byte[0];
 
   @SuppressWarnings("rawtypes")
-  private static final SpecificDatumWriter writer =
-    new SpecificDatumWriter();
-
-  @SuppressWarnings("rawtypes")
-  private static final SpecificDatumReader reader =
-    new SpecificDatumReader();
-
-  @SuppressWarnings({ "unchecked", "deprecation" })
   public static Object fromBytes(Schema schema, byte[] val) throws IOException {
     Type type = schema.getType();
     switch (type) {
@@ -62,10 +100,22 @@ public class HBaseByteInterface {
     case DOUBLE:  return Bytes.toDouble(val);
     case BOOLEAN: return val[0] != 0;
     case RECORD:
-      // TODO: This is TOO SLOW... OPTIMIZE
-      reader.setSchema(schema);
-      reader.setExpected(schema);
-      BinaryDecoder decoder = new BinaryDecoder(new ByteArrayInputStream(val));
+      Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
+      SpecificDatumReader<?> reader = readerMap.get(schema.getFullName());
+      if (reader == null) {
+        reader = new SpecificDatumReader(schema);     
+        readerMap.put(schema.getFullName(), reader);
+      }
+      
+      // initialize a decoder, possibly reusing previous one
+      BinaryDecoder decoderFromCache = decoders.get();
+      BinaryDecoder decoder=DecoderFactory.defaultFactory().
+          createBinaryDecoder(val, decoderFromCache);
+      // put in threadlocal cache if the initial get was empty
+      if (decoderFromCache==null) {
+        decoders.set(decoder);
+      }
+      
       return reader.read(null, decoder);
     default: throw new RuntimeException("Unknown type: "+type);
     }
@@ -121,7 +171,7 @@ public class HBaseByteInterface {
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public static byte[] toBytes(Object o, Schema schema) throws IOException {
     Type type = schema.getType();
     switch (type) {
@@ -134,10 +184,22 @@ public class HBaseByteInterface {
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
     case RECORD:
-      // TODO: This is TOO SLOW... OPTIMIZE
-      writer.setSchema(schema);
-      ByteArrayOutputStream os = new ByteArrayOutputStream();
-      BinaryEncoder encoder = new BinaryEncoder(os);
+      Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
+      SpecificDatumWriter writer = writerMap.get(schema.getFullName());
+      if (writer == null) {
+        writer = new SpecificDatumWriter(schema);
+        writerMap.put(schema.getFullName(),writer);
+      }
+      
+      BinaryEncoderWithStream encoder = encoders.get();
+      if (encoder == null) {
+        encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+        encoders.set(encoder);
+      }
+      //reset the buffers
+      ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+      os.reset();
+      
       writer.write(o, encoder);
       encoder.flush();
       return os.toByteArray();

Added: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java?rev=1290902&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java (added)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java Sat Feb 18 23:10:28 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.Metadata;
+import org.apache.gora.examples.generated.TokenDatum;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHBaseByteInterface {
+
+  private static final Random RANDOM = new Random();
+
+  @Test
+  public void testEncodingDecoding() throws Exception {
+    for (int i=0; i < 1000; i++) {
+    
+      //employee
+      Utf8 name = new Utf8("john");
+      long dateOfBirth = System.currentTimeMillis();
+      int salary = 1337;
+      Utf8 ssn = new Utf8(String.valueOf(RANDOM.nextLong()));
+      
+      Employee e = new Employee();
+      e.setName(name);
+      e.setDateOfBirth(dateOfBirth);
+      e.setSalary(salary);
+      e.setSsn(ssn);
+      
+      byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee._SCHEMA);
+      Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee._SCHEMA, 
+          employerBytes);
+      
+      Assert.assertEquals(name, e2.getName());
+      Assert.assertEquals(dateOfBirth, e2.getDateOfBirth());
+      Assert.assertEquals(salary, e2.getSalary());
+      Assert.assertEquals(ssn, e2.getSsn());
+      
+      
+      //metadata
+      Utf8 key = new Utf8("theKey");
+      Utf8 value = new Utf8("theValue " + RANDOM.nextLong());
+      
+      Metadata m = new Metadata();
+      m.putToData(key, value);
+      
+      byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata._SCHEMA);
+      Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata._SCHEMA, 
+          datumBytes);
+      
+      Assert.assertEquals(value, m2.getFromData(key));
+    }
+  }
+  
+  @Test
+  public void testEncodingDecodingMultithreaded() throws Exception {
+    // create a fixed thread pool
+    int numThreads = 8;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+    // define a list of tasks
+    Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(new Callable<Integer>() {
+        @Override
+        public Integer call() {
+          try {
+            // run a sequence
+            testEncodingDecoding();
+            // everything ok, return 0
+            return 0;
+          } catch (Exception e) {
+            e.printStackTrace();
+            // this will fail the test
+            return 1;
+          }
+        }
+      });
+    }
+    // submit them at once
+    List<Future<Integer>> results = pool.invokeAll(tasks);
+
+    // check results
+    for (Future<Integer> result : results) {
+      Assert.assertEquals(0, (int) result.get());
+    }
+  }
+
+}
\ No newline at end of file

Propchange: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain