You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2013/06/14 02:55:25 UTC

svn commit: r1492920 [1/2] - in /gora/trunk: ./ gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ gora-accumulo/src/test/resources/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cas...

Author: lewismc
Date: Fri Jun 14 00:55:24 2013
New Revision: 1492920

URL: http://svn.apache.org/r1492920
Log:
GORA-174 GORA compiler does not handle [string, null] unions in the AVRO schema

Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
    gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
    gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
    gora/trunk/gora-core/src/examples/avro/employee.json
    gora/trunk/gora-core/src/examples/avro/webpage.json
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
    gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
    gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
    gora/trunk/gora-tutorial/conf/gora.properties
    gora/trunk/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
    gora/trunk/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/Pageview.java
    gora/trunk/pom.xml

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jun 14 00:55:24 2013
@@ -4,12 +4,15 @@
 
 Gora Change Log
 
+* GORA-174 GORA compiler does not handle ["string", "null"] unions in the AVRO schema (alfonsonishikawa, rmarroquin, kturner via lewismc)
+  incl. GORA-206, 207 and 216.
+
 * GORA-239 Add null checks and better message in AccumuloStore (David Medinets via hsaputra)
 
 0.3 release: 05/03/2013 (mm/dd/yyyy)
-Release Report: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12311172&version=12317954
+Release Report: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12311172&version=12317954Gora Change Log
 
-* GORA-191 Support multiple Avro Schemas within GoraCompiler (Udesh Liyanaarachchi, rmarroquin, lewismc)
+* GORA-191 Support multiple Avro Schemas within GoraCompiler (Udesh Liyanaarachchi, rmarroquin, lewismc) 
 
 * GORA-159 gora-hbase MR tests should use HBaseTestingUtility instead of deprecated HBaseClusterTestCase via GORA-89 
 

Modified: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (original)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Fri Jun 14 00:55:24 2013
@@ -95,11 +95,11 @@ import org.apache.gora.store.impl.DataSt
 import org.apache.gora.util.AvroUtils;
 import org.apache.gora.util.GoraException;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * 
@@ -460,6 +460,7 @@ public class AccumuloStore<K,T extends P
 
           break;
         case RECORD:
+        case UNION:
           SpecificDatumReader reader = new SpecificDatumReader(field.schema());
           byte[] val = entry.getValue().get();
           // TODO reuse decoder
@@ -588,6 +589,7 @@ public class AccumuloStore<K,T extends P
             }
             break;
           case RECORD:
+          case UNION:
             SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
             ByteArrayOutputStream os = new ByteArrayOutputStream();
             BinaryEncoder encoder = new BinaryEncoder(os);

Modified: gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml (original)
+++ gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml Fri Jun 14 00:55:24 2013
@@ -38,6 +38,8 @@
     <field name="dateOfBirth" family="info" qualifier="db"/>
     <field name="ssn" family="info" qualifier="sn"/>
     <field name="salary" family="info" qualifier="sl"/>
+    <field name="boss" family="info" qualifier="bs"/>
+    <field name="webpage" family="info" qualifier="wp"/>
   </class>
   
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
@@ -51,4 +53,4 @@
   <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String">
     <field name="count" family="common" qualifier="count"/>
   </class>  
-</gora-orm>  
\ No newline at end of file
+</gora-orm>  

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jun 14 00:55:24 2013
@@ -24,7 +24,6 @@ import me.prettyprint.hector.api.Seriali
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +40,15 @@ public abstract class CassandraColumn {
   private String family;
   private int type;
   private Field field;
+  private int unionType;
+
+  public void setUnionType(int pUnionType){
+    this.unionType = pUnionType;
+  }
+
+  public int getUnionType(){
+    return unionType;
+  }
   
   public String getFamily() {
     return family;
@@ -67,7 +75,7 @@ public abstract class CassandraColumn {
   
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
-    Serializer serializer = GoraSerializerTypeInferer.getSerializer(schema);
+    Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
     if (serializer == null) {
       LOG.info("Schema is not supported: " + schema.toString());
     } else {

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jun 14 00:55:24 2013
@@ -26,7 +26,8 @@ import me.prettyprint.cassandra.serializ
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.Schema.Type;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
@@ -58,6 +59,25 @@ public class CassandraResult<K, T extend
     ++this.rowNumber;
     return (this.rowNumber <= this.cassandraResultSet.size());
   }
+  
+  /**
+   * Gets the column containing the type of the union type element stored.
+   * TODO: This might seem too much of an overhead if we consider that N rows have M columns,
+   *       this might have to be reviewed to get the specific column in O(1)
+   * @param pFieldName
+   * @param pCassandraRow
+   * @return
+   */
+  private CassandraColumn getUnionTypeColumn(String pFieldName, Object[] pCassandraRow){
+    
+    for (int iCnt = 0; iCnt < pCassandraRow.length; iCnt++){
+      CassandraColumn cColumn = (CassandraColumn)pCassandraRow[iCnt];
+      String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName());
+      if (pFieldName.equals(columnName))
+        return cColumn;
+    }
+    return null;
+  }
 
 
   /**
@@ -80,21 +100,42 @@ public class CassandraResult<K, T extend
       String family = cassandraColumn.getFamily();
       String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
       
-      // get field
-      int pos = this.persistent.getFieldIndex(fieldName);
-      Field field = fields.get(pos);
-      
-      // get value
-      cassandraColumn.setField(field);
-      Object value = cassandraColumn.getValue();
-      
-      this.persistent.put(pos, value);
-      // this field does not need to be written back to the store
-      this.persistent.clearDirty(pos);
+      if (fieldName != null ){
+        // get field
+        int pos = this.persistent.getFieldIndex(fieldName);
+        Field field = fields.get(pos);
+        Type fieldType = field.schema().getType();
+        System.out.println(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) + fieldName + " " + fieldType.name());
+        if (fieldType == Type.UNION){
+          // TODO getting UNION stored type
+          // TODO get value of UNION stored type. This field does not need to be written back to the store
+          cassandraColumn.setUnionType(getNonNullTypePos(field.schema().getTypes()));
+        }
+
+        // get value
+        cassandraColumn.setField(field);
+        Object value = cassandraColumn.getValue();
+
+        this.persistent.put(pos, value);
+        // this field does not need to be written back to the store
+        this.persistent.clearDirty(pos);
+      }
+      else
+        LOG.debug("FieldName was null while iterating CassandraRow and using Avro Union type");
     }
 
   }
 
+  private int getNonNullTypePos(List<Schema> pTypes){
+    int iCnt = 0;
+    for (Schema sch :  pTypes)
+      if (!sch.getName().equals("null"))
+        return iCnt;
+      else 
+        iCnt++;
+    return CassandraStore.DEFAULT_UNION_SCHEMA;
+  }
+
   @Override
   public void close() throws IOException {
     // TODO Auto-generated method stub

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jun 14 00:55:24 2013
@@ -20,6 +20,9 @@ package org.apache.gora.cassandra.query;
 
 import java.util.ArrayList;
 
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
+
 /**
  * List of key value pairs representing a row, tagged by a key.
  */
@@ -38,5 +41,18 @@ public class CassandraRow<K> extends Arr
   public void setKey(K key) {
     this.key = key;
   }
+  
+  /**
+   * Gets a specific CassandraColumn within a row using its name
+   * @param pCassandraColumnName
+   * @return CassandraColumn
+   */
+  public CassandraColumn getCassandraColumn(String pCassandraColumnName){
+    for (CassandraColumn cColumn: this)
+      if ( pCassandraColumnName.equals(StringSerializer.get().fromByteBuffer(cColumn.getName())) )
+        return cColumn;
+    
+    return null;
+  }
 
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jun 14 00:55:24 2013
@@ -40,6 +40,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.serializers.GenericArraySerializer;
 import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
 import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,12 +83,31 @@ public class CassandraSubColumn extends 
       StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
       StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
       value = map;
+    } else if (type == Type.UNION){
+      // the selected union schema is obtained
+      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), field.schema());
+      // we use the selected union schema to deserialize our actual value
+      value = fromByteBuffer(unionFieldSchema, byteBuffer);
     } else {
       value = fromByteBuffer(fieldSchema, byteBuffer);
     }
 
     return value;
   }
+  
+  /**
+   * Gets the specific schema for a union data type
+   * @param pSchemaPos
+   * @param pSchema
+   * @return
+   */
+  private Schema getUnionSchema (int pSchemaPos, Schema pSchema){
+    Schema unionSchema = pSchema.getTypes().get(pSchemaPos);
+    // default union element
+    if ( unionSchema == null )
+      pSchema.getTypes().get(CassandraStore.DEFAULT_UNION_SCHEMA);
+    return unionSchema;
+  }
 
   public void setValue(HColumn<ByteBuffer, ByteBuffer> hColumn) {
     this.hColumn = hColumn;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Fri Jun 14 00:55:24 2013
@@ -148,6 +148,8 @@ public class GoraSerializerTypeInferer {
       serializer = GenericArraySerializer.get(schema.getElementType());
     } else if (type == Type.MAP) {
       serializer = StatefulHashMapSerializer.get(schema.getValueType());
+    } else if (type == Type.UNION){
+      serializer = ByteBufferSerializer.get();
     } else {
       serializer = null;
     }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jun 14 00:55:24 2013
@@ -221,10 +221,24 @@ public class CassandraClient<K, T extend
     }
   }
 
+  /**
+   * Adds a subColumn inside the cassandraMapping file when a String is serialized
+   * @param key
+   * @param fieldName
+   * @param columnName
+   * @param value
+   */
   public void addSubColumn(K key, String fieldName, String columnName, Object value) {
     addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
   }
 
+  /**
+   * Adds a subColumn inside the cassandraMapping file when an Integer is serialized
+   * @param key
+   * @param fieldName
+   * @param columnName
+   * @param value
+   */
   public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
     addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
   }
@@ -364,6 +378,20 @@ public class CassandraClient<K, T extend
     
     return orderedRows.getList();
   }
+  
+  private String getMappingFamily(String pField){
+    String family = null;
+    // TODO checking if it was a UNION field the one we are retrieving
+      family = this.cassandraMapping.getFamily(pField);
+    return family;
+  }
+  
+  private String getMappingColumn(String pField){
+    String column = null;
+    // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
+      column = this.cassandraMapping.getColumn(pField);
+    return column;
+  }
 
   /**
    * Select the families that contain at least one column mapped to a query field.
@@ -373,8 +401,8 @@ public class CassandraClient<K, T extend
   public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
     Map<String, List<String>> map = new HashMap<String, List<String>>();
     for (String field: query.getFields()) {
-      String family = this.cassandraMapping.getFamily(field);
-      String column = this.cassandraMapping.getColumn(field);
+      String family = this.getMappingFamily(field);
+      String column = this.getMappingColumn(field);
       
       // check if the family value was already initialized 
       List<String> list = map.get(family);
@@ -391,6 +419,14 @@ public class CassandraClient<K, T extend
     
     return map;
   }
+
+  /**
+   * Retrieves the cassandraMapping which holds whatever was mapped from the gora-cassandra-mapping.xml
+   * @return
+   */
+  public CassandraMapping getCassandraMapping(){
+    return this.cassandraMapping;
+  }
   
   /**
    * Select the field names according to the column names, which format if fully qualified: "family:column"
@@ -400,16 +436,15 @@ public class CassandraClient<K, T extend
   public Map<String, String> getReverseMap(Query<K, T> query) {
     Map<String, String> map = new HashMap<String, String>();
     for (String field: query.getFields()) {
-      String family = this.cassandraMapping.getFamily(field);
-      String column = this.cassandraMapping.getColumn(field);
+      String family = this.getMappingFamily(field);
+      String column = this.getMappingColumn(field);
       
       map.put(family + ":" + column, field);
     }
     
     return map;
-     
   }
-
+  
   public boolean isSuper(String family) {
     return this.cassandraMapping.isSuper(family);
   }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jun 14 00:55:24 2013
@@ -206,6 +206,17 @@ public class CassandraMapping {
     }    
   }
 
+  /**
+   * Add new column to CassandraMapping using the self-explanatory parameters
+   * @param pFamilyName
+   * @param pFieldName
+   * @param pColumnName
+   */
+  public void addColumn(String pFamilyName, String pFieldName, String pColumnName){
+    this.familyMap.put(pFieldName, pFamilyName);
+    this.columnMap.put(pFieldName, pColumnName);
+  }
+
   public String getFamily(String name) {
     return this.familyMap.get(name);
   }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Fri Jun 14 00:55:24 2013
@@ -19,6 +19,7 @@
 package org.apache.gora.cassandra.store;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +49,9 @@ public class CassandraMappingManager {
     return manager;
   }
 
-  //
+  /**
+  * Objects to maintain mapped keyspaces
+  */
   private Map<String, Element> keyspaceMap = null;
   private Map<String, Element>  mappingMap = null;
 
@@ -95,7 +98,12 @@ public class CassandraMappingManager {
   public void loadConfiguration() throws JDOMException, IOException {
     SAXBuilder saxBuilder = new SAXBuilder();
     // get mapping file
-    Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
+    if (inputStream == null){
+      LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
+      throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be found!");
+    }
+    Document document = saxBuilder.build(inputStream);
     if (document == null) {
       LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
     }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jun 14 00:55:24 2013
@@ -29,8 +29,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Collections;
 
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -42,7 +40,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
@@ -68,6 +65,11 @@ public class CassandraStore<K, T extends
   private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
 
   /**
+   * Default schema index used when AVRO Union data types are stored
+   */
+  public static int DEFAULT_UNION_SCHEMA = 0;
+
+  /**
    * The values are Avro fields pending to be stored.
    *
    * We want to iterate over the keys in insertion order.
@@ -132,7 +134,7 @@ public class CassandraStore<K, T extends
     CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
     cassandraResult.setReverseMap(reverseMap);
 
-    CassandraResultSet cassandraResultSet = new CassandraResultSet();
+    CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
     
     // We query Cassandra keyspace by families.
     for (String family : familyMap.keySet()) {
@@ -322,6 +324,11 @@ public class CassandraStore<K, T extends
             }
             fieldValue = newArray;
             break;
+          case UNION:
+            // storing the union selected schema, the actual value will be stored as soon as getting out of here
+            // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+            // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
+            break;
         }
         
         p.put(fieldPos, fieldValue);
@@ -341,37 +348,32 @@ public class CassandraStore<K, T extends
   private void addOrUpdateField(K key, Field field, Object value) {
     Schema schema = field.schema();
     Type type = schema.getType();
-    switch (type) {
-      case STRING:
-      case BOOLEAN:
-      case INT:
-      case LONG:
-      case BYTES:
-      case FLOAT:
-      case DOUBLE:
-      case FIXED:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
-      case RECORD:
-        if (value != null) {
-          if (value instanceof PersistentBase) {
-            PersistentBase persistentBase = (PersistentBase) value;
-            for (Field member: schema.getFields()) {
-              
-              // TODO: hack, do not store empty arrays
-              Object memberValue = persistentBase.get(member.pos());
-              if (memberValue instanceof GenericArray<?>) {
-                if (((GenericArray)memberValue).size() == 0) {
-                  continue;
-                }
-              } else if (memberValue instanceof StatefulHashMap<?,?>) {
-                if (((StatefulHashMap)memberValue).size() == 0) {
-                  continue;
+      switch (type) {
+        case STRING:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case BYTES:
+        case FLOAT:
+        case DOUBLE:
+        case FIXED:
+          this.cassandraClient.addColumn(key, field.name(), value);
+          break;
+        case RECORD:
+          if (value != null) {
+            if (value instanceof PersistentBase) {
+              PersistentBase persistentBase = (PersistentBase) value;
+              for (Field member: schema.getFields()) {
+                
+                // TODO: hack, do not store empty arrays
+                Object memberValue = persistentBase.get(member.pos());
+                if (memberValue instanceof GenericArray<?>) {
+                  if (((GenericArray)memberValue).size() == 0) {
+                    continue;
+                  }
                 }
+                this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
               }
-
-              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
-            }
           } else {
             LOG.info("Record not supported: " + value.toString());
             
@@ -396,11 +398,52 @@ public class CassandraStore<K, T extends
           }
         }
         break;
+       case UNION:
+         if(value != null) {
+           LOG.info("Union being supported with value: " + value.toString());
+           // TODO add union schema index used
+           // adding union value
+           this.cassandraClient.addColumn(key, field.name(), value);
+         } else {
+           LOG.info("Union not supported: " + value.toString());
+         }
       default:
         LOG.info("Type not considered: " + type.name());      
     }
   }
 
+  /**
+   * Gets the position within the schema of the type used
+   * @param pValue
+   * @param pUnionSchema
+   * @return
+   */
+  private int getUnionSchema(Object pValue, Schema pUnionSchema){
+    int unionSchemaPos = 0;
+    String valueType = pValue.getClass().getSimpleName();
+    Iterator<Schema> it = pUnionSchema.getTypes().iterator();
+    while ( it.hasNext() ){
+      String schemaName = it.next().getName();
+      if (valueType.equals("Utf8") && schemaName.equals(Type.STRING.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("HeapByteBuffer") && schemaName.equals(Type.STRING.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("Integer") && schemaName.equals(Type.INT.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("Long") && schemaName.equals(Type.LONG.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("Double") && schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("Float") && schemaName.equals(Type.FLOAT.name().toLowerCase()))
+        return unionSchemaPos;
+      else if (valueType.equals("Boolean") && schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+        return unionSchemaPos;
+      unionSchemaPos ++;
+    }
+    // if we weren't able to determine which data type it is, then we return the default
+    return 0;
+  }
+
   @Override
   public boolean schemaExists() {
     LOG.info("schema exists");

Modified: gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Fri Jun 14 00:55:24 2013
@@ -20,19 +20,16 @@
 <gora-orm>
   <keyspace name="Employee" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
      <family name="sc" type="super" />
   </keyspace>
 
   <keyspace name="WebPage" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
     <family name="sc" type="super"/>
   </keyspace>
 
   <keyspace name="TokenDatum" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
     <family name="sc" type="super"/>
   </keyspace>
 
@@ -41,6 +38,8 @@
     <field name="dateOfBirth"  family="p" qualifier="info:db"/>
     <field name="ssn"  family="p" qualifier="info:sn"/>
     <field name="salary"  family="p" qualifier="info:sl"/>
+    <field name="boss" family="p" qualifier="info:bs"/>
+    <field name="webpage" family="p" qualifier="info:wp"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" keyspace="WebPage">

Modified: gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Fri Jun 14 00:55:24 2013
@@ -17,8 +17,9 @@
  */
 
 /**
- * @author lewismc
- *
+ * Testing class for all standard gora-cassandra functionality.
+ * We extend DataStoreTestBase enabling us to run the entire base test
+ * suite for Gora. 
  */
 package org.apache.gora.cassandra.store;
 
@@ -38,7 +39,6 @@ import org.junit.Test;
 
 /**
  * Test for CassandraStore.
- * @author lewismc
  */
 public class TestCassandraStore extends DataStoreTestBase{
 	
@@ -93,6 +93,14 @@ public class TestCassandraStore extends 
   public void testDeleteByQueryFields() throws IOException {}
   @Override
   public void testGetPartitions() throws IOException {}
+  @Override
+  public void testGetRecursive() throws IOException {}
+  @Override
+  public void testGetDoubleRecursive() throws IOException{}
+  @Override
+  public void testGetNested() throws IOException {}
+  @Override
+  public void testGet3UnionField() throws IOException {}
 // ============================================================================
 
 

Modified: gora/trunk/gora-core/src/examples/avro/employee.json
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/avro/employee.json?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/avro/employee.json (original)
+++ gora/trunk/gora-core/src/examples/avro/employee.json Fri Jun 14 00:55:24 2013
@@ -6,6 +6,29 @@
       {"name": "name", "type": "string"},
       {"name": "dateOfBirth", "type": "long"},
       {"name": "ssn", "type": "string"},
-      {"name": "salary", "type": "int"}
+      {"name": "salary", "type": "int"},
+      {"name": "boss", "type":["null","Employee","string"]},
+      {"name": "webpage", "type":["null",
+        {
+      	  "type": "record",
+      	  "name": "WebPage",
+      	  "namespace": "org.apache.gora.examples.generated",
+          "fields" : [
+           {"name": "url", "type": "string"},
+           {"name": "content", "type": ["null","bytes"]},
+           {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
+           {"name": "outlinks", "type": {"type":"map", "values":"string"}},
+           {"name": "metadata", "type": {
+            "name": "Metadata",
+            "type": "record",
+            "namespace": "org.apache.gora.examples.generated",
+            "fields": [
+             {"name": "version", "type": "int"},
+             {"name": "data", "type": {"type": "map", "values": "string"}}
+            ]
+          }}
+          ]
+      	}
+      ]}
     ]
   }

Modified: gora/trunk/gora-core/src/examples/avro/webpage.json
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/avro/webpage.json?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/avro/webpage.json (original)
+++ gora/trunk/gora-core/src/examples/avro/webpage.json Fri Jun 14 00:55:24 2013
@@ -4,7 +4,7 @@
   "namespace": "org.apache.gora.examples.generated",
   "fields" : [
     {"name": "url", "type": "string"},
-    {"name": "content", "type": "bytes"},
+    {"name": "content", "type": ["null","bytes"]},
     {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
     {"name": "outlinks", "type": {"type":"map", "values":"string"}},
     {"name": "metadata", "type": {

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java Fri Jun 14 00:55:24 2013
@@ -61,7 +61,7 @@ public class WebPageDataCreator {
   }
   
   public static final String[] CONTENTS = {
-    "foo baz bar",
+    null,
     "foo",
     "foo1 bar1 baz1",
     "a b c d e",
@@ -116,11 +116,12 @@ public class WebPageDataCreator {
 	    for(int i=0; i<URLS.length; i++) {
 	      page = new WebPage();
 	      page.setUrl(new Utf8(URLS[i]));
-	      page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
-	      for(String token : CONTENTS[i].split(" ")) {
-	        page.addToParsedContent(new Utf8(token));  
-	      }
-	      
+	      if (CONTENTS[i]!=null){
+	        page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
+	        for(String token : CONTENTS[i].split(" ")) {
+	    	  page.addToParsedContent(new Utf8(token));  
+	        }
+          }
 	      for(int j=0; j<LINKS[i].length; j++) {
 	        page.putToOutlinks(new Utf8(URLS[LINKS[i][j]]), new Utf8(ANCHORS[i][j]));
 	      }

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Employee.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+  * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
 package org.apache.gora.examples.generated;
 
 import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
 import org.apache.avro.specific.SpecificExceptionBase;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.specific.SpecificRecord;
@@ -22,12 +41,14 @@ import org.apache.gora.persistency.ListG
 
 @SuppressWarnings("all")
 public class Employee extends PersistentBase {
-  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}");
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"},{\"name\":\"boss\",\"type\":[\"null\",\"Employee\",\"string\"]},{\"name\":\"webpage\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"WebPage\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}]}]}");
   public static enum Field {
     NAME(0,"name"),
     DATE_OF_BIRTH(1,"dateOfBirth"),
     SSN(2,"ssn"),
     SALARY(3,"salary"),
+    BOSS(4,"boss"),
+    WEBPAGE(5,"webpage"),
     ;
     private int index;
     private String name;
@@ -36,7 +57,7 @@ public class Employee extends Persistent
     public String getName() {return name;}
     public String toString() {return name;}
   };
-  public static final String[] _ALL_FIELDS = {"name","dateOfBirth","ssn","salary",};
+  public static final String[] _ALL_FIELDS = {"name","dateOfBirth","ssn","salary","boss","webpage",};
   static {
     PersistentBase.registerFields(Employee.class, _ALL_FIELDS);
   }
@@ -44,6 +65,8 @@ public class Employee extends Persistent
   private long dateOfBirth;
   private Utf8 ssn;
   private int salary;
+  private Object boss;
+  private WebPage webpage;
   public Employee() {
     this(new StateManagerImpl());
   }
@@ -60,6 +83,8 @@ public class Employee extends Persistent
     case 1: return dateOfBirth;
     case 2: return ssn;
     case 3: return salary;
+    case 4: return boss;
+    case 5: return webpage;
     default: throw new AvroRuntimeException("Bad index");
     }
   }
@@ -72,6 +97,8 @@ public class Employee extends Persistent
     case 1:dateOfBirth = (Long)_value; break;
     case 2:ssn = (Utf8)_value; break;
     case 3:salary = (Integer)_value; break;
+    case 4:boss = (Object)_value; break;
+    case 5:webpage = (WebPage)_value; break;
     default: throw new AvroRuntimeException("Bad index");
     }
   }
@@ -99,4 +126,19 @@ public class Employee extends Persistent
   public void setSalary(int value) {
     put(3, value);
   }
+  public Object getBoss() {
+    return (Object) get(4);
+  }
+  public void setBoss(Employee value) {
+    put(4, value);
+  }
+  public void setBoss(Utf8 value) {
+    put(4, value);
+  }
+  public WebPage getWebpage() {
+    return (WebPage) get(5);
+  }
+  public void setWebpage(WebPage value) {
+    put(5, value);
+  }
 }

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/Metadata.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+  * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
 package org.apache.gora.examples.generated;
 
 import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
 import org.apache.avro.specific.SpecificExceptionBase;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.specific.SpecificRecord;

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/TokenDatum.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+  * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
 package org.apache.gora.examples.generated;
 
 import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
 import org.apache.avro.specific.SpecificExceptionBase;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.specific.SpecificRecord;

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/generated/WebPage.java Fri Jun 14 00:55:24 2013
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+  * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+
 package org.apache.gora.examples.generated;
 
 import java.nio.ByteBuffer;
@@ -10,6 +28,7 @@ import org.apache.avro.Protocol;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.FixedSize;
 import org.apache.avro.specific.SpecificExceptionBase;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.specific.SpecificRecord;
@@ -22,7 +41,7 @@ import org.apache.gora.persistency.ListG
 
 @SuppressWarnings("all")
 public class WebPage extends PersistentBase {
-  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}");
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}");
   public static enum Field {
     URL(0,"url"),
     CONTENT(1,"content"),

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/WordCount.java Fri Jun 14 00:55:24 2013
@@ -65,13 +65,15 @@ public class WordCount extends Configure
     protected void map(String key, WebPage page, Context context) 
       throws IOException ,InterruptedException {
       
-      //Get the content from a WebPage as obtained from the DataStore
-      String content =  new String(page.getContent().array());
-      
-      StringTokenizer itr = new StringTokenizer(content);
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
+      if (page.getContent() != null) {
+        // Get the content from a WebPage as obtained from the DataStore
+        String content = new String(page.getContent().array());
+
+        StringTokenizer itr = new StringTokenizer(content);
+        while (itr.hasMoreTokens()) {
+          word.set(itr.nextToken());
+          context.write(word, one);
+        }
       }
     };
   }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java Fri Jun 14 00:55:24 2013
@@ -389,6 +389,27 @@ public class GoraCompiler {
             line(2, "getStateManager().setDirty(this, "+i+");");
             line(2, "return "+field.name()+".remove(key);");
             line(1, "}");
+            break;
+          case UNION:
+            fieldType = type(fieldSchema);
+            //Create get method: public <unbox(field.schema())> get<camelKey>()
+            line(1, "public "+unbox(field.schema())+" get" +camelKey+"() {");
+            line(2, "return ("+unbox(field.schema())+") get("+i+");");
+            line(1, "}");
+            
+            //Create set methods: public void set<camelKey>(<subschema.fieldType> value)
+            for (Schema s : fieldSchema.getTypes()) {
+              if (s.getType().equals(Schema.Type.NULL)) continue ;
+              String unionFieldType = type(s);
+              line(1, "public void set"+camelKey+"("+unionFieldType+" value) {");
+              line(2, "put("+i+", value);");
+              line(1, "}");
+            }
+            break;
+          case NULL:
+            throw new RuntimeException("Unexpected NULL field: "+field);
+          default:
+            throw new RuntimeException("Unknown field: "+field);
           }
           i++;
         }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java Fri Jun 14 00:55:24 2013
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.gora.avro.PersistentDatumReader;
 import org.apache.gora.persistency.ListGenericArray;
@@ -253,12 +254,33 @@ public abstract class PersistentBase imp
     return result;
   }
 
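+  /**
+   * Computes a (record's) field's hash code.
+   * @param i Index of the field in the actual record
+   * @param field Avro field descriptor; its schema type selects how the value is hashed
+   * @return 0 when the field value is null; otherwise a hash code derived from the value
+   */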
   private int getFieldHashCode(int i, Field field) {
     Object o = get(i);
     if(o == null)
       return 0;
 
-    if(field.schema().getType() == Type.BYTES) {
+    // XXX Union special case: in a field being union we have to check the
+    // inner schemas for Type.BYTES special case, but because it is not a
+    // field we check it this way. Too simple case to create another
+    // private method
+    boolean isUnionField = false ;
+    int unionIndex = -1 ;
+    
+    if (field.schema().getType() == Type.UNION) {
+      isUnionField = true ;
+      unionIndex = GenericData.get().resolveUnion(field.schema(), o);
+    }
+    
+    if(field.schema().getType() == Type.BYTES
+       || (isUnionField
+           && field.schema().getTypes().get(unionIndex).getType() == Type.BYTES)) {
+      // ByteBuffer.hashCode() depends on internal 'position' index, but we must ignore that.
       return getByteBufferHashCode((ByteBuffer)o);
     }
 

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java Fri Jun 14 00:55:24 2013
@@ -78,11 +78,13 @@ public class MapReduceTestUtils {
     //assert results
     HashMap<String, Integer> actualCounts = new HashMap<String, Integer>();
     for(String content : WebPageDataCreator.CONTENTS) {
-      for(String token:content.split(" ")) {
-        Integer count = actualCounts.get(token);
-        if(count == null) 
-          count = 0;
-        actualCounts.put(token, ++count);
+      if (content != null) {
+        for(String token:content.split(" ")) {
+          Integer count = actualCounts.get(token);
+          if(count == null) 
+            count = 0;
+          actualCounts.put(token, ++count);
+        }
       }
     }
     for(Map.Entry<String, Integer> entry:actualCounts.entrySet()) {

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java Fri Jun 14 00:55:24 2013
@@ -279,6 +279,54 @@ public abstract class DataStoreTestBase 
   }
 
   @Test
+  /**
+   * Tests put and get of a record that contains a nested recursive record:
+   * an Employee whose boss is another Employee.
+   * @throws IOException propagated from the delegated test helper
+   * @throws Exception propagated from the delegated test helper
+   */
+  public void testGetRecursive() throws IOException, Exception {
+    log.info("test method: testGetRecursive") ;
+    DataStoreTestUtil.testGetEmployeeRecursive(employeeStore) ;
+  }
+
+  /**
+   * Tests put and get of a record with a doubly nested recursive record:
+   * an Employee whose boss is an Employee that has a boss of its own.
+   * @throws IOException propagated from the delegated test helper
+   * @throws Exception propagated from the delegated test helper
+   */
+  @Test
+  public void testGetDoubleRecursive() throws IOException, Exception {
+    log.info("test method: testGetDoubleRecursive") ;
+    DataStoreTestUtil.testGetEmployeeDoubleRecursive(employeeStore) ;
+  }
+  
+  /**
+   * Tests put and get of a record with a nested (non-recursive) record:
+   * the webpage of an Employee.
+   * @throws IOException propagated from the delegated test helper
+   * @throws Exception propagated from the delegated test helper
+   */
+  @Test
+  public void testGetNested() throws IOException, Exception {
+    log.info("test method: testGetNested") ;
+    DataStoreTestUtil.testGetEmployeeNested(employeeStore) ;
+  }
+  
+  /**
+   * Tests put and get of a record with a three-type union field whose
+   * value is of the third type.
+   * @throws IOException propagated from the delegated test helper
+   * @throws Exception propagated from the delegated test helper
+   */
+  @Test
+  public void testGet3UnionField() throws IOException, Exception {
+    log.info("test method: testGet3UnionField") ;
+    DataStoreTestUtil.testGetEmployee3UnionField(employeeStore) ;
+  }
+  
+  @Test
   public void testGetWithFields() throws IOException, Exception {
     log.info("test method: testGetWithFields");
     DataStoreTestUtil.testGetEmployeeWithFields(employeeStore);

Modified: gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Fri Jun 14 00:55:24 2013
@@ -42,8 +42,11 @@ import org.apache.avro.generic.GenericAr
 import org.apache.avro.util.Utf8;
 import org.apache.gora.examples.WebPageDataCreator;
 import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.Metadata;
 import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.persistency.BeanFactory;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -91,6 +94,17 @@ public class DataStoreTestUtil {
     return employee;
   }
 
+  public static <K> Employee createBoss(
+      DataStore<K, Employee> dataStore) throws IOException, Exception {
+
+    Employee employee = dataStore.newPersistent();
+    employee.setName(new Utf8("Random boss"));
+    employee.setDateOfBirth( System.currentTimeMillis() - 22L *  YEAR_IN_MS );
+    employee.setSalary(1000000);
+    employee.setSsn(new Utf8("202020202020"));
+    return employee;
+  }
+  
   public static void testAutoCreateSchema(DataStore<String,Employee> dataStore)
   throws IOException, Exception {
     //should not throw exception
@@ -147,6 +161,76 @@ public class DataStoreTestUtil {
     Assert.assertEquals(employee, after);
   }
 
+
+  public static void testGetEmployeeRecursive(DataStore<String, Employee> dataStore)
+      throws IOException, Exception {
+    // Round-trips an Employee whose 'boss' field holds another Employee.
+    final Employee stored = createEmployee(dataStore);
+    final Employee expectedBoss = createBoss(dataStore);
+    stored.setBoss(expectedBoss);
+
+    final String key = stored.getSsn().toString();
+    dataStore.put(key, stored);
+    dataStore.flush();
+    final Employee loaded = dataStore.get(key, Employee._ALL_FIELDS);
+    Assert.assertEquals(stored, loaded);
+    Assert.assertEquals(expectedBoss, loaded.getBoss());
+  }
+
+  public static void testGetEmployeeDoubleRecursive(DataStore<String, Employee> dataStore)
+      throws IOException, Exception {
+    // Chain under test: employee -> boss -> boss of the boss, all nested via the 'boss' field.
+    final Employee stored = createEmployee(dataStore);
+    final Employee middleBoss = createBoss(dataStore);
+    final Employee topBoss = createBoss(dataStore);
+    topBoss.setName(new Utf8("Überboss"));
+    middleBoss.setBoss(topBoss);
+    stored.setBoss(middleBoss);
+
+    final String key = stored.getSsn().toString();
+    dataStore.put(key, stored);
+    dataStore.flush();
+    final Employee loaded = dataStore.get(key, Employee._ALL_FIELDS);
+    Assert.assertEquals(stored, loaded);
+    Assert.assertEquals(middleBoss, loaded.getBoss());
+    Assert.assertEquals(topBoss, ((Employee) loaded.getBoss()).getBoss());
+  }
+  
+  public static void testGetEmployeeNested(DataStore<String, Employee> dataStore)
+      throws IOException, Exception {
+    // Nested (non-recursive) case: the employee's 'webpage' field holds a WebPage record.
+    final Employee stored = createEmployee(dataStore);
+    final BeanFactoryImpl<String, WebPage> pageFactory =
+        new BeanFactoryImpl<String, WebPage>(String.class, WebPage.class);
+    final WebPage page = pageFactory.newPersistent();
+    page.setUrl(new Utf8("url.."));
+    page.setContent(ByteBuffer.wrap("test content".getBytes()));
+    final BeanFactoryImpl<String, Metadata> metadataFactory =
+        new BeanFactoryImpl<String, Metadata>(String.class, Metadata.class);
+    page.setMetadata(metadataFactory.newPersistent());
+    stored.setWebpage(page);
+    final String key = stored.getSsn().toString();
+    dataStore.put(key, stored);
+    dataStore.flush();
+    final Employee loaded = dataStore.get(key, Employee._ALL_FIELDS);
+    Assert.assertEquals(stored, loaded);
+    Assert.assertEquals(page, loaded.getWebpage());
+  }
+  
+  public static void testGetEmployee3UnionField(DataStore<String, Employee> dataStore)
+      throws IOException, Exception {
+    // Stores a string value in the 'boss' union field and reads it back.
+    final Employee stored = createEmployee(dataStore);
+    stored.setBoss(new Utf8("Real boss"));
+
+    final String key = stored.getSsn().toString();
+    dataStore.put(key, stored);
+    dataStore.flush();
+    final Employee loaded = dataStore.get(key, Employee._ALL_FIELDS);
+    Assert.assertEquals(stored, loaded);
+    Assert.assertEquals("Real boss", ((Utf8) loaded.getBoss()).toString());
+  }
+  
   public static void testGetEmployeeNonExisting(DataStore<String, Employee> dataStore)
     throws IOException, Exception {
     Employee employee = dataStore.get("_NON_EXISTING_SSN_FOR_EMPLOYEE_");
@@ -160,7 +244,14 @@ public class DataStoreTestUtil {
     dataStore.put(ssn, employee);
     dataStore.flush();
 
-    String[] fields = employee.getFields();
+    // XXX See GORA-216: special case until later reviewed.
+    // Like in K-V stores, if the retrieved column does not exist ([webpage] case),
+    // get() must return 'null'.
+    // We prepare an actual weird synthetic test.
+    
+    // String[] fields = employee.getFields();
+    String[] fields = {"name","dateOfBirth","ssn","salary"} ;
+    
     for(Set<String> subset : StringUtils.powerset(fields)) {
       if(subset.isEmpty())
         continue;
@@ -171,7 +262,7 @@ public class DataStoreTestUtil {
         expected.put(index, employee.get(index));
       }
 
-      Assert.assertEquals(expected, after);
+      Assert.assertEquals(expected, after);        
     }
   }
 
@@ -336,27 +427,35 @@ public class DataStoreTestUtil {
     Assert.assertNotNull(page);
 
     Assert.assertEquals(URLS[i], page.getUrl().toString());
-    Assert.assertTrue("content error:" + new String( toByteArray(page.getContent()) ) +
+    // 'content' is optional
+    if (page.getContent() != null) {
+      Assert.assertTrue("content error:" + new String( toByteArray(page.getContent()) ) +
         " actual=" + CONTENTS[i] + " i=" + i
-    , Arrays.equals( toByteArray(page.getContent() )
+        , Arrays.equals( toByteArray(page.getContent() )
         , CONTENTS[i].getBytes()));
-
-    GenericArray<Utf8> parsedContent = page.getParsedContent();
-    Assert.assertNotNull(parsedContent);
-    Assert.assertTrue(parsedContent.size() > 0);
-
-    int j=0;
-    String[] tokens = CONTENTS[i].split(" ");
-    for(Utf8 token : parsedContent) {
-      Assert.assertEquals(tokens[j++], token.toString());
+      GenericArray<Utf8> parsedContent = page.getParsedContent();
+      Assert.assertNotNull(parsedContent);
+      Assert.assertTrue(parsedContent.size() > 0);
+    
+      int j=0;
+      String[] tokens = CONTENTS[i].split(" ");
+      for(Utf8 token : parsedContent) {
+        Assert.assertEquals(tokens[j++], token.toString());
+      }
+    } else {
+      // when page.getContent() is null
+      Assert.assertTrue(CONTENTS[i] == null) ;
+      GenericArray<Utf8> parsedContent = page.getParsedContent();
+      Assert.assertNotNull(parsedContent);
+      Assert.assertTrue(parsedContent.size() == 0);
     }
 
     if(LINKS[i].length > 0) {
       Assert.assertNotNull(page.getOutlinks());
       Assert.assertTrue(page.getOutlinks().size() > 0);
-      for(j=0; j<LINKS[i].length; j++) {
-        Assert.assertEquals(ANCHORS[i][j],
-            page.getFromOutlinks(new Utf8(URLS[LINKS[i][j]])).toString());
+      for(int k=0; k<LINKS[i].length; k++) {
+        Assert.assertEquals(ANCHORS[i][k],
+          page.getFromOutlinks(new Utf8(URLS[LINKS[i][k]])).toString());
       }
     } else {
       Assert.assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Fri Jun 14 00:55:24 2013
@@ -192,7 +192,7 @@ implements Configurable {
       Get get = new Get(toBytes(key));
       addFields(get, fields);
       Result result = table.get(get);
-      return newInstance(result, fields);
+      return newInstance(result, fields);      
     } catch(IOException ex2){
       LOG.error(ex2.getMessage());
       LOG.error(ex2.getStackTrace().toString());
@@ -200,6 +200,17 @@ implements Configurable {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   * Serializes the Persistent data and saves in HBase.
+   * Topmost fields of the record are persisted in "raw" format (not avro serialized). This behavior happens
+   * in maps and arrays too.
+   * 
+   * A ["null","type"] union (a.k.a. optional field) is persisted as if it were ["type"], but the column gets
+   * deleted if value==null (so the value read afterwards will be null).
+   * 
+   * @param persistent Record to be persisted in HBase
+   */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
@@ -234,8 +245,14 @@ implements Configurable {
                   case DIRTY:
                     byte[] qual = Bytes.toBytes(mapKey.toString());
                     byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
-                    put.add(hcol.getFamily(), qual, val);
-                    hasPuts = true;
+                    // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                    if (val == null) { // value == null => must delete the column
+                      delete.deleteColumn(hcol.getFamily(), qual);
+                      hasDeletes = true;
+                    } else {
+                      put.add(hcol.getFamily(), qual, val);
+                      hasPuts = true;
+                    }
                     break;
                   case DELETED:
                     qual = Bytes.toBytes(mapKey.toString());
@@ -248,9 +265,15 @@ implements Configurable {
               Set<Map.Entry> set = ((Map)o).entrySet();
               for(Entry entry: set) {
                 byte[] qual = toBytes(entry.getKey());
-                byte[] val = toBytes(entry.getValue());
-                put.add(hcol.getFamily(), qual, val);
-                hasPuts = true;
+                byte[] val = toBytes(entry.getValue(), field.schema().getValueType());
+                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                if (val == null) { // value == null => must delete the column
+                  delete.deleteColumn(hcol.getFamily(), qual);
+                  hasDeletes = true;
+                } else {
+                  put.add(hcol.getFamily(), qual, val);
+                  hasPuts = true;
+                }
               }
             }
             break;
@@ -259,15 +282,28 @@ implements Configurable {
               GenericArray arr = (GenericArray) o;
               int j=0;
               for(Object item : arr) {
-                byte[] val = toBytes(item);
-                put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
-                hasPuts = true;
+                byte[] val = toBytes(item, field.schema().getElementType());
+                // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+                if (val == null) { // value == null => must delete the column
+                  delete.deleteColumn(hcol.getFamily(), Bytes.toBytes(j++));
+                  hasDeletes = true;
+                } else {
+                  put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
+                  hasPuts = true;
+                }
               }
             }
             break;
           default:
-            put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema()));
-            hasPuts = true;
+            // XXX - Gora 207: Top-most record level ["null","type"] must be saved raw. "null"=>delete
+            byte[] serializedBytes = toBytes(o, field.schema()) ;
+            if (serializedBytes == null) { // value == null => must delete the column
+              delete.deleteColumn(hcol.getFamily(), hcol.getQualifier());
+              hasDeletes = true;
+            } else {
+              put.add(hcol.getFamily(), hcol.getQualifier(), serializedBytes);
+              hasPuts = true;
+            }
             break;
         }
       }
@@ -511,6 +547,14 @@ implements Configurable {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
+  /**
+   * Creates a new Persistent instance with the values in 'result' for the fields listed.
+   * @param result result from an HTable#get()
+   * @param fields List of fields queried, or null for all
+   * @return A new instance with default values for not listed fields
+   *         null if 'result' is null.
+   * @throws IOException
+   */
   public T newInstance(Result result, String[] fields)
   throws IOException {
     if(result == null || result.isEmpty())
@@ -555,8 +599,7 @@ implements Configurable {
           setField(persistent, field, arr);
           break;
         default:
-          byte[] val =
-            result.getValue(col.getFamily(), col.getQualifier());
+          byte[] val = result.getValue(col.getFamily(), col.getQualifier());
           if (val == null) {
             continue;
           }

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Fri Jun 14 00:55:24 2013
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
@@ -34,6 +35,8 @@ import org.apache.avro.specific.Specific
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.util.AvroUtils;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.avro.PersistentDatumWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -86,7 +89,17 @@ public class HBaseByteInterface {
       };
   };
 
-
+  /**
+   * Deserializes an array of bytes matching the given schema to the proper basic (enum, Utf8,...) or
+   * complex type (Persistent/Record).
+   * 
+   * Does not handle <code>arrays/maps</code> if not inside a <code>record</code> type.
+   * 
+   * @param schema Avro schema describing the expected data
+   * @param val array of bytes with the data serialized
+   * @return Enum|Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Persistent|Null
+   * @throws IOException
+   */
   @SuppressWarnings("rawtypes")
   public static Object fromBytes(Schema schema, byte[] val) throws IOException {
     Type type = schema.getType();
@@ -99,12 +112,57 @@ public class HBaseByteInterface {
     case FLOAT:   return Bytes.toFloat(val);
     case DOUBLE:  return Bytes.toDouble(val);
     case BOOLEAN: return val[0] != 0;
+    case UNION:
+      // XXX Special case: when reading the top-level field of a record we must handle
+      // ["null","type"] definitions, which are written as if they were ["type"].
+      // If not in that special case, execution falls through to "case RECORD".
+      
+      // if 'val' is empty we ignore the special case (will match Null in "case RECORD")  
+      if (schema.getTypes().size() == 2) {
+        
+        // schema [type0, type1]
+        Type type0 = schema.getTypes().get(0).getType() ;
+        Type type1 = schema.getTypes().get(1).getType() ;
+        
+        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)
+            && (   type0.equals(Schema.Type.NULL)
+                || type1.equals(Schema.Type.NULL))) {
+
+          if (type0.equals(Schema.Type.NULL))
+            schema = schema.getTypes().get(1) ;
+          else 
+            schema = schema.getTypes().get(0) ;
+          
+          return fromBytes(schema, val) ; // Deserialize as if schema was ["type"] 
+        }
+        
+      }
+      // else
+      //   type = [type0,type1] where type0=type1
+      //   or val == null
+      // => deserialize like "case RECORD"
+
     case RECORD:
       Map<String, SpecificDatumReader<?>> readerMap = readerMaps.get();
-      SpecificDatumReader<?> reader = readerMap.get(schema.getFullName());
-      if (reader == null) {
-        reader = new SpecificDatumReader(schema);     
-        readerMap.put(schema.getFullName(), reader);
+      PersistentDatumReader<?> reader = null ;
+            
+      // For UNION schemas, must use a specific PersistentDatumReader
+      // from the readerMap since unions don't have their own name
+      // (the key in the map is the schema's hashCode() as a string)
+      if (schema.getType().equals(Schema.Type.UNION)) {
+        reader = (PersistentDatumReader<?>)readerMap.get(String.valueOf(schema.hashCode()));
+        if (reader == null) {
+          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
+          readerMap.put(String.valueOf(schema.hashCode()), reader);
+        }
+      } else {
+        // ELSE use reader for Record
+        reader = (PersistentDatumReader<?>)readerMap.get(schema.getFullName());
+        if (reader == null) {
+          reader = new PersistentDatumReader(schema, false);// ignore dirty bits
+          readerMap.put(schema.getFullName(), reader);
+        }
       }
       
       // initialize a decoder, possibly reusing previous one
@@ -116,11 +174,18 @@ public class HBaseByteInterface {
         decoders.set(decoder);
       }
       
-      return reader.read(null, decoder);
+      return reader.read((Object)null, schema, decoder);
     default: throw new RuntimeException("Unknown type: "+type);
     }
   }
 
+  /**
+   * Converts an array of bytes to the target <em>basic class</em>.
+   * @param clazz (Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8).class
+   * @param val array of bytes with the value
+   * @return an instance of <code>clazz</code> with the bytes in <code>val</code>
+   *         deserialized with org.apache.hadoop.hbase.util.Bytes
+   */
   @SuppressWarnings("unchecked")
   public static <K> K fromBytes(Class<K> clazz, byte[] val) {
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
@@ -145,6 +210,11 @@ public class HBaseByteInterface {
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }
 
+  /**
+   * Converts an instance of a <em>basic class</em> to an array of bytes.
+   * @param o Instance of Enum|Byte|Boolean|Short|Integer|Long|Float|Double|String|Utf8
+   * @return array of bytes with <code>o</code> serialized with org.apache.hadoop.hbase.util.Bytes
+   */
   public static byte[] toBytes(Object o) {
     Class<?> clazz = o.getClass();
     if (clazz.equals(Enum.class)) {
@@ -171,6 +241,14 @@ public class HBaseByteInterface {
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }
 
+  /**
+   * Serializes an object following the given schema.
+   * Does not handle <code>array/map</code> if it is not inside a <code>record</code>
+   * @param o Utf8|ByteBuffer|Integer|Long|Float|Double|Boolean|Enum|Persistent
+   * @param schema The schema describing the object (or a compatible description)
+   * @return array of bytes of the serialized object
+   * @throws IOException
+   */
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public static byte[] toBytes(Object o, Schema schema) throws IOException {
     Type type = schema.getType();
@@ -183,12 +261,54 @@ public class HBaseByteInterface {
     case DOUBLE:  return Bytes.toBytes((Double)o);
     case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
     case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
+    case UNION:
+      // XXX Special case: when writing the top-level field of a record we must handle
+      // ["null","type"] definitions, which are written as if they were ["type"].
+      // If not in that special case, execution falls through to "case RECORD".
+      
+      if (schema.getTypes().size() == 2) {
+        
+        // schema [type0, type1]
+        Type type0 = schema.getTypes().get(0).getType() ;
+        Type type1 = schema.getTypes().get(1).getType() ;
+        
+        // Check if types are different and there's a "null", like ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)
+            && (   type0.equals(Schema.Type.NULL)
+                || type1.equals(Schema.Type.NULL))) {
+
+          if (o == null) return null ;
+          
+          int index = GenericData.get().resolveUnion(schema, o);
+          schema = schema.getTypes().get(index) ;
+          
+          return toBytes(o, schema) ; // Serialize as if schema was ["type"] 
+        }
+        
+      }
+      // else
+      //   type = [type0,type1] where type0=type1
+      // => Serialize like "case RECORD" with Avro
+      
     case RECORD:
       Map<String, SpecificDatumWriter<?>> writerMap = writerMaps.get();
-      SpecificDatumWriter writer = writerMap.get(schema.getFullName());
-      if (writer == null) {
-        writer = new SpecificDatumWriter(schema);
-        writerMap.put(schema.getFullName(),writer);
+      PersistentDatumWriter writer = null ;
+      // For UNION schemas, must use a specific PersistentDatumWriter
+      // from the writerMap since unions don't have their own name
+      // (the key in the map is the schema's hashCode() as a string)
+      if (schema.getType().equals(Schema.Type.UNION)) {
+        writer = (PersistentDatumWriter<?>) writerMap.get(String.valueOf(schema.hashCode()));
+        if (writer == null) {
+          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
+          writerMap.put(String.valueOf(schema.hashCode()),writer);
+        }
+      } else {
+        // ELSE use writer for Record
+        writer = (PersistentDatumWriter<?>) writerMap.get(schema.getFullName());
+        if (writer == null) {
+          writer = new PersistentDatumWriter(schema,false);// ignore dirty bits
+          writerMap.put(schema.getFullName(),writer);
+        }
       }
       
       BinaryEncoderWithStream encoder = encoders.get();
@@ -200,7 +320,7 @@ public class HBaseByteInterface {
       ByteArrayOutputStream os = (ByteArrayOutputStream) encoder.getOut();
       os.reset();
       
-      writer.write(o, encoder);
+      writer.write(schema,o, encoder);
       encoder.flush();
       return os.toByteArray();
     default: throw new RuntimeException("Unknown type: "+type);

Modified: gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml (original)
+++ gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml Fri Jun 14 00:55:24 2013
@@ -35,6 +35,8 @@
     <field name="dateOfBirth" family="info" qualifier="db"/>
     <field name="ssn" family="info" qualifier="sn"/>
     <field name="salary" family="info" qualifier="sl"/>
+    <field name="boss" family="info" qualifier="bs"/>
+    <field name="webpage" family="info" qualifier="wp"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">

Modified: gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java?rev=1492920&r1=1492919&r2=1492920&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java Fri Jun 14 00:55:24 2013
@@ -19,21 +19,29 @@
 package org.apache.gora.hbase.store;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Properties;
 
 import junit.framework.Assert;
 
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.examples.WebPageDataCreator;
 import org.apache.gora.examples.generated.Employee;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.hbase.GoraHBaseTestDriver;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.DataStoreTestUtil;
+import org.apache.gora.util.GoraException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
 
 /**
  * Test case for HBaseStore.
@@ -94,8 +102,14 @@ public class TestHBaseStore extends Data
   }
   
   
+  /**
+   * Asserts that writing bytes actually works at low level in HBase.
+   * Checks writing null unions too.
+   */
   @Override
   public void assertPutBytes(byte[] contentBytes) throws IOException {    
+
+    // First check that the parameter "contentBytes" was written and read correctly.
     HTable table = new HTable("WebPage");
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
@@ -103,7 +117,99 @@ public class TestHBaseStore extends Data
     byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null);
     Assert.assertNotNull(actualBytes);
     Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+    table.close();    
+
+    // Since "content" is an optional field, we are forced to reopen the DataStore
+    // to retrieve the union correctly
+    
+    // Test writing+reading a null value. FIELD in HBASE MUST become DELETED
+    WebPage page = webPageStore.get("com.example/http") ;
+    page.setContent(null) ;
+    webPageStore.put("com.example/http", page) ;
+    webPageStore.close() ;
+    webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+    page = webPageStore.get("com.example/http") ;
+    Assert.assertNull(page.getContent()) ;
+    // Check directly with HBase
+    table = new HTable("WebPage");
+    get = new Get(Bytes.toBytes("com.example/http"));
+    result = table.get(get);
+    actualBytes = result.getValue(Bytes.toBytes("content"), null);
+    Assert.assertNull(actualBytes);
+    table.close();
+    
+    // Test writing+reading an empty bytes field. FIELD in HBASE MUST become EMPTY (byte[0])
+    page = webPageStore.get("com.example/http") ;
+    page.setContent(ByteBuffer.wrap("".getBytes())) ;
+    webPageStore.put("com.example/http", page) ;
+    webPageStore.close() ;
+    webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+    page = webPageStore.get("com.example/http") ;
+    Assert.assertTrue(Arrays.equals("".getBytes(),page.getContent().array())) ;
+    // Check directly with HBase
+    table = new HTable("WebPage");
+    get = new Get(Bytes.toBytes("com.example/http"));
+    result = table.get(get);
+    actualBytes = result.getValue(Bytes.toBytes("content"), null);
+    Assert.assertNotNull(actualBytes);
+    Assert.assertEquals(0, actualBytes.length) ;
     table.close();
+    
+  }
+  
+  /**
+   * Checks that when writing a top level union <code>['null','type']</code> the value is written in raw format
+   * @throws Exception
+   */
+  @Test
+  public void assertTopLevelUnions() throws Exception {
+    WebPage page = webPageStore.newPersistent();
+
+    // Write webpage data
+    page.setUrl(new Utf8("http://example.com"));
+    byte[] contentBytes = "example content in example.com".getBytes();
+    ByteBuffer buff = ByteBuffer.wrap(contentBytes);
+    page.setContent(buff);
+    webPageStore.put("com.example/http", page);
+    webPageStore.flush() ;
+    
+    // Read directly from HBase
+    HTable table = new HTable("WebPage");
+    Get get = new Get(Bytes.toBytes("com.example/http"));
+    org.apache.hadoop.hbase.client.Result result = table.get(get);
+
+    byte[] bytesRead = result.getValue(Bytes.toBytes("content"), null);
+    
+    Assert.assertNotNull(bytesRead) ;
+    Assert.assertTrue(Arrays.equals(bytesRead, contentBytes));
+  }
+  
+  /**
+   * Checks that when writing a top level union <code>['null','type']</code> with the option <code>RAW_ROOT_FIELDS_OPTION=true</code>
+   * the column is not created, and when <code>RAW_ROOT_FIELDS_OPTION=false</code> the <code>null</code> value is serialized
+   * with Avro.
+   * @throws Exception
+   */
+  @Test
+  public void assertTopLevelUnionsNull() throws Exception {
+    WebPage page = webPageStore.newPersistent();
+    
+    // Write webpage data
+    page.setUrl(new Utf8("http://example.com"));
+    page.setContent(null);     // This won't change internal field status to dirty, so
+    page.setDirty("content") ; // need to change it manually
+    webPageStore.put("com.example/http", page);
+    webPageStore.flush() ;
+    
+    // Read directly from HBase
+    HTable table = new HTable("WebPage");
+    Get get = new Get(Bytes.toBytes("com.example/http"));
+    org.apache.hadoop.hbase.client.Result result = table.get(get);
+        
+    byte[] contentBytes = result.getValue(Bytes.toBytes("content"), null);
+
+    Assert.assertNull(webPageStore.get("com.example/http", new String[]{"content"})) ;
+    Assert.assertTrue(contentBytes == null || contentBytes.length == 0) ;
   }
   
   @Override