You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2013/04/30 19:45:16 UTC

svn commit: r1477735 - in /gora/branches/GORA_174: gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-cass...

Author: rmarroquin
Date: Tue Apr 30 17:45:09 2013
New Revision: 1477735

URL: http://svn.apache.org/r1477735
Log:
Committing GORA-206. Step towards putting an end to GORA-174

Modified:
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
    gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/branches/GORA_174/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
    gora/branches/GORA_174/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
    gora/branches/GORA_174/gora-core/src/examples/avro/employee.json

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Tue Apr 30 17:45:09 2013
@@ -24,7 +24,6 @@ import me.prettyprint.hector.api.Seriali
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +40,15 @@ public abstract class CassandraColumn {
   private String family;
   private int type;
   private Field field;
+  private int unionType;
+
+  /** Stores the position of the union branch to use when this column is read. */
+  public void setUnionType(int pUnionType){
+    unionType = pUnionType;
+  }
+
+  /** Returns the union branch position held by this column. */
+  public int getUnionType(){
+    return this.unionType;
+  }
   
   public String getFamily() {
     return family;
@@ -67,7 +75,7 @@ public abstract class CassandraColumn {
   
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
-    Serializer serializer = GoraSerializerTypeInferer.getSerializer(schema);
+    Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
     if (serializer == null) {
       LOG.info("Schema is not supported: " + schema.toString());
     } else {

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Tue Apr 30 17:45:09 2013
@@ -26,7 +26,8 @@ import me.prettyprint.cassandra.serializ
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.Schema.Type;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
@@ -58,6 +59,25 @@ public class CassandraResult<K, T extend
     ++this.rowNumber;
     return (this.rowNumber <= this.cassandraResultSet.size());
   }
+  
+  /**
+   * Looks up, by name, the column holding the type of the stored union element.
+   * TODO: This is a linear scan, which may be too much overhead when N rows have M
+   *       columns; it might have to be reviewed to get the specific column in O(1)
+   * @param pFieldName name compared against each column's decoded name
+   * @param pCassandraRow row contents; every element is cast to CassandraColumn
+   * @return the first column whose name equals pFieldName, or null if there is none
+   */
+  private CassandraColumn getUnionTypeColumn(String pFieldName, Object[] pCassandraRow){
+    for (Object rowElement : pCassandraRow) {
+      CassandraColumn candidate = (CassandraColumn) rowElement;
+      String candidateName = StringSerializer.get().fromByteBuffer(candidate.getName());
+      if (pFieldName.equals(candidateName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
 
 
   /**
@@ -80,21 +100,42 @@ public class CassandraResult<K, T extend
       String family = cassandraColumn.getFamily();
       String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
       
-      // get field
-      int pos = this.persistent.getFieldIndex(fieldName);
-      Field field = fields.get(pos);
-      
-      // get value
-      cassandraColumn.setField(field);
-      Object value = cassandraColumn.getValue();
-      
-      this.persistent.put(pos, value);
-      // this field does not need to be written back to the store
-      this.persistent.clearDirty(pos);
+      if (fieldName != null ){
+        // get field
+        int pos = this.persistent.getFieldIndex(fieldName);
+        Field field = fields.get(pos);
+        Type fieldType = field.schema().getType();
+        System.out.println(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) + fieldName + " " + fieldType.name());
+        if (fieldType == Type.UNION){
+          // TODO getting UNION stored type
+          // TODO get value of UNION stored type. This field does not need to be written back to the store
+          cassandraColumn.setUnionType(getNonNullTypePos(field.schema().getTypes()));
+        }
+
+        // get value
+        cassandraColumn.setField(field);
+        Object value = cassandraColumn.getValue();
+
+        this.persistent.put(pos, value);
+        // this field does not need to be written back to the store
+        this.persistent.clearDirty(pos);
+      }
+      else
+        LOG.debug("FieldName was null while iterating CassandraRow and using Avro Union type");
     }
 
   }
 
+  private int getNonNullTypePos(List<Schema> pTypes){
+    // Position of the first non-null branch; compares schema types, not schema names.
+    for (int pos = 0; pos < pTypes.size(); pos++) {
+      if (pTypes.get(pos).getType() != Type.NULL) {
+        return pos;
+      }
+    }
+    return CassandraStore.DEFAULT_UNION_SCHEMA;
+  }
+
   @Override
   public void close() throws IOException {
     // TODO Auto-generated method stub

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Tue Apr 30 17:45:09 2013
@@ -20,6 +20,9 @@ package org.apache.gora.cassandra.query;
 
 import java.util.ArrayList;
 
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
+
 /**
  * List of key value pairs representing a row, tagged by a key.
  */
@@ -38,5 +41,18 @@ public class CassandraRow<K> extends Arr
   public void setKey(K key) {
     this.key = key;
   }
+  
+  /**
+   * Finds the first CassandraColumn of this row whose decoded name matches
+   * @param pCassandraColumnName column name to search for
+   * @return the matching CassandraColumn, or null when this row has none
+   */
+  public CassandraColumn getCassandraColumn(String pCassandraColumnName){
+    for (CassandraColumn candidate : this) {
+      String candidateName = StringSerializer.get().fromByteBuffer(candidate.getName());
+      if (pCassandraColumnName.equals(candidateName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
 
 }

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Tue Apr 30 17:45:09 2013
@@ -40,6 +40,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.serializers.GenericArraySerializer;
 import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
 import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,12 +83,31 @@ public class CassandraSubColumn extends 
       StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
       StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
       value = map;
+    } else if (type == Type.UNION){
+      // the selected union schema is obtained
+      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), field.schema());
+      // we use the selected union schema to deserialize our actual value
+      value = fromByteBuffer(unionFieldSchema, byteBuffer);
     } else {
       value = fromByteBuffer(fieldSchema, byteBuffer);
     }
 
     return value;
   }
+  
+  /**
+   * Gets the specific schema for a union data type
+   * @param pSchemaPos position of the wanted branch in the union's list of types
+   * @param pSchema union schema whose branches are looked up
+   * @return the branch at pSchemaPos, or the default branch when pSchemaPos is out of range
+   */
+  private Schema getUnionSchema (int pSchemaPos, Schema pSchema){
+    int schemaPos = pSchemaPos;
+    // default union element: List.get() never returns null for a bad index, it throws
+    if ( schemaPos < 0 || schemaPos >= pSchema.getTypes().size() )
+      schemaPos = CassandraStore.DEFAULT_UNION_SCHEMA;
+    return pSchema.getTypes().get(schemaPos);
+  }
 
   public void setValue(HColumn<ByteBuffer, ByteBuffer> hColumn) {
     this.hColumn = hColumn;

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Tue Apr 30 17:45:09 2013
@@ -148,6 +148,8 @@ public class GoraSerializerTypeInferer {
       serializer = GenericArraySerializer.get(schema.getElementType());
     } else if (type == Type.MAP) {
       serializer = StatefulHashMapSerializer.get(schema.getValueType());
+    } else if (type == Type.UNION){
+      serializer = ByteBufferSerializer.get();
     } else {
       serializer = null;
     }

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Tue Apr 30 17:45:09 2013
@@ -221,10 +221,24 @@ public class CassandraClient<K, T extend
     }
   }
 
+  /**
+   * Adds a subColumn named by a String, serialized with StringSerializer before delegating
+   * @param key passed through unchanged to the ByteBuffer-named addSubColumn
+   * @param fieldName passed through unchanged to the ByteBuffer-named addSubColumn
+   * @param columnName name of the subColumn, given as a String
+   * @param value passed through unchanged to the ByteBuffer-named addSubColumn
+   */
   public void addSubColumn(K key, String fieldName, String columnName, Object value) {
     addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
   }
 
+  /**
+   * Adds a subColumn named by an Integer, serialized with IntegerSerializer before delegating
+   * @param key passed through unchanged to the ByteBuffer-named addSubColumn
+   * @param fieldName passed through unchanged to the ByteBuffer-named addSubColumn
+   * @param columnName name of the subColumn, given as an Integer
+   * @param value passed through unchanged to the ByteBuffer-named addSubColumn
+   */
   public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
     addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
   }
@@ -364,6 +378,20 @@ public class CassandraClient<K, T extend
     
     return orderedRows.getList();
   }
+  
+  private String getMappingFamily(String pField){
+    // TODO checking if it was a UNION field the one we are retrieving
+    // Until that check exists, the family always comes straight from the mapping.
+    final String mappedFamily = this.cassandraMapping.getFamily(pField);
+    return mappedFamily;
+  }
+  
+  private String getMappingColumn(String pField){
+    // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
+    // Until that check exists, the column always comes straight from the mapping.
+    final String mappedColumn = this.cassandraMapping.getColumn(pField);
+    return mappedColumn;
+  }
 
   /**
    * Select the families that contain at least one column mapped to a query field.
@@ -373,8 +401,8 @@ public class CassandraClient<K, T extend
   public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
     Map<String, List<String>> map = new HashMap<String, List<String>>();
     for (String field: query.getFields()) {
-      String family = this.cassandraMapping.getFamily(field);
-      String column = this.cassandraMapping.getColumn(field);
+      String family = this.getMappingFamily(field);
+      String column = this.getMappingColumn(field);
       
       // check if the family value was already initialized 
       List<String> list = map.get(family);
@@ -391,6 +419,14 @@ public class CassandraClient<K, T extend
     
     return map;
   }
+
+  /**
+   * Exposes the cassandraMapping, which holds whatever was mapped from the gora-cassandra-mapping.xml
+   * @return the CassandraMapping instance held by this client
+   */
+  public CassandraMapping getCassandraMapping(){
+    return cassandraMapping;
+  }
   
   /**
    * Select the field names according to the column names, which format if fully qualified: "family:column"
@@ -400,16 +436,15 @@ public class CassandraClient<K, T extend
   public Map<String, String> getReverseMap(Query<K, T> query) {
     Map<String, String> map = new HashMap<String, String>();
     for (String field: query.getFields()) {
-      String family = this.cassandraMapping.getFamily(field);
-      String column = this.cassandraMapping.getColumn(field);
+      String family = this.getMappingFamily(field);
+      String column = this.getMappingColumn(field);
       
       map.put(family + ":" + column, field);
     }
     
     return map;
-     
   }
-
+  
   public boolean isSuper(String family) {
     return this.cassandraMapping.isSuper(family);
   }

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Tue Apr 30 17:45:09 2013
@@ -206,6 +206,17 @@ public class CassandraMapping {
     }    
   }
 
+  /**
+   * Registers a field in this mapping, recording both its family and its column
+   * @param pFamilyName family name stored for the field
+   * @param pFieldName field name, used as the key in both lookup maps
+   * @param pColumnName column name stored for the field
+   */
+  public void addColumn(String pFamilyName, String pFieldName, String pColumnName){
+    familyMap.put(pFieldName, pFamilyName);
+    columnMap.put(pFieldName, pColumnName);
+  }
+
   public String getFamily(String name) {
     return this.familyMap.get(name);
   }

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Tue Apr 30 17:45:09 2013
@@ -19,6 +19,7 @@
 package org.apache.gora.cassandra.store;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +49,9 @@ public class CassandraMappingManager {
     return manager;
   }
 
-  //
+  /**
+  * Objects to maintain mapped keyspaces
+  */
   private Map<String, Element> keyspaceMap = null;
   private Map<String, Element>  mappingMap = null;
 
@@ -95,7 +98,12 @@ public class CassandraMappingManager {
   public void loadConfiguration() throws JDOMException, IOException {
     SAXBuilder saxBuilder = new SAXBuilder();
     // get mapping file
-    Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
+    if (inputStream == null){
+      LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
+      throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be found!");
+    }
+    Document document = saxBuilder.build(inputStream);
     if (document == null) {
       LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
     }

Modified: gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Tue Apr 30 17:45:09 2013
@@ -29,8 +29,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Collections;
 
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -42,7 +40,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
@@ -68,6 +65,11 @@ public class CassandraStore<K, T extends
   private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
 
   /**
+   * Default schema index used when AVRO Union data types are stored
+   */
+  public static int DEFAULT_UNION_SCHEMA = 0;
+
+  /**
    * The values are Avro fields pending to be stored.
    *
    * We want to iterate over the keys in insertion order.
@@ -132,7 +134,7 @@ public class CassandraStore<K, T extends
     CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
     cassandraResult.setReverseMap(reverseMap);
 
-    CassandraResultSet cassandraResultSet = new CassandraResultSet();
+    CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
     
     // We query Cassandra keyspace by families.
     for (String family : familyMap.keySet()) {
@@ -322,6 +324,11 @@ public class CassandraStore<K, T extends
             }
             fieldValue = newArray;
             break;
+          case UNION:
+            // storing the union selected schema, the actual value will be stored as soon as getting out of here
+            // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+            // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
+            break;
         }
         
         p.put(fieldPos, fieldValue);
@@ -341,37 +348,32 @@ public class CassandraStore<K, T extends
   private void addOrUpdateField(K key, Field field, Object value) {
     Schema schema = field.schema();
     Type type = schema.getType();
-    switch (type) {
-      case STRING:
-      case BOOLEAN:
-      case INT:
-      case LONG:
-      case BYTES:
-      case FLOAT:
-      case DOUBLE:
-      case FIXED:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
-      case RECORD:
-        if (value != null) {
-          if (value instanceof PersistentBase) {
-            PersistentBase persistentBase = (PersistentBase) value;
-            for (Field member: schema.getFields()) {
-              
-              // TODO: hack, do not store empty arrays
-              Object memberValue = persistentBase.get(member.pos());
-              if (memberValue instanceof GenericArray<?>) {
-                if (((GenericArray)memberValue).size() == 0) {
-                  continue;
-                }
-              } else if (memberValue instanceof StatefulHashMap<?,?>) {
-                if (((StatefulHashMap)memberValue).size() == 0) {
-                  continue;
+      switch (type) {
+        case STRING:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case BYTES:
+        case FLOAT:
+        case DOUBLE:
+        case FIXED:
+          this.cassandraClient.addColumn(key, field.name(), value);
+          break;
+        case RECORD:
+          if (value != null) {
+            if (value instanceof PersistentBase) {
+              PersistentBase persistentBase = (PersistentBase) value;
+              for (Field member: schema.getFields()) {
+                
+                // TODO: hack, do not store empty arrays
+                Object memberValue = persistentBase.get(member.pos());
+                if (memberValue instanceof GenericArray<?>) {
+                  if (((GenericArray)memberValue).size() == 0) {
+                    continue;
+                  }
                 }
+                this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
               }
-
-              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
-            }
           } else {
             LOG.info("Record not supported: " + value.toString());
             
@@ -396,11 +398,52 @@ public class CassandraStore<K, T extends
           }
         }
         break;
+       case UNION:
+         if(value != null) {
+           LOG.info("Union being supported with value: " + value.toString());
+           // TODO add union schema index used; for now only the union value is added
+           this.cassandraClient.addColumn(key, field.name(), value);
+         } else {
+           LOG.info("Union not supported: null value for field " + field.name());
+         }
+         break;
       default:
         LOG.info("Type not considered: " + type.name());      
     }
   }
 
+  /**
+   * Gets the position within the schema of the type used
+   * @param pValue value being stored; a null value only matches a "null" branch
+   * @param pUnionSchema union schema whose branches are checked in declaration order
+   * @return position of the first matching branch, DEFAULT_UNION_SCHEMA if none matches
+   */
+  private int getUnionSchema(Object pValue, Schema pUnionSchema){
+    int unionSchemaPos = 0;
+    for (Schema branchSchema : pUnionSchema.getTypes()) {
+      // Match on the Avro Type and on instanceof rather than on lower-cased type
+      // names and class simple names: toLowerCase() is locale-sensitive, simple
+      // names miss ByteBuffer implementations other than HeapByteBuffer, and a
+      // ByteBuffer value corresponds to the Avro "bytes" branch, not "string".
+      Type schemaType = branchSchema.getType();
+      boolean matches = (pValue == null && schemaType == Type.NULL)
+          || (pValue instanceof Utf8 && schemaType == Type.STRING)
+          || (pValue instanceof java.nio.ByteBuffer && schemaType == Type.BYTES)
+          || (pValue instanceof Integer && schemaType == Type.INT)
+          || (pValue instanceof Long && schemaType == Type.LONG)
+          || (pValue instanceof Double && schemaType == Type.DOUBLE)
+          || (pValue instanceof Float && schemaType == Type.FLOAT)
+          || (pValue instanceof Boolean && schemaType == Type.BOOLEAN);
+      if (matches) {
+        return unionSchemaPos;
+      }
+      unionSchemaPos++;
+    }
+    // if we weren't able to determine which data type it is, then we return the default
+    return DEFAULT_UNION_SCHEMA;
+  }
+
   @Override
   public boolean schemaExists() {
     LOG.info("schema exists");

Modified: gora/branches/GORA_174/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/branches/GORA_174/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Tue Apr 30 17:45:09 2013
@@ -20,19 +20,16 @@
 <gora-orm>
   <keyspace name="Employee" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
      <family name="sc" type="super" />
   </keyspace>
 
   <keyspace name="WebPage" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
     <family name="sc" type="super"/>
   </keyspace>
 
   <keyspace name="TokenDatum" host="localhost" cluster="Gora Cassandra Test Cluster">
     <family name="p"/>
-    <family name="f"/>
     <family name="sc" type="super"/>
   </keyspace>
 
@@ -41,6 +38,8 @@
     <field name="dateOfBirth"  family="p" qualifier="info:db"/>
     <field name="ssn"  family="p" qualifier="info:sn"/>
     <field name="salary"  family="p" qualifier="info:sl"/>
+    <field name="boss" family="p" qualifier="info:bs"/>
+    <field name="webpage" family="p" qualifier="info:wp"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" keyspace="WebPage">

Modified: gora/branches/GORA_174/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/branches/GORA_174/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Tue Apr 30 17:45:09 2013
@@ -38,7 +38,6 @@ import org.junit.Test;
 
 /**
  * Test for CassandraStore.
- * @author lewismc
  */
 public class TestCassandraStore extends DataStoreTestBase{
 	

Modified: gora/branches/GORA_174/gora-core/src/examples/avro/employee.json
URL: http://svn.apache.org/viewvc/gora/branches/GORA_174/gora-core/src/examples/avro/employee.json?rev=1477735&r1=1477734&r2=1477735&view=diff
==============================================================================
--- gora/branches/GORA_174/gora-core/src/examples/avro/employee.json (original)
+++ gora/branches/GORA_174/gora-core/src/examples/avro/employee.json Tue Apr 30 17:45:09 2013
@@ -9,25 +9,26 @@
       {"name": "salary", "type": "int"},
       {"name": "boss", "type":["null","Employee","string"]},
       {"name": "webpage", "type":["null",
-      							 {
-      							   "type": "record",
-      							   "name": "WebPage",
-      							   "namespace": "org.apache.gora.examples.generated",
-        							 "fields" : [
-          							 {"name": "url", "type": "string"},
-          							 {"name": "content", "type": ["null","bytes"]},
-          							 {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
-          							 {"name": "outlinks", "type": {"type":"map", "values":"string"}},
-          							 {"name": "metadata", "type": {
-            							 "name": "Metadata",
-            							 "type": "record",
-            							 "namespace": "org.apache.gora.examples.generated",
-            							 "fields": [
-              							 {"name": "version", "type": "int"},
-              							 {"name": "data", "type": {"type": "map", "values": "string"}}
-            							 ]
-          							 }}
-        							 ]
-      							 }]}
+        {
+      	  "type": "record",
+      	  "name": "WebPage",
+      	  "namespace": "org.apache.gora.examples.generated",
+          "fields" : [
+           {"name": "url", "type": "string"},
+           {"name": "content", "type": ["null","bytes"]},
+           {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
+           {"name": "outlinks", "type": {"type":"map", "values":"string"}},
+           {"name": "metadata", "type": {
+            "name": "Metadata",
+            "type": "record",
+            "namespace": "org.apache.gora.examples.generated",
+            "fields": [
+             {"name": "version", "type": "int"},
+             {"name": "data", "type": {"type": "map", "values": "string"}}
+            ]
+          }}
+          ]
+      	}
+      ]}
     ]
   }