You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2013/05/08 01:19:04 UTC
svn commit: r1480130 - in /gora/branches/GORA_174/gora-hbase/src:
main/java/org/apache/gora/hbase/store/HBaseStore.java
main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
test/java/org/apache/gora/hbase/store/TestHBaseStore.java
Author: alfonsonishikawa
Date: Tue May 7 23:19:03 2013
New Revision: 1480130
URL: http://svn.apache.org/r1480130
Log:
GORA-207: Fixing some bugs. This is the last patch about GORA-207 (hopefully).
Full support of unions (2 types unions, 3 types unions,...) :
- Support of ["null",type] (a.k.a. optional field).
- Support for multitype (3+) unions.
- Support of nested unions.
- Support of recursive optional records.
- Support of unions as value in maps and arrays.
- Serialization of topmost optional fields of the main record in "raw": topmost ["null","type"] (optional field) will be persisted as if it were ["type"] (and non-existent column === null). This ensures data from 0.2.1 can be read.
Modified:
gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Tue May 7 23:19:03 2013
@@ -200,6 +200,17 @@ implements Configurable {
}
}
+ /**
+ * {@inheritDoc}
+ * Serializes the Persistent data and saves in HBase.
+ * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
+ * in maps and arrays too.
+ *
+ * ["null","type"] type (a.k.a. optional field) is persisted as if it were ["type"], but the column gets
+ * deleted if value==null (so the value read afterwards will be null).
+ *
+ * @param persistent Record to be persisted in HBase
+ */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void put(K key, T persistent) {
@@ -234,8 +245,14 @@ implements Configurable {
case DIRTY:
byte[] qual = Bytes.toBytes(mapKey.toString());
byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), qual);
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), qual, val);
+ hasPuts = true;
+ }
break;
case DELETED:
qual = Bytes.toBytes(mapKey.toString());
@@ -248,9 +265,15 @@ implements Configurable {
Set<Map.Entry> set = ((Map)o).entrySet();
for(Entry entry: set) {
byte[] qual = toBytes(entry.getKey());
- byte[] val = toBytes(entry.getValue());
- put.add(hcol.getFamily(), qual, val);
- hasPuts = true;
+ byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), qual);
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), qual, val);
+ hasPuts = true;
+ }
}
}
break;
@@ -259,15 +282,28 @@ implements Configurable {
GenericArray arr = (GenericArray) o;
int j=0;
for(Object item : arr) {
- byte[] val = toBytes(item);
- put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
- hasPuts = true;
+ byte[] val = toBytes(item, field.schema().getElementType());
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ if (val == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
+ hasPuts = true;
+ }
}
}
break;
default:
- put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema()));
- hasPuts = true;
+ // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+ byte[] serializedBytes = toBytes(o, field.schema()) ;
+ if (serializedBytes == null) { // value == null => must delete the column
+ delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
+ hasDeletes = true;
+ } else {
+ put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
+ hasPuts = true;
+ }
break;
}
}
@@ -563,8 +599,7 @@ implements Configurable {
setField(persistent, field, arr);
break;
default:
- byte[] val =
- result.getValue(col.getFamily(), col.getQualifier());
+ byte[] val = result.getValue(col.getFamily(), col.getQualifier());
if (val == null) {
continue;
}
Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Tue May 7 23:19:03 2013
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
@@ -88,7 +89,17 @@ public class HBaseByteInterface {
};
};
-
+ /**
+ * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
+ * complex type (Persistent/Record).
+ *
+ * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
+ *
+ * @param schema Avro schema describing the expected data
+ * @param val array of bytes with the data serialized
+ * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
+ * @throws IOException
+ */
@SuppressWarnings("rawtypes")
public static Object fromBytes(Schema schema, byte[] val) throws IOException {
Type type = schema.getType();
@@ -102,12 +113,43 @@ public class HBaseByteInterface {
case DOUBLE: return Bytes.toDouble(val);
case BOOLEAN: return val[0] != 0;
case UNION:
+ // XXX Special case: When reading the top-level field of a record we must handle the
+ // special case ["null","type"] definitions: this will be written as if it was ["type"]
+ // if not in a special case, will execute "case RECORD".
+
+ // if 'val' is empty we ignore the special case (will match Null in "case RECORD")
+ if (schema.getTypes().size() == 2) {
+
+ // schema [type0, type1]
+ Type type0 = schema.getTypes().get(0).getType() ;
+ Type type1 = schema.getTypes().get(1).getType() ;
+
+ // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+ if (!type0.equals(type1)
+ && ( type0.equals(Schema.Type.NULL)
+ || type1.equals(Schema.Type.NULL))) {
+
+ if (type0.equals(Schema.Type.NULL))
+ schema = schema.getTypes().get(1) ;
+ else
+ schema = schema.getTypes().get(0) ;
+
+ return fromBytes(schema, val) ; // Deserialize as if schema was ["type"]
+ }
+
+ }
+ // else
+ // type = [type0,type1] where type0=type1
+ // or val == null
+ // => deserialize like "case RECORD"
+
case RECORD:
Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
PersistentDatumReader<?> reader = null ;
- // For UNION schemas, must use a specific (UNION-type-type-type)
- // since unions don't have own name
+ // For UNION schemas, must use a specific PersistentDatumReader
+ // from the readerMap since unions don't have own name
+ // (key name in map will be "UNION-type-type-...")
if (schema.getType().equals(Schema.Type.UNION)) {
reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
if (reader == null) {
@@ -137,6 +179,13 @@ public class HBaseByteInterface {
}
}
+ /**
+ * Converts an array of bytes to the target <em>basic class</em>.
+ * @param clazz (Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8).class
+ * @param val array of bytes with the value
+ * @return an instance of <code>clazz</code> with the bytes in <code>val</code>
+ * deserialized with org.apache.hadoop.hbase.util.Bytes
+ */
@SuppressWarnings("unchecked")
public static <K> K fromBytes(Class<K> clazz, byte[] val) {
if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
@@ -161,6 +210,11 @@ public class HBaseByteInterface {
throw new RuntimeException("Can't parse data as class: " + clazz);
}
+ /**
+ * Converts an instance of a <em>basic class</em> to an array of bytes.
+ * @param o Instance of Enum|Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8
+ * @return array of bytes with <code>o</code> serialized with org.apache.hadoop.hbase.util.Bytes
+ */
public static byte[] toBytes(Object o) {
Class<?> clazz = o.getClass();
if (clazz.equals(Enum.class)) {
@@ -187,6 +241,14 @@ public class HBaseByteInterface {
throw new RuntimeException("Can't parse data as class: " + clazz);
}
+ /**
+ * Serializes an object following the given schema.
+ * Does not handle <code>array/map</code> if it is not inside a <code>record</code>
+ * @param o Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Enum|Persistent
+ * @param schema The schema describing the object (or a compatible description)
+ * @return array of bytes of the serialized object
+ * @throws IOException
+ */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static byte[] toBytes(Object o, Schema schema) throws IOException {
Type type = schema.getType();
@@ -200,11 +262,40 @@ public class HBaseByteInterface {
case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
case ENUM: return new byte[] { (byte)((Enum<?>) o).ordinal() };
case UNION:
+ // XXX Special case: When writing the top-level field of a record we must handle the
+ // special case ["null","type"] definitions: this will be written as if it was ["type"]
+ // if not in a special case, will execute "case RECORD".
+
+ if (schema.getTypes().size() == 2) {
+
+ // schema [type0, type1]
+ Type type0 = schema.getTypes().get(0).getType() ;
+ Type type1 = schema.getTypes().get(1).getType() ;
+
+ // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+ if (!type0.equals(type1)
+ && ( type0.equals(Schema.Type.NULL)
+ || type1.equals(Schema.Type.NULL))) {
+
+ if (o == null) return null ;
+
+ int index = GenericData.get().resolveUnion(schema, o);
+ schema = schema.getTypes().get(index) ;
+
+ return toBytes(o, schema) ; // Serialize as if schema was ["type"]
+ }
+
+ }
+ // else
+ // type = [type0,type1] where type0=type1
+ // => Serialize like "case RECORD" with Avro
+
case RECORD:
Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
PersistentDatumWriter writer = null ;
- // For UNION schemas, must use a specific (UNION-type-type-type)
- // since unions don't have own name
+ // For UNION schemas, must use a specific PersistentDatumWriter
+ // from the writerMap since unions don't have their own name
+ // (key name in map will be "UNION-type-type-...")
if (schema.getType().equals(Schema.Type.UNION)) {
writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
if (writer == null) {
Modified: gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Tue May 7 23:19:03 2013
@@ -19,21 +19,29 @@
package org.apache.gora.hbase.store;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Properties;
import junit.framework.Assert;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hbase.GoraHBaseTestDriver;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.DataStoreTestUtil;
+import org.apache.gora.util.GoraException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
/**
* Test case for HBaseStore.
@@ -94,21 +102,114 @@ public class TestHBaseStore extends Data
}
+ /**
+ * Asserts that writing bytes actually works at low level in HBase.
+ * Checks writing null unions too.
+ */
@Override
public void assertPutBytes(byte[] contentBytes) throws IOException {
+ // Check first the parameter "contentBytes" if written+read right.
+ HTable table = new HTable("WebPage");
+ Get get = new Get(Bytes.toBytes("com.example/http"));
+ org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+ byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null);
+ Assert.assertNotNull(actualBytes);
+ Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+ table.close();
+
// Since "content" is an optional field, we are forced to reopen the DataStore
// to retrieve the union correctly
- webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+ // Test writing+reading a null value. FIELD in HBASE MUST become DELETED
WebPage page = webPageStore.get("com.example/http") ;
- byte[] actualBytes = page.getContent().array() ;
+ page.setContent(null) ;
+ webPageStore.put("com.example/http", page) ;
+ webPageStore.close() ;
+ webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+ page = webPageStore.get("com.example/http") ;
+ Assert.assertNull(page.getContent()) ;
+ // Check directly with HBase
+ table = new HTable("WebPage");
+ get = new Get(Bytes.toBytes("com.example/http"));
+ result = table.get(get);
+ actualBytes = result.getValue(Bytes.toBytes("content"), null);
+ Assert.assertNull(actualBytes);
+ table.close();
+ // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+ page = webPageStore.get("com.example/http") ;
+ page.setContent(ByteBuffer.wrap("".getBytes())) ;
+ webPageStore.put("com.example/http", page) ;
webPageStore.close() ;
-
+ webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+ page = webPageStore.get("com.example/http") ;
+ Assert.assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
+ // Check directly with HBase
+ table = new HTable("WebPage");
+ get = new Get(Bytes.toBytes("com.example/http"));
+ result = table.get(get);
+ actualBytes = result.getValue(Bytes.toBytes("content"), null);
Assert.assertNotNull(actualBytes);
- Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+ Assert.assertEquals(0, actualBytes.length) ;
+ table.close();
+
+ }
+
+ /**
+ * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+ * @throws Exception
+ */
+ @Test
+ public void assertTopLevelUnions() throws Exception {
+ WebPage page = webPageStore.newPersistent();
+
+ // Write webpage data
+ page.setUrl(new Utf8("http://example.com"));
+ byte[] contentBytes = "example content in example.com".getBytes();
+ ByteBuffer buff = ByteBuffer.wrap(contentBytes);
+ page.setContent(buff);
+ webPageStore.put("com.example/http", page);
+ webPageStore.flush() ;
+
+ // Read directly from HBase
+ HTable table = new HTable("WebPage");
+ Get get = new Get(Bytes.toBytes("com.example/http"));
+ org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+ byte[] bytesRead = result.getValue(Bytes.toBytes("content"), null);
+
+ Assert.assertNotNull(bytesRead) ;
+ Assert.assertTrue(Arrays.equals(bytesRead, contentBytes));
+ }
+
+ /**
+ * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+ * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+ * with Avro.
+ * @throws Exception
+ */
+ @Test
+ public void assertTopLevelUnionsNull() throws Exception {
+ WebPage page = webPageStore.newPersistent();
+
+ // Write webpage data
+ page.setUrl(new Utf8("http://example.com"));
+ page.setContent(null); // This won't change internal field status to dirty, so
+ page.setDirty("content") ; // need to change it manually
+ webPageStore.put("com.example/http", page);
+ webPageStore.flush() ;
+
+ // Read directly from HBase
+ HTable table = new HTable("WebPage");
+ Get get = new Get(Bytes.toBytes("com.example/http"));
+ org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+ byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
+ Assert.assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
+ Assert.assertTrue(contentBytes == null || contentBytes.length == 0) ;
}
@Override