You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by fe...@apache.org on 2012/02/19 00:10:29 UTC
svn commit: r1290902 - in /gora/trunk: ./
gora-hbase/src/main/java/org/apache/gora/hbase/util/
gora-hbase/src/test/java/org/apache/gora/hbase/util/
Author: ferdy
Date: Sat Feb 18 23:10:28 2012
New Revision: 1290902
URL: http://svn.apache.org/viewvc?rev=1290902&view=rev
Log:
GORA-88 HBaseByteInterface not thread safe
Added:
gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/
gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java (with props)
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1290902&r1=1290901&r2=1290902&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Sat Feb 18 23:10:28 2012
@@ -2,6 +2,8 @@ Gora Change Log
Trunk (unreleased changes):
+* GORA-88 HBaseByteInterface not thread safe (ferdy)
+
* GORA-93 [gora-cassandra] Add implementation of CassandraStore.get(key) (Sujit Pal via lewismc)
* GORA-58 Upgrade Gora-Cassandra to use Cassandra 1.0.2 (lewismc)
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1290902&r1=1290901&r2=1290902&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Sat Feb 18 23:10:28 2012
@@ -18,15 +18,18 @@
package org.apache.gora.hbase.util;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
@@ -38,18 +41,53 @@ import org.apache.hadoop.hbase.util.Byte
* conversions.
*/
public class HBaseByteInterface {
+ /**
+ * Threadlocals maintaining reusable binary decoders and encoders.
+ */
+ public static final ThreadLocal<BinaryDecoder> decoders =
+ new ThreadLocal<BinaryDecoder>();
+ public static final ThreadLocal<BinaryEncoderWithStream> encoders =
+ new ThreadLocal<BinaryEncoderWithStream>();
+
+ /**
+ * A BinaryEncoder that exposes the outputstream so that it can be reset
+ * every time. (This is a workaround to reuse BinaryEncoder and the buffers,
+ * normally provided by EncoderFactory, but this class does not exist yet
+ * in the current Avro version).
+ */
+ public static final class BinaryEncoderWithStream extends BinaryEncoder {
+ public BinaryEncoderWithStream(OutputStream out) {
+ super(out);
+ }
+
+ protected OutputStream getOut() {
+ return out;
+ }
+ }
+
+ /*
+ * Create a threadlocal map for the datum readers and writers, because
+ * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
+ * When they are thread safe, it is possible to maintain a single reader and
+ * writer pair for every schema, instead of one for every thread.
+ */
+
+ public static final ThreadLocal<Map<String, SpecificDatumReader<?>>>
+ readerMaps = new ThreadLocal<Map<String, SpecificDatumReader<?>>>() {
+ protected Map<String,SpecificDatumReader<?>> initialValue() {
+ return new HashMap<String, SpecificDatumReader<?>>();
+ };
+ };
+
+ public static final ThreadLocal<Map<String, SpecificDatumWriter<?>>>
+ writerMaps = new ThreadLocal<Map<String, SpecificDatumWriter<?>>>() {
+ protected Map<String,SpecificDatumWriter<?>> initialValue() {
+ return new HashMap<String, SpecificDatumWriter<?>>();
+ };
+ };
- public static final byte[] EMPTY_BYTES = new byte[0];
@SuppressWarnings("rawtypes")
- private static final SpecificDatumWriter writer =
- new SpecificDatumWriter();
-
- @SuppressWarnings("rawtypes")
- private static final SpecificDatumReader reader =
- new SpecificDatumReader();
-
- @SuppressWarnings({ "unchecked", "deprecation" })
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
switch (type) {
@@ -62,10 +100,22 @@ public class HBaseByteInterface {
case DOUBLE: return Bytes.toDouble(val);
case BOOLEAN: return val[0] != 0;
case RECORD:
- // TODO: This is TOO SLOW... OPTIMIZE
- reader.setSchema(schema);
- reader.setExpected(schema);
- BinaryDecoder decoder = new BinaryDecoder(new ByteArrayInputStream(val));
+ Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
+ SpecificDatumReader<?> reader = readerMap.get(schema.getFullName());
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);
+ readerMap.put(schema.getFullName(), reader);
+ }
+
+ // initialize a decoder, possibly reusing previous one
+ BinaryDecoder decoderFromCache = decoders.get();
+ BinaryDecoder decoder=DecoderFactory.defaultFactory().
+ createBinaryDecoder(val, decoderFromCache);
+ // put in threadlocal cache if the initial get was empty
+ if (decoderFromCache==null) {
+ decoders.set(decoder);
+ }
+
return reader.read(null, decoder);
default: throw new RuntimeException("Unknown type: "+type);
}
@@ -121,7 +171,7 @@ public class HBaseByteInterface {
throw new RuntimeException("Can't parse data as class: " + clazz);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
switch (type) {
@@ -134,10 +184,22 @@ public class HBaseByteInterface {
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
case RECORD:
- // TODO: This is TOO SLOW... OPTIMIZE
- writer.setSchema(schema);
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- BinaryEncoder encoder = new BinaryEncoder(os);
+ Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
+ SpecificDatumWriter writer = writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);
+ writerMap.put(schema.getFullName(),writer);
+ }
+
+ BinaryEncoderWithStream encoder = encoders.get();
+ if (encoder == null) {
+ encoder = new BinaryEncoderWithStream(new ByteArrayOutputStream());
+ encoders.set(encoder);
+ }
+ //reset the buffers
+ ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
+ os.reset();
+
writer.write(o, encoder);
encoder.flush();
return os.toByteArray();
Added: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java?rev=1290902&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java (added)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java Sat Feb 18 23:10:28 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.Metadata;
+import org.apache.gora.examples.generated.TokenDatum;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHBaseByteInterface {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testEncodingDecoding() throws Exception {
+ for (int i=0; i < 1000; i++) {
+
+ //employee
+ Utf8 name = new Utf8("john");
+ long dateOfBirth = System.currentTimeMillis();
+ int salary = 1337;
+ Utf8 ssn = new Utf8(String.valueOf(RANDOM.nextLong()));
+
+ Employee e = new Employee();
+ e.setName(name);
+ e.setDateOfBirth(dateOfBirth);
+ e.setSalary(salary);
+ e.setSsn(ssn);
+
+ byte[] employerBytes = HBaseByteInterface.toBytes(e, Employee._SCHEMA);
+ Employee e2 = (Employee) HBaseByteInterface.fromBytes(Employee._SCHEMA,
+ employerBytes);
+
+ Assert.assertEquals(name, e2.getName());
+ Assert.assertEquals(dateOfBirth, e2.getDateOfBirth());
+ Assert.assertEquals(salary, e2.getSalary());
+ Assert.assertEquals(ssn, e2.getSsn());
+
+
+ //metadata
+ Utf8 key = new Utf8("theKey");
+ Utf8 value = new Utf8("theValue " + RANDOM.nextLong());
+
+ Metadata m = new Metadata();
+ m.putToData(key, value);
+
+ byte[] datumBytes = HBaseByteInterface.toBytes(m, Metadata._SCHEMA);
+ Metadata m2 = (Metadata) HBaseByteInterface.fromBytes(Metadata._SCHEMA,
+ datumBytes);
+
+ Assert.assertEquals(value, m2.getFromData(key));
+ }
+ }
+
+ @Test
+ public void testEncodingDecodingMultithreaded() throws Exception {
+ // create a fixed thread pool
+ int numThreads = 8;
+ ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+ // define a list of tasks
+ Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+ for (int i = 0; i < numThreads; i++) {
+ tasks.add(new Callable<Integer>() {
+ @Override
+ public Integer call() {
+ try {
+ // run a sequence
+ testEncodingDecoding();
+ // everything ok, return 0
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ // this will fail the test
+ return 1;
+ }
+ }
+ });
+ }
+ // submit them at once
+ List<Future<Integer>> results = pool.invokeAll(tasks);
+
+ // check results
+ for (Future<Integer> result : results) {
+ Assert.assertEquals(0, (int) result.get());
+ }
+ }
+
+}
\ No newline at end of file
Propchange: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/util/TestHBaseByteInterface.java
------------------------------------------------------------------------------
svn:mime-type = text/plain