You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2013/05/08 01:19:04 UTC

svn commit: r1480130 - in /gora/branches/GORA_174/gora-hbase/src: main/java/org/apache/gora/hbase/store/HBaseStore.java main/java/org/apache/gora/hbase/util/HBaseByteInterface.java test/java/org/apache/gora/hbase/store/TestHBaseStore.java

Author: alfonsonishikawa
Date: Tue May  7 23:19:03 2013
New Revision: 1480130

URL: http://svn.apache.org/r1480130
Log:
GORA-207: Fixing some bugs. This is the last patch about GORA-207 (hopefully).

Full support of unions (2 types unions, 3 types unions,...) :

- Support of ["null",type] (a.k.a. optional field).
- Support for multitype (3+) unions.
- Support of nested unions.
- Support of recursive optional records.
- Support of unions as value in maps and arrays.
- Serialization of topmost optional fields of the main record in "raw": a topmost ["null","type"] (optional field) will be persisted as if it were ["type"] (and a non-existent column === null). This ensures data from 0.2.1 can be read.

Modified:
    gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
    gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java

Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Tue May  7 23:19:03 2013
@@ -200,6 +200,17 @@ implements Configurable {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   * Serializes the Persistent data and saves in HBase.
+   * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
+   * in maps and arrays too.
+   * 
+   * A ["null","type"] field (a.k.a. optional field) is persisted as if it were ["type"], but the column gets
+   * deleted if value==null (so a subsequent read will return null).
+   * 
+   * @param persistent Record to be persisted in HBase
+   */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
@@ -234,8 +245,14 @@ implements Configurable {
                   case DIRTY:
                     byte[] qual = Bytes.toBytes(mapKey.toString());
                     byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
-                    put.add(hcol.getFamily(), qual, val);
-                    hasPuts = true;
+                    // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                    if (val == null) { // value == null => must delete the column
+                      delete.deleteColumn(hcol.getFamily(), qual);
+                      hasDeletes = true;
+                    } else {
+                      put.add(hcol.getFamily(), qual, val);
+                      hasPuts = true;
+                    }
                     break;
                   case DELETED:
                     qual = Bytes.toBytes(mapKey.toString());
@@ -248,9 +265,15 @@ implements Configurable {
               Set<Map.Entry> set = ((Map)o).entrySet();
               for(Entry entry: set) {
                 byte[] qual = toBytes(entry.getKey());
-                byte[] val = toBytes(entry.getValue());
-                put.add(hcol.getFamily(), qual, val);
-                hasPuts = true;
+                byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
+                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                if (val == null) { // value == null => must delete the column
+                  delete.deleteColumn(hcol.getFamily(), qual);
+                  hasDeletes = true;
+                } else {
+                  put.add(hcol.getFamily(), qual, val);
+                  hasPuts = true;
+                }
               }
             }
             break;
@@ -259,15 +282,28 @@ implements Configurable {
               GenericArray arr = (GenericArray) o;
               int j=0;
               for(Object item : arr) {
-                byte[] val = toBytes(item);
-                put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
-                hasPuts = true;
+                byte[] val = toBytes(item, field.schema().getElementType());
+                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                if (val == null) { // value == null => must delete the column
+                  delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
+                  hasDeletes = true;
+                } else {
+                  put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
+                  hasPuts = true;
+                }
               }
             }
             break;
           default:
-            put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema()));
-            hasPuts = true;
+            // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+            byte[] serializedBytes = toBytes(o, field.schema()) ;
+            if (serializedBytes == null) { // value == null => must delete the column
+              delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
+              hasDeletes = true;
+            } else {
+              put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
+              hasPuts = true;
+            }
             break;
         }
       }
@@ -563,8 +599,7 @@ implements Configurable {
           setField(persistent, field, arr);
           break;
         default:
-          byte[] val =
-            result.getValue(col.getFamily(), col.getQualifier());
+          byte[] val = result.getValue(col.getFamily(), col.getQualifier());
           if (val == null) {
             continue;
           }

Modified: gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Tue May  7 23:19:03 2013
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
@@ -88,7 +89,17 @@ public class HBaseByteInterface {
       };
   };
 
-
+  /**
+   * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
+   * complex type (Persistent/Record).
+   * 
+   * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
+   * 
+   * @param schema Avro schema describing the expected data
+   * @param val array of bytes with the data serialized
+   * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
+   * @throws IOException
+   */
   @SuppressWarnings("rawtypes")
   public static Object fromBytes(Schema schema, byte[] val) throws IOException {
     Type type = schema.getType();
@@ -102,12 +113,43 @@ public class HBaseByteInterface {
     case DOUBLE:  return Bytes.toDouble(val);
     case BOOLEAN: return val[0] != 0;
     case UNION:
+      // XXX Special case: When reading the top-level field of a record we must handle the
+      // special case ["null","type"] definitions: this will be written as if it was ["type"]
+      // if not in a special case, will execute "case RECORD".
+      
+      // if 'val' is empty we ignore the special case (will match Null in "case RECORD")  
+      if (schema.getTypes().size() == 2) {
+        
+        // schema [type0, type1]
+        Type type0 = schema.getTypes().get(0).getType() ;
+        Type type1 = schema.getTypes().get(1).getType() ;
+        
+        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)
+            && (   type0.equals(Schema.Type.NULL)
+                || type1.equals(Schema.Type.NULL))) {
+
+          if (type0.equals(Schema.Type.NULL))
+            schema = schema.getTypes().get(1) ;
+          else 
+            schema = schema.getTypes().get(0) ;
+          
+          return fromBytes(schema, val) ; // Deserialize as if schema was ["type"] 
+        }
+        
+      }
+      // else
+      //   type = [type0,type1] where type0=type1
+      //   or val == null
+      // => deserialize like "case RECORD"
+
     case RECORD:
       Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
       PersistentDatumReader<?> reader = null ;
             
-      // For UNION schemas, must use a specific (UNION-type-type-type)
-      // since unions don't have own name
+      // For UNION schemas, must use a specific PersistentDatumReader
+      // from the readerMap since unions don't have own name
+      // (key name in map will be "UNION-type-type-...")
       if (schema.getType().equals(Schema.Type.UNION)) {
         reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
         if (reader == null) {
@@ -137,6 +179,13 @@ public class HBaseByteInterface {
     }
   }
 
+  /**
+   * Converts an array of bytes to the target <em>basic class</em>.
+   * @param clazz (Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8).class
+   * @param val array of bytes with the value
+   * @return an instance of <code>clazz</code> with the bytes in <code>val</code>
+   *         deserialized with org.apache.hadoop.hbase.util.Bytes
+   */
   @SuppressWarnings("unchecked")
   public static <K> K fromBytes(Class<K> clazz, byte[] val) {
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
@@ -161,6 +210,11 @@ public class HBaseByteInterface {
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }
 
+  /**
+   * Converts an instance of a <em>basic class</em> to an array of bytes.
+   * @param o Instance of Enum|Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8
+   * @return array of bytes with <code>o</code> serialized with org.apache.hadoop.hbase.util.Bytes
+   */
   public static byte[] toBytes(Object o) {
     Class<?> clazz = o.getClass();
     if (clazz.equals(Enum.class)) {
@@ -187,6 +241,14 @@ public class HBaseByteInterface {
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }
 
+  /**
+   * Serializes an object following the given schema.
+   * Does not handle <code>array/map</code> if it is not inside a <code>record</code>
+   * @param o Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Enum|Persistent
+   * @param schema The schema describing the object (or a compatible description)
+   * @return array of bytes of the serialized object
+   * @throws IOException
+   */
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public static byte[] toBytes(Object o, Schema schema) throws IOException {
     Type type = schema.getType();
@@ -200,11 +262,40 @@ public class HBaseByteInterface {
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
     case UNION:
+      // XXX Special case: When writing the top-level field of a record we must handle the
+      // special case ["null","type"] definitions: this will be written as if it was ["type"]
+      // if not in a special case, will execute "case RECORD".
+      
+      if (schema.getTypes().size() == 2) {
+        
+        // schema [type0, type1]
+        Type type0 = schema.getTypes().get(0).getType() ;
+        Type type1 = schema.getTypes().get(1).getType() ;
+        
+        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)
+            && (   type0.equals(Schema.Type.NULL)
+                || type1.equals(Schema.Type.NULL))) {
+
+          if (o == null) return null ;
+          
+          int index = GenericData.get().resolveUnion(schema, o);
+          schema = schema.getTypes().get(index) ;
+          
+          return toBytes(o, schema) ; // Serialize as if schema was ["type"] 
+        }
+        
+      }
+      // else
+      //   type = [type0,type1] where type0=type1
+      // => Serialize like "case RECORD" with Avro
+      
     case RECORD:
       Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
       PersistentDatumWriter writer = null ;
-      // For UNION schemas, must use a specific (UNION-type-type-type)
-      // since unions don't have own name
+      // For UNION schemas, must use a specific PersistentDatumWriter
+      // from the writerMap since unions don't have own name
+      // (key name in map will be "UNION-type-type-...")
       if (schema.getType().equals(Schema.Type.UNION)) {
         writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
         if (writer == null) {

Modified: gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1480130&r1=1480129&r2=1480130&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original)
+++ gora/branches/GORA_174/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Tue May  7 23:19:03 2013
@@ -19,21 +19,29 @@
 package org.apache.gora.hbase.store;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Properties;
 
 import junit.framework.Assert;
 
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.examples.WebPageDataCreator;
 import org.apache.gora.examples.generated.Employee;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.hbase.GoraHBaseTestDriver;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.DataStoreTestUtil;
+import org.apache.gora.util.GoraException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
 
 /**
  * Test case for HBaseStore.
@@ -94,21 +102,114 @@ public class TestHBaseStore extends Data
   }
   
   
+  /**
+   * Asserts that writing bytes actually works at low level in HBase.
+   * Checks writing null unions too.
+   */
   @Override
   public void assertPutBytes(byte[] contentBytes) throws IOException {    
 
+    // First check that the "contentBytes" parameter was written and read back correctly.
+    HTable table = new HTable("WebPage");
+    Get get = new Get(Bytes.toBytes("com.example/http"));
+    org.apache.hadoop.hbase.client.Result result = table.get(get);
+    
+    byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null);
+    Assert.assertNotNull(actualBytes);
+    Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+    table.close();    
+
     // Since "content" is an optional field, we are forced to reopen the DataStore
     // to retrieve the union correctly
-    webPageStore = testDriver.createDataStore(String.class, WebPage.class);
     
+    // Test writing+reading a null value. FIELD in HBASE MUST become DELETED
     WebPage page = webPageStore.get("com.example/http") ;
-    byte[] actualBytes = page.getContent().array() ;
+    page.setContent(null) ;
+    webPageStore.put("com.example/http", page) ;
+    webPageStore.close() ;
+    webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+    page = webPageStore.get("com.example/http") ;
+    Assert.assertNull(page.getContent()) ;
+    // Check directly with HBase
+    table = new HTable("WebPage");
+    get = new Get(Bytes.toBytes("com.example/http"));
+    result = table.get(get);
+    actualBytes = result.getValue(Bytes.toBytes("content"), null);
+    Assert.assertNull(actualBytes);
+    table.close();
     
+    // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+    page = webPageStore.get("com.example/http") ;
+    page.setContent(ByteBuffer.wrap("".getBytes())) ;
+    webPageStore.put("com.example/http", page) ;
     webPageStore.close() ;
-
+    webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+    page = webPageStore.get("com.example/http") ;
+    Assert.assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
+    // Check directly with HBase
+    table = new HTable("WebPage");
+    get = new Get(Bytes.toBytes("com.example/http"));
+    result = table.get(get);
+    actualBytes = result.getValue(Bytes.toBytes("content"), null);
     Assert.assertNotNull(actualBytes);
-    Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+    Assert.assertEquals(0, actualBytes.length) ;
+    table.close();
+    
+  }
+  
+  /**
+   * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+   * @throws Exception
+   */
+  @Test
+  public void assertTopLevelUnions() throws Exception {
+    WebPage page = webPageStore.newPersistent();
+
+    // Write webpage data
+    page.setUrl(new Utf8("http://example.com"));
+    byte[] contentBytes = "example content in example.com".getBytes();
+    ByteBuffer buff = ByteBuffer.wrap(contentBytes);
+    page.setContent(buff);
+    webPageStore.put("com.example/http", page);
+    webPageStore.flush() ;
+    
+    // Read directly from HBase
+    HTable table = new HTable("WebPage");
+    Get get = new Get(Bytes.toBytes("com.example/http"));
+    org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+    byte[] bytesRead = result.getValue(Bytes.toBytes("content"), null);
+    
+    Assert.assertNotNull(bytesRead) ;
+    Assert.assertTrue(Arrays.equals(bytesRead, contentBytes));
+  }
+  
+  /**
+   * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+   * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+   * with Avro.
+   * @throws Exception
+   */
+  @Test
+  public void assertTopLevelUnionsNull() throws Exception {
+    WebPage page = webPageStore.newPersistent();
+    
+    // Write webpage data
+    page.setUrl(new Utf8("http://example.com"));
+    page.setContent(null);     // This won't change internal field status to dirty, so
+    page.setDirty("content") ; // need to change it manually
+    webPageStore.put("com.example/http", page);
+    webPageStore.flush() ;
+    
+    // Read directly from HBase
+    HTable table = new HTable("WebPage");
+    Get get = new Get(Bytes.toBytes("com.example/http"));
+    org.apache.hadoop.hbase.client.Result result = table.get(get);
+        
+    byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
 
+    Assert.assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
+    Assert.assertTrue(contentBytes == null || contentBytes.length == 0) ;
   }
   
   @Override