You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:28 UTC

[11/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 557c233..51138ca 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -21,6 +21,7 @@ package org.apache.gora.cassandra.store;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
@@ -40,30 +42,34 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
 import org.apache.gora.cassandra.query.CassandraResultSet;
 import org.apache.gora.cassandra.query.CassandraRow;
 import org.apache.gora.cassandra.query.CassandraSubColumn;
 import org.apache.gora.cassandra.query.CassandraSuperColumn;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.persistency.impl.StateManagerImpl;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.PartitionQueryImpl;
-import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class 
  * responsible for directing Gora CRUD operations into Cassandra. We (delegate) rely 
- * heavily on {@ link org.apache.gora.cassandra.store.CassandraClient} for many operations
+ * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many operations
  * such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc.  
  */
 public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
@@ -71,38 +77,45 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
   /** Logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
 
-  /** Consistency property level for Cassandra column families */
-  private static final String COL_FAM_CL = "cf.consistency.level";
+  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
 
-  /** Consistency property level for Cassandra read operations. */
-  private static final String READ_OP_CL = "read.consistency.level";
-
-  /** Consistency property level for Cassandra write operations. */
-  private static final String WRITE_OP_CL = "write.consistency.level";
-
-  /** Variables to hold different consistency levels defined by the properties. */
-  public static String colFamConsLvl;
-  public static String readOpConsLvl;
-  public static String writeOpConsLvl;
-
-  private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
+  /**
+   * Fixed string with value "_UnionIndex" used to generate an extra column based on 
+   * the original field's name
+   */
+  public static String UNION_COL_SUFIX = "_UnionIndex";
 
   /**
-   * Default schema index used when AVRO Union data types are stored
+   * Default schema index with value "0" used when AVRO Union data types are stored
    */
   public static int DEFAULT_UNION_SCHEMA = 0;
-  
+
   /**
    * The values are Avro fields pending to be stored.
    *
    * We want to iterate over the keys in insertion order.
-   * We don't want to lock the entire collection before iterating over the keys, since in the meantime other threads are adding entries to the map.
+   * We don't want to lock the entire collection before iterating over the keys, 
+   * since in the meantime other threads are adding entries to the map.
    */
   private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
 
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
+  
+  /**
+   * Create a {@link java.util.concurrent.ConcurrentHashMap} for the 
+   * datum readers and writers. 
+   * This is necessary because they are not thread safe, at least not before 
+   * Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+   */
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+  
   /** The default constructor for CassandraStore */
   public CassandraStore() throws Exception {
-    // this.cassandraClient.initialize();
   }
 
   /** 
@@ -114,14 +127,6 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
   public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
     try {
       super.initialize(keyClass, persistent, properties);
-      if (autoCreateSchema) {
-        // If this is not set, then each Cassandra client should set its default
-        // column family
-        colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
-        // operations
-        readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
-        writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
-      }
       this.cassandraClient.initialize(keyClass, persistent);
     } catch (Exception e) {
       LOG.error(e.getMessage());
@@ -143,7 +148,6 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
 
   @Override
   public boolean delete(K key) {
-    LOG.debug("delete " + key);
     this.cassandraClient.deleteByKey(key);
     return true;
   }
@@ -162,7 +166,7 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
 
   /**
    * When executing Gora Queries in Cassandra we query the Cassandra keyspace by families.
-   * When add sub/supercolumns, Gora keys are mapped to Cassandra partition keys only. 
+   * When we add sub/supercolumns, Gora keys are mapped to Cassandra partition keys only. 
    * This is because we follow the Cassandra logic where column family data is 
    * partitioned across nodes based on row Key.
    */
@@ -238,14 +242,14 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
    * partitioned across nodes based on row Key.
    */
   private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, 
-      CassandraResultSet cassandraResultSet) {
+      CassandraResultSet<K> cassandraResultSet) {
 
     List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
     for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
       K key = superRow.getKey();
       CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
-        cassandraRow = new CassandraRow();
+        cassandraRow = new CassandraRow<K>();
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
@@ -261,7 +265,11 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
   }
 
   /**
-   * Flush the buffer. Write the buffered rows.
+   * Flush the buffer which is a synchronized {@link java.util.LinkedHashMap}
+   * storing fields pending to be stored by 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)}
+   * operations. Invoking this method therefore writes the buffered rows
+   * into Cassandra.
    * @see org.apache.gora.store.DataStore#flush()
    */
   @Override
@@ -270,24 +278,28 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
     Set<K> keys = this.buffer.keySet();
 
     // this duplicates memory footprint
+    @SuppressWarnings("unchecked")
     K[] keyArray = (K[]) keys.toArray();
 
-    // iterating over the key set directly would throw ConcurrentModificationException with java.util.HashMap and subclasses
+    // iterating over the key set directly would throw 
+    //ConcurrentModificationException with java.util.HashMap and subclasses
     for (K key: keyArray) {
       T value = this.buffer.get(key);
       if (value == null) {
-        LOG.info("Value to update is null for key " + key);
+        LOG.info("Value to update is null for key: " + key);
         continue;
       }
       Schema schema = value.getSchema();
+
       for (Field field: schema.getFields()) {
         if (value.isDirty(field.pos())) {
-          addOrUpdateField(key, field, value.get(field.pos()));
+          addOrUpdateField(key, field, field.schema(), value.get(field.pos()));
         }
       }
     }
 
-    // remove flushed rows
+    // remove flushed rows from the buffer as all 
+    // added or updated fields should now have been written.
     for (K key: keyArray) {
       this.buffer.remove(key);
     }
@@ -298,14 +310,31 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
     CassandraQuery<K,T> query = new CassandraQuery<K,T>();
     query.setDataStore(this);
     query.setKeyRange(key, key);
-    query.setFields(fields);
+    
+    if (fields == null){
+      fields = this.getFields();
+    }
+    // Generating UnionFields
+    ArrayList<String> unionFields = new ArrayList<String>();
+    for (String field: fields){
+      Field schemaField =this.fieldMap.get(field);
+      Type type = schemaField.schema().getType();
+      if (type.getName().equals("UNION".toLowerCase())){
+        unionFields.add(field+UNION_COL_SUFIX);
+      }
+    }
+    
+    String[] arr = unionFields.toArray(new String[unionFields.size()]);
+    String[] both = (String[]) ArrayUtils.addAll(fields, arr);
+    
+    query.setFields(both);
+
     query.setLimit(1);
     Result<K,T> result = execute(query);
     boolean hasResult = false;
     try {
       hasResult = result.next();
     } catch (Exception e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
     return hasResult ? result.get() : null;
@@ -314,7 +343,7 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
   @Override
   public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
       throws IOException {
-    // just a single partition
+    // TODO GORA-298 Implement CassandraStore#getPartitions
     List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
     PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K, T>(query);
     pqi.setConf(getConf());
@@ -339,168 +368,287 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
   }
 
   /**
-   * Duplicate instance to keep all the objects in memory till flushing.
-   * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
+   * 
+   * When doing the 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)}
+   * operation, the logic is as follows:
+   * <ol>
+   * <li>Obtain the Avro {@link org.apache.avro.Schema} for the object.</li>
+   * <li>Create a new duplicate instance of the object (explained in more detail below) **.</li>
+   * <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema} 
+   * {@link org.apache.avro.Schema.Field}'s.</li>
+   * <li>Iterate through the field {@link java.util.List}. This allows us to 
+   * consequently process each item.</li>
+   * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT dirty. 
+   * If this condition is true then we DO NOT process this field.</li>
+   * <li>Obtain the element at the specified position in this list so we can 
+   * directly operate on it.</li>
+   * <li>Obtain the {@link org.apache.avro.Schema.Type} of the element obtained 
+   * above and process it accordingly. N.B. For the nested types ARRAY, MAP,
+   * RECORD or UNION, we shadow the checks in bullet point 5 above to infer that the 
+   * {@link org.apache.avro.Schema.Field} is either at 
+   * position 0 OR it is NOT dirty. If one of these conditions is true then we DO NOT
+   * process this field. This is carried out in 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type, Object)}</li>
+   * <li>We then insert the Key and Object into the {@link java.util.LinkedHashMap} buffer 
+   * before being flushed. This performs a structural modification of the map.</li>
+   * </ol>
+   * ** We create a duplicate instance of the object to be persisted and insert processed
+   * objects into a synchronized {@link java.util.LinkedHashMap}. This allows 
+   * us to keep all the objects in memory till flushing.
+   * @see org.apache.gora.store.DataStore#put(java.lang.Object, 
+   * org.apache.gora.persistency.Persistent)
+   * @param key for the Avro Record (object).
+   * @param value Record object to be persisted in Cassandra
    */
   @Override
   public void put(K key, T value) {
-    T p = (T) value.newInstance(new StateManagerImpl());
     Schema schema = value.getSchema();
-    for (Field field: schema.getFields()) {
-      int fieldPos = field.pos();
-      if (value.isDirty(fieldPos)) {
-        Object fieldValue = value.get(fieldPos);
-
-        // check if field has a nested structure (array, map, or record)
-        Schema fieldSchema = field.schema();
-        Type type = fieldSchema.getType();
-        switch(type) {
-        case RECORD:
-          PersistentBase persistent = (PersistentBase) fieldValue;
-          PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
-          for (Field member: fieldSchema.getFields()) {
-            newRecord.put(member.pos(), persistent.get(member.pos()));
-          }
-          fieldValue = newRecord;
-          break;
-        case MAP:
-          StatefulHashMap map = (StatefulHashMap) fieldValue;
-          StatefulHashMap newMap = new StatefulHashMap();
-          for (Object mapKey : map.keySet()) {
-            newMap.put(mapKey, map.get(mapKey));
-            newMap.putState(mapKey, map.getState(mapKey));
-          }
-          fieldValue = newMap;
-          break;
-        case ARRAY:
-          GenericArray array = (GenericArray) fieldValue;
-          ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
-          Iterator iter = array.iterator();
-          while (iter.hasNext()) {
-            newArray.add(iter.next());
-          }
-          fieldValue = newArray;
-          break;
-        case UNION:
-          // storing the union selected schema, the actual value will be stored as soon as getting out of here
-          // TODO determine which schema we are using: int schemaPos = getUnionSchema(fieldValue,fieldSchema);
-          // and save it p.put( p.getFieldIndex(field.name() + CassandraStore.UNION_COL_SUFIX), schemaPos);
-          break;
-        }
-
-        p.put(fieldPos, fieldValue);
+    @SuppressWarnings("unchecked")
+    T p = (T) SpecificData.get().newRecord(value, schema);
+    List<Field> fields = schema.getFields();
+    for (int i = 1; i < fields.size(); i++) {
+      if (!value.isDirty(i)) {
+        continue;
       }
+      Field field = fields.get(i);
+      Type type = field.schema().getType();
+      Object fieldValue = value.get(field.pos());
+      Schema fieldSchema = field.schema();
+      // check if field has a nested structure (array, map, record or union)
+      fieldValue = getFieldValue(fieldSchema, type, fieldValue);
+      p.put(field.pos(), fieldValue);
     }
-
     // this performs a structural modification of the map
     this.buffer.put(key, p);
   }
 
   /**
+   * For every field within an object, we pass in a field schema, Type and value.
+   * This enables us to process fields (based on their characteristics) 
+   * preparing them for persistence.
+   * @param fieldSchema the associated field schema
+   * @param type the field type
+   * @param fieldValue the field value.
+   * @return the field value prepared for persistence
+   */
+  private Object getFieldValue(Schema fieldSchema, Type type, Object fieldValue ){
+    switch(type) {
+    case RECORD:
+      Persistent persistent = (Persistent) fieldValue;
+      Persistent newRecord = (Persistent) SpecificData.get().newRecord(persistent, persistent.getSchema());
+      for (Field member: fieldSchema.getFields()) {
+        if (member.pos() == 0 || !persistent.isDirty()) {
+          continue;
+        }
+        Schema memberSchema = member.schema();
+        Type memberType = memberSchema.getType();
+        Object memberValue = persistent.get(member.pos());
+        newRecord.put(member.pos(), getFieldValue(memberSchema, memberType, memberValue));
+      }
+      fieldValue = newRecord;
+      break;
+    case MAP:
+      Map<?, ?> map = (Map<?, ?>) fieldValue;
+      fieldValue = map;
+      break;
+    case ARRAY:
+      fieldValue = (List<?>) fieldValue;
+      break;
+    case UNION:
+      // storing the union selected schema, the actual value will 
+      // be stored as soon as we break out.
+      if (fieldValue != null){
+        int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+        Type unionType = unionSchema.getType();
+        fieldValue = getFieldValue(unionSchema, unionType, fieldValue);
+      }
+      //p.put( schemaPos, p.getSchema().getField(field.name() + CassandraStore.UNION_COL_SUFIX));
+      //p.put(fieldPos, fieldValue);
+      break;
+    default:
+      break;
+    }    
+    return fieldValue;
+  }
+  
+  /**
    * Add a field to Cassandra according to its type.
    * @param key     the key of the row where the field should be added
    * @param field   the Avro field representing a datum
+   * @param schema  the schema belonging to the particular Avro field
    * @param value   the field value
    */
-  private void addOrUpdateField(K key, Field field, Object value) {
-    Schema schema = field.schema();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void addOrUpdateField(K key, Field field, Schema schema, Object value) {
     Type type = schema.getType();
-    switch (type) {
-    case STRING:
-    case BOOLEAN:
-    case INT:
-    case LONG:
-    case BYTES:
-    case FLOAT:
-    case DOUBLE:
-    case FIXED:
-      this.cassandraClient.addColumn(key, field.name(), value);
-      break;
-    case RECORD:
-      if (value != null) {
-        if (value instanceof PersistentBase) {
-          PersistentBase persistentBase = (PersistentBase) value;
-          for (Field member: schema.getFields()) {
-
-            // TODO: hack, do not store empty arrays
-            Object memberValue = persistentBase.get(member.pos());
-            if (memberValue instanceof GenericArray<?>) {
-              if (((GenericArray)memberValue).size() == 0) {
-                continue;
+    // checking if the value to be updated is used for saving union schema
+    if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
+      switch (type) {
+      case STRING:
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case BYTES:
+      case FLOAT:
+      case DOUBLE:
+      case FIXED:
+        this.cassandraClient.addColumn(key, field.name(), value);
+        break;
+      case RECORD:
+        if (value != null) {
+          if (value instanceof PersistentBase) {
+            PersistentBase persistentBase = (PersistentBase) value;            
+            try {
+              byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, schema);
+              this.cassandraClient.addColumn(key, field.name(), byteValue);
+            } catch (IOException e) {
+              LOG.warn(field.name() + " named record could not be serialized.");
+            }
+          } else {
+            LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
+          }
+        } else {
+          LOG.warn("Setting content of: " + field.name() + " to null.");
+          String familyName =  this.cassandraClient.getCassandraMapping().getFamily(field.name());
+          this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name()));
+        }
+        break;
+      case MAP:
+        if (value != null) {
+          if (value instanceof Map<?, ?>) {            
+            Map<CharSequence,Object> map = (Map<CharSequence,Object>)value;
+            Schema valueSchema = schema.getValueType();
+            Type valueType = valueSchema.getType();
+            if (Type.UNION.equals(valueType)){
+              Map<CharSequence,Object> valueMap = new HashMap<CharSequence, Object>();
+              for (CharSequence mapKey: map.keySet()) {
+                Object mapValue = map.get(mapKey);
+                int valueUnionIndex = getUnionSchema(mapValue, valueSchema);
+                valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex);
+                valueMap.put(mapKey, mapValue);
               }
+              map = valueMap;
             }
-            this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+            
+            String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+            
+            // If the map is not in a super column family, we use the Avro serializer.
+            if (!this.cassandraClient.isSuper( familyName )){
+              try {
+                byte[] byteValue = AvroSerializerUtil.serializer(map, schema);
+                this.cassandraClient.addColumn(key, field.name(), byteValue);
+              } catch (IOException e) {
+                LOG.warn(field.name() + " named map could not be serialized.");
+              }
+            }else{
+              this.cassandraClient.addStatefulHashMap(key, field.name(), map);              
+            }
+          } else {
+            LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
           }
         } else {
-          LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name()); 
+          // delete map
+          LOG.warn("Setting content of: " + field.name() + " to null.");
+          this.cassandraClient.deleteStatefulHashMap(key, field.name());
         }
-      }
-      break;
-    case MAP:
-      if (value != null) {
-        if (value instanceof StatefulHashMap<?, ?>) {
-          this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
+        break;
+      case ARRAY:
+        if (value != null) {
+          if (value instanceof DirtyListWrapper<?>) {
+            DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value;
+            GenericArray valueArray = new Array(fieldValue.size(), schema);
+            for (int i = 0; i < fieldValue.size(); i++) {
+              valueArray.add(i, fieldValue.get(i));
+            }
+            this.cassandraClient.addGenericArray(key, field.name(), (GenericArray<?>)valueArray);
+          } else {
+            LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
+          }
         } else {
-          LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
+          LOG.warn("Setting content of: " + field.name() + " to null.");
+          this.cassandraClient.deleteGenericArray(key, field.name());
         }
-      }
-      break;
-    case ARRAY:
-      if (value != null) {
-        if (value instanceof GenericArray<?>) {
-          this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
+        break;
+      case UNION:
+     // adding union schema index
+        String columnName = field.name() + UNION_COL_SUFIX;
+        String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+        if(value != null) {
+          int schemaPos = getUnionSchema(value, schema);
+          LOG.debug("Union with value: " + value.toString() + " at index: " + schemaPos + " supported for field: " + field.name());
+          this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
+          if (this.cassandraClient.isSuper( familyName )){
+            this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
+          }else{
+            this.cassandraClient.addColumn(key, columnName, schemaPos);
+            
+          }
+          //this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
+          // adding union value
+          Schema unionSchema = schema.getTypes().get(schemaPos);
+          addOrUpdateField(key, field, unionSchema, value);
+          //this.cassandraClient.addColumn(key, field.name(), value);
         } else {
-          LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name());
+          LOG.warn("Setting content of: " + field.name() + " to null.");
+          if (this.cassandraClient.isSuper( familyName )){
+            this.cassandraClient.deleteSubColumn(key, field.name());
+          } else {
+            this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name()));
+          }
         }
+        break;
+      default:
+        LOG.warn("Type: " + type.name() + " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
       }
-      break;
-    case UNION:
-      if(value != null) {
-        LOG.debug("Union with value: " + value.toString() + " at index: " + getUnionSchema(value, schema) + " supported for field: " + field.name());
-        this.cassandraClient.addColumn(key, field.name(), value);
-      } else {
-        LOG.warn("Union with 'null' value not supported for field: " + field.name());
-      }
-    default:
-      LOG.warn("Type: " + type.name() + " not considered for field: " + field.name() + ". Please report this to dev@gora.apache.org");
     }
   }
 
   /**
-   * Gets the position within the schema of the type used
+   * Given an object and the object schema this function obtains,
+   * from within the UNION schema, the position of the type used.
+   * If no data type can be inferred then we return a default value
+   * of position 0.
    * @param pValue
    * @param pUnionSchema
-   * @return
+   * @return the unionSchemaPosition.
    */
   private int getUnionSchema(Object pValue, Schema pUnionSchema){
     int unionSchemaPos = 0;
-    String valueType = pValue.getClass().getSimpleName();
+//    String valueType = pValue.getClass().getSimpleName();
     Iterator<Schema> it = pUnionSchema.getTypes().iterator();
     while ( it.hasNext() ){
-      String schemaName = it.next().getName();
-      if (valueType.equals("Utf8") && schemaName.equals(Type.STRING.name().toLowerCase()))
+      Type schemaType = it.next().getType();
+      if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
         return unionSchemaPos;
-      else if (valueType.equals("HeapByteBuffer") && schemaName.equals(Type.STRING.name().toLowerCase()))
+      else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
         return unionSchemaPos;
-      else if (valueType.equals("Integer") && schemaName.equals(Type.INT.name().toLowerCase()))
+      else if (pValue instanceof Integer && schemaType.equals(Type.INT))
         return unionSchemaPos;
-      else if (valueType.equals("Long") && schemaName.equals(Type.LONG.name().toLowerCase()))
+      else if (pValue instanceof Long && schemaType.equals(Type.LONG))
         return unionSchemaPos;
-      else if (valueType.equals("Double") && schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+      else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
         return unionSchemaPos;
-      else if (valueType.equals("Float") && schemaName.equals(Type.FLOAT.name().toLowerCase()))
+      else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
         return unionSchemaPos;
-      else if (valueType.equals("Boolean") && schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+      else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
+        return unionSchemaPos;
+      else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+        return unionSchemaPos;
+      else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+        return unionSchemaPos;
+      else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
         return unionSchemaPos;
       unionSchemaPos ++;
     }
     // if we weren't able to determine which data type it is, then we return the default
-    return 0;
+    return DEFAULT_UNION_SCHEMA;
   }
 
   /**
-   * Checks to see if a Cassandra Keyspace actually exists.
-   * Returns true if it does.
+   * Simple method to check if a Cassandra Keyspace exists.
+   * @return true if a Keyspace exists.
    */
   @Override
   public boolean schemaExists() {

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
index f6b4bbd..da13f04 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
@@ -27,7 +27,6 @@ import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
 import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
 import me.prettyprint.hector.api.mutation.Mutator;
 
 import org.apache.gora.persistency.Persistent;
@@ -39,7 +38,6 @@ import org.apache.gora.persistency.Persistent;
  */
 public class HectorUtils<K,T extends Persistent> {
 
-  /** Methods to insert columns. */
   public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
     mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr));
   }
@@ -48,7 +46,7 @@ public class HectorUtils<K,T extends Persistent> {
     mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr));
   }
 
-  /** Methods to create columns. */
+
   public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer name, ByteBuffer value, String ttlAttr) {
     return HFactory.createColumn(name, value, ByteBufferSerializer.get(), ByteBufferSerializer.get()).setTtl(Integer.parseInt(ttlAttr));
   }
@@ -61,7 +59,7 @@ public class HectorUtils<K,T extends Persistent> {
     return HFactory.createColumn(name, value, IntegerSerializer.get(), ByteBufferSerializer.get()).setTtl(Integer.parseInt(ttlAttr));
   }
 
-  /** Methods to create subColumns. */
+
   public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
     mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr));
   }
@@ -74,30 +72,25 @@ public class HectorUtils<K,T extends Persistent> {
     mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr));
   }
 
-  /** Methods to delete subColumns. */
+
   public static<K> void deleteSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName) {
     mutator.subDelete(key, columnFamily, superColumnName, columnName, StringSerializer.get(), ByteBufferSerializer.get());
   }
 
-  /** Methods do delete columns. */
   public static<K> void deleteColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName){
-    MutationResult mr = mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get());
-    System.out.println(mr.toString());
+    mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get());
   }
 
-  /** Methods to create superColumns. */
-  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
   }
 
-  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue, String ttlAttr) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
   }
 
-  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue, String ttlAttr) {
     return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/cassandra.yaml b/gora-cassandra/src/test/conf/cassandra.yaml
index e56d1b5..772648d 100644
--- a/gora-cassandra/src/test/conf/cassandra.yaml
+++ b/gora-cassandra/src/test/conf/cassandra.yaml
@@ -7,7 +7,7 @@
 
 # The name of the cluster. This is mainly used to prevent machines in
 # one logical cluster from joining another.
-cluster_name: 'Gora Cassandra Test Cluster'
+cluster_name: "Gora Cassandra Test Cluster"
 
 # You should always specify InitialToken when setting up a production
 # cluster for the first time, and often when adding capacity later.
@@ -27,13 +27,13 @@ hinted_handoff_enabled: true
 # generated. After it has been dead this long, hints will be dropped.
 max_hint_window_in_ms: 3600000 # one hour
 # Sleep this long after delivering each row or row fragment
-hinted_handoff_throttle_delay_in_ms: 50
+#hinted_handoff_throttle_delay_in_ms: 50 (deprecated in 2.0.2)
 
 # authentication backend, implementing IAuthenticator; used to identify users
 authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
 
 # authorization backend, implementing IAuthority; used to limit access/provide permissions
-authority: org.apache.cassandra.auth.AllowAllAuthority
+authorizer: org.apache.cassandra.auth.AllowAllAuthorizer
 
 # The partitioner is responsible for distributing rows (by key) across
 # nodes in the cluster. Any IPartitioner may be used, including your
@@ -109,7 +109,7 @@ seed_provider:
 # it is most effective under light to moderate load, or read-heavy
 # workloads; under truly massive write load, it will often be too
 # little, too late.
-flush_largest_memtables_at: 0.75
+#flush_largest_memtables_at: 0.75 (deprecated in 2.0.2)
 
 # emergency pressure valve #2: the first time heap usage after a full
 # (CMS) garbage collection is above this fraction of the max,
@@ -120,8 +120,8 @@ flush_largest_memtables_at: 0.75
 #
 # Set to 1.0 to disable. Setting this lower than
 # CMSInitiatingOccupancyFraction is not likely to be useful.
-reduce_cache_sizes_at: 0.85
-reduce_cache_capacity_to: 0.6
+#reduce_cache_sizes_at: 0.85 (deprecated in 2.0.2)
+#reduce_cache_capacity_to: 0.6 (deprecated in 2.0.2)
 
 # For workloads with more data than can fit in memory, Cassandra's
 # bottleneck will be reads that need to fetch data from
@@ -311,7 +311,7 @@ compaction_preheat_key_cache: true
 # stream_throughput_outbound_megabits_per_sec: 400
 
 # Time to wait for a reply from other nodes before failing the command
-rpc_timeout_in_ms: 10000
+#rpc_timeout_in_ms: 10000 (deprecated in 2.0.2)
 
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
@@ -410,7 +410,7 @@ index_interval: 128
 # The passwords used in these options must match the passwords used when generating
 # the keystore and truststore. For instructions on generating these files, see:
 # http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
-encryption_options:
+server_encryption_options:
     internode_encryption: none
     keystore: conf/.keystore
     keystore_password: cassandra

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml b/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
index cd54821..c626b98 100644
--- a/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
+++ b/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
@@ -47,7 +47,8 @@
     <field name="content" family="p" qualifier="p:cnt:c" ttl="10"/>
     <field name="parsedContent" family="sc" qualifier="p:parsedContent" ttl="10"/>
     <field name="outlinks" family="sc" qualifier="p:outlinks" ttl="10"/>
-    <field name="metadata" family="sc" qualifier="c:mt" ttl="10"/>
+    <field name="headers" family="sc" qualifier="p:headers" ttl="10"/>
+    <field name="metadata" family="p" qualifier="c:mt" ttl="10"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" keyspace="TokenDatum">

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/conf/gora.properties
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/gora.properties b/gora-cassandra/src/test/conf/gora.properties
index 9de4fce..80427b4 100644
--- a/gora-cassandra/src/test/conf/gora.properties
+++ b/gora-cassandra/src/test/conf/gora.properties
@@ -14,10 +14,16 @@
 # limitations under the License.
 
 gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.cluster=Test Cluster
-gora.cassandrastore.host=localhost
-# property is annotated in CassandraClient#checkKeyspace()
-# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL. 
-gora.cassandrastore.cf.consistency.level=ONE
-gora.cassandrastore.read.consistency.level=QUORUM
-gora.cassandrastore.write.consistency.level=ONE
+gora.cassandrastore.keyspace=
+gora.cassandrastore.name=
+gora.cassandrastore.class=
+gora.cassandrastore.qualifier=
+gora.cassandrastore.family=
+gora.cassandrastore.type=
+gora.cassandraStore.cluster=Test Cluster
+gora.cassandraStore.host=localhost
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/conf/log4j-server.properties
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/log4j-server.properties b/gora-cassandra/src/test/conf/log4j-server.properties
index 086306e..266738d 100644
--- a/gora-cassandra/src/test/conf/log4j-server.properties
+++ b/gora-cassandra/src/test/conf/log4j-server.properties
@@ -42,3 +42,6 @@ log4j.appender.R.File=/var/log/cassandra/system.log
 # Adding this to avoid thrift logging disconnect errors.
 log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
 
+# Add for gora-cassandra specific logging during tests
+log4j.logger.org.apache.gora.cassandra=DEBUG
+

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
index 7f0eca0..ef9410e 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
@@ -23,17 +23,13 @@
 
 package org.apache.gora.cassandra;
 
-import java.io.IOException;
-
 import org.apache.gora.GoraTestDriver;
 import org.apache.gora.cassandra.store.CassandraStore;
 
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.File;
 
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.cassandra.service.CassandraDaemon;
 
 // Logging imports
 import org.slf4j.Logger;
@@ -70,7 +66,7 @@ public class GoraCassandraTestDriver extends GoraTestDriver {
   }
 	
   /**
-   * starts embedded Cassandra server.
+   * Starts embedded Cassandra server.
    *
    * @throws Exception
    * 	if an error occurs
@@ -91,21 +87,21 @@ public class GoraCassandraTestDriver extends GoraTestDriver {
 	
         public void run() {
           try {
-	    cassandraDaemon.start();
-	  } catch (Exception e) {
-	    log.error("Embedded casandra server run failed!", e);
-	  }
+            cassandraDaemon.start();
+          } catch (Exception e) {
+            log.error("Embedded casandra server run failed!", e);
+          }
         }
       });
 	
       cassandraThread.setDaemon(true);
       cassandraThread.start();
-      } catch (Exception e) {
-	log.error("Embedded casandra server start failed!", e);
+    } catch (Exception e) {
+      log.error("Embedded casandra server start failed!", e);
 
-	// cleanup
-	tearDownClass();
-      }
+      // cleanup
+      tearDownClass();
+    }
   }
 
   /**
@@ -136,15 +132,15 @@ public class GoraCassandraTestDriver extends GoraTestDriver {
     int tries = 3;
     while (tries-- > 0) {
       try {
-	cleanupDirectories();
-	break;
+        cleanupDirectories();
+        break;
       } catch (Exception e) {
-	// ignore exception
-	try {
-	  Thread.sleep(250);
-	} catch (InterruptedException e1) {
-	  // ignore exception
-	}
+        // ignore exception
+        try {
+          Thread.sleep(250);
+        } catch (InterruptedException e1) {
+          // ignore exception
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
index e80dff4..aad1a87 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
@@ -68,49 +68,37 @@ public class TestCassandraStore extends DataStoreTestBase{
   }
 
 
-  //We need to skip the following tests for a while until we fix some issues..
-
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetWebPageDefaultFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQuery() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryStartKey() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryEndKey() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryKeyRange() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+  @Override
+  public void testQueryWebPageSingleKey() throws IOException {}
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+  @Override
+  public void testQueryWebPageQueryEmptyResults() throws IOException {}
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
   @Override
   public void testDelete() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
   @Override
   public void testDeleteByQuery() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
   @Override
   public void testDeleteByQueryFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-298 Implement CassandraStore#getPartitions")
   @Override
   public void testGetPartitions() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetRecursive() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetDoubleRecursive() throws IOException{}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetNested() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGet3UnionField() throws IOException {}
-
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-compiler-cli/pom.xml
----------------------------------------------------------------------
diff --git a/gora-compiler-cli/pom.xml b/gora-compiler-cli/pom.xml
new file mode 100644
index 0000000..873f64a
--- /dev/null
+++ b/gora-compiler-cli/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+     <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.gora</groupId>
+	<artifactId>gora</artifactId>
+	<version>0.4-SNAPSHOT</version>
+	<relativePath>../</relativePath>
+    </parent>
+    <artifactId>gora-compiler-cli</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Apache Gora :: Compiler-CLI</name>
+        <url>http://gora.apache.org</url>
+    <description>The Apache Gora open source framework provides an in-memory data model and 
+    persistence for big data. Gora supports persisting to column stores, key value stores, 
+    document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce 
+    support.</description>
+    <inceptionYear>2010</inceptionYear>
+    <organization>
+    	<name>The Apache Software Foundation</name>
+    	<url>http://www.apache.org/</url>
+    </organization>
+    <scm>
+    	<url>http://svn.apache.org/viewvc/gora/trunk/gora-compiler-cli/</url>
+    	<connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-compiler-cli/</connection>
+    	<developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-compiler-cli/</developerConnection>
+    </scm>
+    <issueManagement>
+    	<system>JIRA</system>
+    	<url>https://issues.apache.org/jira/browse/GORA</url>
+    </issueManagement>
+    <ciManagement>
+    	<system>Jenkins</system>
+    	<url>https://builds.apache.org/job/Gora-trunk/</url>
+    </ciManagement>
+
+    <properties>
+        <osgi.import>*</osgi.import>
+        <osgi.export>org.apache.gora.compiler.cli*;version="${project.version}";-noimport:=true</osgi.export>
+    </properties>
+    <dependencies>
+        <dependency>
+	    <artifactId>gora-compiler</artifactId>
+	    <groupId>org.apache.gora</groupId>
+	</dependency>
+	<dependency>
+	    <groupId>org.slf4j</groupId>
+	    <artifactId>slf4j-simple</artifactId>
+	</dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java
----------------------------------------------------------------------
diff --git a/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java b/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java
new file mode 100644
index 0000000..1849228
--- /dev/null
+++ b/gora-compiler-cli/src/main/java/org/apache/gora/compiler/cli/GoraCompilerCLI.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.compiler.cli;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.gora.compiler.GoraCompiler;
+
+
+public class GoraCompilerCLI {
+
+  public static void main(String[] args) {
+    if(args.length == 1 && (args[0].equals("--help") || args[0].equals("-h"))){
+      printHelp();
+      System.exit(0);
+    }
+    if(args.length < 2){
+      System.err.println("Must supply at least one source file and an output directory.");
+      printHelp();
+      System.exit(1);
+    }
+    File outputDir = new File(args[args.length-1]);
+    if(!outputDir.isDirectory()){
+      System.err.println("Must supply a directory for output");
+      printHelp();
+      System.exit(1);
+    }
+    File[] inputs = new File[args.length-1];
+    for(int i  = 0; i<inputs.length; i++){
+      File inputFile = new File(args[i]);
+      if(!inputFile.isFile()){
+        System.err.println("Input must be a file.");
+        printHelp();
+        System.exit(1);
+      }
+      inputs[i] = inputFile;
+    }
+    try {
+      GoraCompiler.compileSchema(inputs, outputDir);
+      System.out.println("Compiler executed SUCCESSFULL.");
+    } catch (IOException e) {
+      System.err.println("Error while compiling schema files. Check that the schemas are properly formatted.");
+      printHelp();
+      e.printStackTrace(System.err);
+    }
+  }
+
+  private static void printHelp() {
+    PrintStream out = System.out;
+    out.println("Usage: gora-compiler ( -h | --help ) | (<input> [<input>...] <output>)");
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/gora-compiler/pom.xml b/gora-compiler/pom.xml
new file mode 100644
index 0000000..5d15bd7
--- /dev/null
+++ b/gora-compiler/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+     <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.gora</groupId>
+	<artifactId>gora</artifactId>
+	<version>0.4-SNAPSHOT</version>
+	<relativePath>../</relativePath>
+    </parent>
+    <artifactId>gora-compiler</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Apache Gora :: Compiler</name>
+        <url>http://gora.apache.org</url>
+    <description>The Apache Gora open source framework provides an in-memory data model and 
+    persistence for big data. Gora supports persisting to column stores, key value stores, 
+    document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce 
+    support.</description>
+    <inceptionYear>2010</inceptionYear>
+    <organization>
+    	<name>The Apache Software Foundation</name>
+    	<url>http://www.apache.org/</url>
+    </organization>
+    <scm>
+    	<url>http://svn.apache.org/viewvc/gora/trunk/gora-compiler/</url>
+    	<connection>scm:svn:http://svn.apache.org/repos/asf/gora/trunk/gora-compiler/</connection>
+    	<developerConnection>scm:svn:https://svn.apache.org/repos/asf/gora/trunk/gora-compiler/</developerConnection>
+    </scm>
+    <issueManagement>
+    	<system>JIRA</system>
+    	<url>https://issues.apache.org/jira/browse/GORA</url>
+    </issueManagement>
+    <ciManagement>
+    	<system>Jenkins</system>
+    	<url>https://builds.apache.org/job/Gora-trunk/</url>
+    </ciManagement>
+
+    <properties>
+        <osgi.import>*</osgi.import>
+        <osgi.export>org.apache.gora.compiler*;version="${project.version}";-noimport:=true</osgi.export>
+    </properties>
+    <build>
+        <directory>target</directory>
+        <outputDirectory>target/classes</outputDirectory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <testOutputDirectory>target/test-classes</testOutputDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testResources>
+            <testResource>
+                <directory>src/test/conf/</directory>
+                <includes>
+                    <include>**</include>
+                </includes>
+            </testResource>
+        </testResources>
+	<resources>
+	    <resource>
+	        <directory>src/main/velocity</directory>
+		<includes>
+		    <include>**/*.vm</include>
+		</includes>
+	    </resource>
+	</resources>
+	<plugins>
+	    <plugin>
+	        <groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-jar-plugin</artifactId>
+		<configuration>
+		    <archive>
+		        <manifest>
+			    <mainClass>org.apache.gora.compiler.GoraCompiler</mainClass>
+			    <packageName>org.apache.gora.compiler</packageName>
+			</manifest>
+		    </archive>
+		</configuration>
+	    </plugin>
+	</plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+	    <groupId>org.apache.avro</groupId>
+	    <artifactId>avro-compiler</artifactId>
+	</dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java
----------------------------------------------------------------------
diff --git a/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java b/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java
new file mode 100644
index 0000000..b4d77c5
--- /dev/null
+++ b/gora-compiler/src/main/java/org/apache/gora/compiler/GoraCompiler.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.compiler;
+
+import java.beans.PersistenceDelegate;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.compiler.specific.SpecificCompiler;
+import org.apache.avro.generic.GenericData.StringType;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+public class GoraCompiler extends SpecificCompiler {
+
+  /**
+   * Name of the hidden bytes field that is prepended to every record schema to
+   * hold the per-field dirty bits. Declared final so the name used when the
+   * field is created and the name used when it is looked up cannot diverge.
+   */
+  public static final String DIRTY_BYTES_FIELD_NAME = "__g__dirty";
+  /**
+   * Index of the first field that is not managed by Gora; index 0 is taken by
+   * the dirty-bytes field, which is always added first.
+   */
+  public static final int FIRST_UNMANAGED_FIELD_INDEX = 1;
+
+  /** Field names that user schemas are not allowed to declare. */
+  private static final Set<String> GORA_RESERVED_NAMES = new HashSet<String>();
+
+  static {
+    GORA_RESERVED_NAMES.add(DIRTY_BYTES_FIELD_NAME);
+  }
+
+  /** Field names for which the template generates no public accessors. */
+  private static final Set<String> GORA_HIDDEN_FIELD_NAMES = new HashSet<String>();
+
+  static {
+    GORA_HIDDEN_FIELD_NAMES.add(DIRTY_BYTES_FIELD_NAME);
+  }
+  
+  /**
+   * Compiles each of the given Avro schema files into Gora data beans.
+   * One {@link Schema.Parser} instance is shared by all files. Every parsed
+   * schema is first rewritten with dirty-tracking support and then rendered
+   * with the Gora templates.
+   *
+   * @param srcFiles the Avro schema files to compile
+   * @param dest the destination handed to the compiler for the generated output
+   * @throws IOException if a schema cannot be parsed, rewritten or compiled
+   */
+  public static void compileSchema(File[] srcFiles, File dest)
+      throws IOException {
+    final Schema.Parser schemaParser = new Schema.Parser();
+
+    for (int i = 0; i < srcFiles.length; i++) {
+      final File schemaFile = srcFiles[i];
+      System.out.println("Compiling: " + schemaFile.getAbsolutePath());
+      // A fresh original->rewritten map per file keeps track of records already converted.
+      final Schema withDirtySupport = getSchemaWithDirtySupport(
+          schemaParser.parse(schemaFile), new HashMap<Schema,Schema>());
+      final GoraCompiler goraCompiler = new GoraCompiler(withDirtySupport);
+      goraCompiler.setTemplateDir("/org/apache/gora/compiler/templates/");
+      goraCompiler.compileToDestination(schemaFile, dest);
+      System.out.println("Compiled into: " + dest.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Returns the source-code suffix the template appends to a field reference
+   * inside a generated getter.
+   *
+   * @param schema the schema of the field being read
+   * @return <code>".asReadOnlyBuffer()"</code> for BYTES schemas, otherwise the empty string
+   */
+  public static String generateAppropriateImmutabilityModifier(Schema schema){
+    if (schema.getType() == Type.BYTES) {
+      return ".asReadOnlyBuffer()";
+    }
+    return "";
+  }
+
+  /**
+   * Returns the Java expression the template assigns to a field inside a
+   * generated setter, where the incoming argument is named <code>value</code>.
+   * MAP and ARRAY values are wrapped in a dirty-tracking wrapper unless they
+   * already are <code>Dirtyable</code>; BYTES values go through
+   * <code>deepCopyToReadOnlyBuffer</code>; everything else is used as is.
+   *
+   * @param schema the schema of the field being set
+   * @return the expression source text
+   */
+  public static String generateAppropriateWrapperOrValue(Schema schema) {
+    final Type type = schema.getType();
+    if (type == Type.BYTES) {
+      return "deepCopyToReadOnlyBuffer(value)";
+    }
+    if (type == Type.MAP || type == Type.ARRAY) {
+      final String wrapperClass = (type == Type.MAP) ? "DirtyMapWrapper" : "DirtyListWrapper";
+      return "(value instanceof org.apache.gora.persistency.Dirtyable) ? "
+        + "value : new org.apache.gora.persistency.impl." + wrapperClass + "(value)";
+    }
+    return "value";
+  }
+
+  /**
+   * Returns the Java expression the template uses inside the generated
+   * <code>put(int, Object)</code> method, where <code>value</code> is an
+   * untyped <code>Object</code>. MAP and ARRAY values are cast and wrapped in
+   * a dirty-tracking wrapper unless they already are <code>Dirtyable</code>;
+   * every other type is passed through unchanged.
+   *
+   * @param schema the schema of the field being put
+   * @return the expression source text
+   */
+  public static String generateAppropriateWrapperOrValueForPut(Schema schema) {
+    final String keepIfDirtyable = "(value instanceof org.apache.gora.persistency.Dirtyable) ? ";
+    final Type type = schema.getType();
+    if (type == Type.MAP) {
+      return keepIfDirtyable
+        + "value : new org.apache.gora.persistency.impl.DirtyMapWrapper((java.util.Map)value)";
+    } else if (type == Type.ARRAY) {
+      return keepIfDirtyable
+        + "value : new org.apache.gora.persistency.impl.DirtyListWrapper((java.util.List)value)";
+    } else {
+      return "value";
+    }
+  }
+
+  /**
+   * Returns the Java expression the template uses in <code>Builder.build()</code>
+   * for a field that was not explicitly set on the builder.
+   *
+   * @param schema the record schema that owns <code>field</code>; used to size
+   *          the dirty-bytes buffer
+   * @param field the field to produce an initial value expression for
+   * @return the expression source text
+   */
+  public static String generateAppropriateWrapper(Schema schema, Field field) {
+    // Compare by value: == on Strings only holds when both sides happen to be
+    // the very same instance, which is not guaranteed for every schema source.
+    if (DIRTY_BYTES_FIELD_NAME.equals(field.name())) {
+      return "java.nio.ByteBuffer.wrap(new byte["
+        + getNumberOfBytesNeededForDirtyBits(schema) + "])";
+    }
+    switch (field.schema().getType()) {
+      case RECORD:
+        return field.schema().getName()+".newBuilder().build()";
+      case MAP:
+        return "new org.apache.gora.persistency.impl.DirtyMapWrapper((java.util.Map)defaultValue(fields()["+field.pos()+"]))";
+      case ARRAY:
+        return "new org.apache.gora.persistency.impl.DirtyListWrapper((java.util.List)defaultValue(fields()["+field.pos()+"]))";
+      default:
+        return "defaultValue(fields()["+field.pos()+"])";
+    }
+  }
+  
+  /**
+   * Returns a Java expression yielding a value for the given field: a freshly
+   * built record for RECORD fields, an empty dirty-tracking wrapper for MAP and
+   * ARRAY fields, and a plain <code>this.&lt;name&gt;</code> reference otherwise.
+   *
+   * @param field the field to produce an expression for
+   * @return the expression source text
+   */
+  public static String generateAppropriateValue(Field field) {
+    final Type type = field.schema().getType();
+    if (type == Type.RECORD) {
+      return field.schema().getName()+".newBuilder().build()";
+    }
+    if (type == Type.MAP) {
+      return "new org.apache.gora.persistency.impl.DirtyMapWrapper(new java.util.HashMap())";
+    }
+    if (type == Type.ARRAY) {
+      return "new org.apache.gora.persistency.impl.DirtyListWrapper(new java.util.ArrayList())";
+    }
+    return "this."+field.name();
+  }
+
+  /** Recognizes camel case */
+  public static String toUpperCase(String s) {
+    StringBuilder builder = new StringBuilder();
+
+    for (int i = 0; i < s.length(); i++) {
+      if (i > 0) {
+        if (Character.isUpperCase(s.charAt(i))
+            && Character.isLowerCase(s.charAt(i - 1))
+            && Character.isLetter(s.charAt(i))) {
+          builder.append("_");
+        }
+      }
+      builder.append(Character.toUpperCase(s.charAt(i)));
+    }
+
+    return builder.toString();
+  }
+
+  private static int getNumberOfBytesNeededForDirtyBits(Schema originalSchema) {
+    return (int) Math.ceil((originalSchema.getFields().size() + 1) * 0.125);
+  }
+
+  /**
+   * Derives the name of the generated dirty-check method for a field from the
+   * name of its getter: the first three characters of the getter name are
+   * replaced by <code>is</code> and <code>Dirty</code> is appended.
+   *
+   * @param schema the record schema that owns <code>field</code>
+   * @param field the field to name the dirty-check method for
+   * @return the method name
+   */
+  public static String generateDirtyMethod(Schema schema, Field field) {
+    /*
+     * TODO: See AVRO-1127. This is dirty. We need to file a bug in avro to
+     * get them to open the API so other compilers can use their utility
+     * methods
+     */
+    final String getterName = generateGetMethod(schema, field);
+    final StringBuilder dirtyName = new StringBuilder("is");
+    // presumably the three skipped characters are the "get" prefix - confirm against Avro
+    dirtyName.append(getterName.substring(3));
+    dirtyName.append("Dirty");
+    return dirtyName.toString();
+  }
+
+  /**
+   * Returns the Java initialiser expression for a Gora-managed (hidden) field.
+   * Only the dirty-bytes field is supported.
+   *
+   * @param schema the record schema; used to size the dirty-bytes buffer
+   * @param fieldName the name of the hidden field
+   * @return the initialiser source text
+   * @throws IllegalArgumentException if <code>fieldName</code> is not a Gora managed field
+   */
+  public static String generateDefaultValueString(Schema schema, String fieldName) {
+    // Compare by value rather than by reference; constant-first keeps a null
+    // fieldName on the IllegalArgumentException path, as before.
+    if (DIRTY_BYTES_FIELD_NAME.equals(fieldName)) {
+      return "java.nio.ByteBuffer.wrap(new byte["
+        + getNumberOfBytesNeededForDirtyBits(schema) + "])";
+    }
+    throw new IllegalArgumentException(fieldName
+      + " is not a gora managed field.");
+  }
+
+  /**
+   * Tells whether a field is a regular field, as opposed to one of the hidden
+   * Gora-managed fields.
+   *
+   * @param fieldName the field name to check
+   * @return <code>false</code> if the name is a hidden field name, <code>true</code> otherwise
+   */
+  public static boolean isNotHiddenField(String fieldName) {
+    final boolean hidden = GORA_HIDDEN_FIELD_NAMES.contains(fieldName);
+    return !hidden;
+  }
+
+  /**
+   * Creates a compiler for the given schema by delegating to the
+   * {@link SpecificCompiler} constructor. Package-private; within this class
+   * it is invoked from {@link #compileSchema(File[], File)} with a schema that
+   * has already been rewritten with dirty support.
+   *
+   * @param schema the schema to generate code for
+   */
+  GoraCompiler(Schema schema) {
+    super(schema);
+  }
+
+  private static Schema getSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+    switch (originalSchema.getType()) {
+      case RECORD:
+        if (queue.containsKey(originalSchema)) {
+          return queue.get(originalSchema);
+        }
+        return getRecordSchemaWithDirtySupport(originalSchema,queue);
+      case UNION:
+        return getUnionSchemaWithDirtySupport(originalSchema,queue);
+      case MAP:
+        return getMapSchemaWithDirtySupport(originalSchema,queue);
+      case ARRAY:
+        return getArraySchemaWithDirtySupport(originalSchema,queue);
+      default:
+        return originalSchema;
+    }
+  }
+  
+  private static Schema getArraySchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+    return Schema.createArray(getSchemaWithDirtySupport(originalSchema.getElementType(),queue));
+  }
+
+  private static Schema getMapSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+    return Schema.createMap(getSchemaWithDirtySupport(originalSchema.getValueType(),queue));
+  }
+
+  private static Schema getUnionSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+    List<Schema> schemaTypes = originalSchema.getTypes();
+    List<Schema> newTypeSchemas = new ArrayList<Schema>();
+    for (int i = 0; i < schemaTypes.size(); i++) {
+      Schema currentTypeSchema = schemaTypes.get(i);
+      newTypeSchemas.add(getSchemaWithDirtySupport(currentTypeSchema,queue));
+    }
+    return Schema.createUnion(newTypeSchemas);
+  }
+
+  /**
+   * Builds a copy of a record schema that has the dirty-bytes field as its
+   * first field, followed by the original fields with their own schemas
+   * converted recursively.
+   *
+   * @param originalSchema the record schema to convert
+   * @param queue map from original record schemas to their converted
+   *          counterparts; the new record is registered here before its fields
+   *          are processed
+   * @return the new record schema
+   * @throws IOException if <code>originalSchema</code> is not a record or
+   *           declares a field with a reserved name
+   */
+  private static Schema getRecordSchemaWithDirtySupport(Schema originalSchema, Map<Schema,Schema> queue) throws IOException {
+    if (originalSchema.getType() != Type.RECORD) {
+      throw new IOException("Gora only supports record schemas.");
+    }
+    List<Field> originalFields = originalSchema.getFields();
+    /* make sure the schema doesn't contain the field __g__dirty */
+    for (Field field : originalFields) {
+      if (GORA_RESERVED_NAMES.contains(field.name())) {
+        throw new IOException(
+          "Gora schemas cannot contain the field name " + field.name());
+      }
+    }
+    // Only name, doc, namespace and the error flag are carried over to the new record.
+    Schema newSchema = Schema.createRecord(originalSchema.getName(),
+    originalSchema.getDoc(), originalSchema.getNamespace(),
+    originalSchema.isError());
+    
+    // Register the (still field-less) record BEFORE converting its fields: a field
+    // that refers back to this record is then resolved from the map by
+    // getSchemaWithDirtySupport instead of recursing into this method again.
+    queue.put(originalSchema, newSchema);
+    
+    List<Field> newFields = new ArrayList<Schema.Field>();
+    // Default value of the dirty field: a zero-filled byte array sized from the original field count.
+    byte[] defaultDirtyBytesValue = new byte[getNumberOfBytesNeededForDirtyBits(originalSchema)];
+    Arrays.fill(defaultDirtyBytesValue, (byte) 0);
+    JsonNode defaultDirtyJsonValue = JsonNodeFactory.instance
+      .binaryNode(defaultDirtyBytesValue);
+    Field dirtyBits = new Field(DIRTY_BYTES_FIELD_NAME,
+      Schema.create(Type.BYTES),
+      "Bytes used to represent weather or not a field is dirty.",
+      defaultDirtyJsonValue);
+    // The dirty field goes first, so it occupies position 0 of the new record.
+    newFields.add(dirtyBits);
+    for (Field originalField : originalFields) {
+      // recursively add dirty support
+      // Each copy keeps the original name, doc, default value and sort order.
+      Field newField = new Field(originalField.name(),
+        getSchemaWithDirtySupport(originalField.schema(),queue),
+        originalField.doc(), originalField.defaultValue(),
+        originalField.order());
+      newFields.add(newField);
+    }
+    newSchema.setFields(newFields);
+    return newSchema;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm
----------------------------------------------------------------------
diff --git a/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm b/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm
new file mode 100644
index 0000000..0a73b80
--- /dev/null
+++ b/gora-compiler/src/main/velocity/org/apache/gora/compiler/templates/record.vm
@@ -0,0 +1,349 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##     http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($schema.getNamespace())
+package $schema.getNamespace();  
+#end
+@SuppressWarnings("all")
+#if ($schema.getDoc())
+/** $schema.getDoc() */
+#end
+public class ${this.mangle($schema.getName())}#if ($schema.isError()) extends org.apache.avro.specific.SpecificExceptionBase#else extends org.apache.gora.persistency.impl.PersistentBase#end implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("${this.javaEscape($schema.toString())}");
+
+  /** Enum containing all data bean's fields. */
+  public static enum Field {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+    ${this.toUpperCase($field.name())}($i, "${this.mangle($field.name(), $schema.isError())}"),
+#set ($i = $i + 1)
+#end
+    ;
+    /**
+     * Field's index.
+     */
+    private int index;
+
+    /**
+     * Field's name.
+     */
+    private String name;
+
+    /**
+     * Field's constructor
+     * @param index field's index.
+     * @param name field's name.
+     */
+    Field(int index, String name) {this.index=index;this.name=name;}
+
+    /**
+     * Gets field's index.
+     * @return int field's index.
+     */
+    public int getIndex() {return index;}
+
+    /**
+     * Gets field's name.
+     * @return String field's name.
+     */
+    public String getName() {return name;}
+
+    /**
+     * Gets field's attributes to string.
+     * @return String field's attributes to string.
+     */
+    public String toString() {return name;}
+  };
+
+  public static final String[] _ALL_FIELDS = {
+#foreach ($field in $schema.getFields())
+  "${this.mangle($field.name(), $schema.isError())}",
+#end
+  };
+
+#foreach ($field in $schema.getFields())
+#if ($field.doc())
+  /** $field.doc() */
+#end
+  private ${this.javaUnbox($field.schema())} ${this.mangle($field.name(), $schema.isError())}#if(! $this.isNotHiddenField($field.name()) ) = ${this.generateDefaultValueString($schema,$field.name())}#end;
+#end
+#if ($schema.isError())
+
+  public ${this.mangle($schema.getName())}() {
+    super();
+  }
+  
+  public ${this.mangle($schema.getName())}(Object value) {
+    super(value);
+  }
+
+  public ${this.mangle($schema.getName())}(Throwable cause) {
+    super(cause);
+  }
+
+  public ${this.mangle($schema.getName())}(Object value, Throwable cause) {
+    super(value, cause);
+  }
+
+#end
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+    case $i: return ${this.mangle($field.name(), $schema.isError())};
+#set ($i = $i + 1)
+#end
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value) {
+    switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+    case $i: ${this.mangle($field.name(), $schema.isError())} = (${this.javaType($field.schema())})(${this.generateAppropriateWrapperOrValueForPut($field.schema())}); break;
+#set ($i = $i + 1)
+#end
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+#foreach ($field in $schema.getFields())
+#if ($this.isNotHiddenField($field.name()))
+  /**
+   * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())   * $field.doc()#end
+   */
+  public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+    return ${this.mangle($field.name(), $schema.isError())}${this.generateAppropriateImmutabilityModifier($field.schema())};
+  }
+
+  /**
+   * Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())   * $field.doc()#end
+   * @param value the value to set.
+   */
+  public void ${this.generateSetMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+    this.${this.mangle($field.name(), $schema.isError())} = ${this.generateAppropriateWrapperOrValue($field.schema())};
+    setDirty(${field.pos()});
+  }
+  
+  /**
+   * Checks the dirty status of the '${this.mangle($field.name(), $schema.isError())}' field. A field is dirty if it represents a change that has not yet been written to the database.
+#if ($field.doc())   * $field.doc()#end
+   * @param value not used; the dirty status is looked up by field position only.
+   */
+  public boolean ${this.generateDirtyMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+    return isDirty(${field.pos()});
+  }
+
+#end
+#end
+  /** Creates a new ${this.mangle($schema.getName())} RecordBuilder */
+  public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder() {
+    return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder();
+  }
+  
+  /** Creates a new ${this.mangle($schema.getName())} RecordBuilder by copying an existing Builder */
+  public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder other) {
+    return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder(other);
+  }
+  
+  /** Creates a new ${this.mangle($schema.getName())} RecordBuilder by copying an existing $this.mangle($schema.getName()) instance */
+  public static #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())} other) {
+    return new #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder(other);
+  }
+  
+## The method below is emitted verbatim into every generated class. It copies
+## all input.capacity() bytes of the argument into a newly allocated buffer,
+## mirrors the argument's mark, position and limit onto the copy, restores them
+## on the argument, and returns the copy through asReadOnlyBuffer() -- so,
+## despite the "WriteOnly" in its name, the returned buffer is read-only.
+## input.reset() is how the current mark is discovered; java.nio.Buffer.reset()
+## throws InvalidMarkException when no mark has been set on the argument.
+## NOTE(review): nothing in this template calls this method. Setters for BYTES
+## fields emit deepCopyToReadOnlyBuffer(value) instead, which is presumably
+## supplied by the PersistentBase superclass -- confirm.
+  private static java.nio.ByteBuffer deepCopyToWriteOnlyBuffer(
+      java.nio.ByteBuffer input) {
+    java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity());
+    int position = input.position();
+    input.reset();
+    int mark = input.position();
+    int limit = input.limit();
+    input.rewind();
+    input.limit(input.capacity());
+    copy.put(input);
+    input.rewind();
+    copy.rewind();
+    input.position(mark);
+    input.mark();
+    copy.position(mark);
+    copy.mark();
+    input.position(position);
+    copy.position(position);
+    input.limit(limit);
+    copy.limit(limit);
+    return copy.asReadOnlyBuffer();
+  }
+  
+  /**
+   * RecordBuilder for ${this.mangle($schema.getName())} instances.
+   */
+  public static class Builder extends#if ($schema.isError()) org.apache.avro.specific.SpecificErrorBuilderBase<${this.mangle($schema.getName())}>#else org.apache.avro.specific.SpecificRecordBuilderBase<${this.mangle($schema.getName())}>#end
+
+    implements#if ($schema.isError()) org.apache.avro.data.ErrorBuilder<${this.mangle($schema.getName())}>#else org.apache.avro.data.RecordBuilder<${this.mangle($schema.getName())}>#end {
+
+#foreach ($field in $schema.getFields())
+    private ${this.javaUnbox($field.schema())} ${this.mangle($field.name(), $schema.isError())};
+#end
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder other) {
+      super(other);
+    }
+    
+    /** Creates a Builder by copying an existing $this.mangle($schema.getName()) instance */
+    private Builder(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())} other) {
+      #if ($schema.isError())super(other)#else
+      super(#if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.SCHEMA$)#end;
+#foreach ($field in $schema.getFields())
+      if (isValidValue(fields()[$field.pos()], other.${this.mangle($field.name(), $schema.isError())})) {
+        this.${this.mangle($field.name(), $schema.isError())} = (${this.javaType($field.schema())}) data().deepCopy(fields()[$field.pos()].schema(), other.${this.mangle($field.name(), $schema.isError())});
+        fieldSetFlags()[$field.pos()] = true;
+      }
+#end
+    }
+#if ($schema.isError())
+
+    @Override
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder setValue(Object value) {
+      super.setValue(value);
+      return this;
+    }
+    
+    @Override
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder clearValue() {
+      super.clearValue();
+      return this;
+    }
+
+    @Override
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder setCause(Throwable cause) {
+      super.setCause(cause);
+      return this;
+    }
+    
+    @Override
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder clearCause() {
+      super.clearCause();
+      return this;
+    }
+#end
+
+#foreach ($field in $schema.getFields())
+#if ($this.isNotHiddenField($field.name()))
+    /** Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+    public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+      return ${this.mangle($field.name(), $schema.isError())};
+    }
+    
+    /** Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder ${this.generateSetMethod($schema, $field)}(${this.javaUnbox($field.schema())} value) {
+      validate(fields()[$field.pos()], value);
+      this.${this.mangle($field.name(), $schema.isError())} = value;
+      fieldSetFlags()[$field.pos()] = true;
+      return this; 
+    }
+    
+    /** Checks whether the '${this.mangle($field.name(), $schema.isError())}' field has been set */
+    public boolean ${this.generateHasMethod($schema, $field)}() {
+      return fieldSetFlags()[$field.pos()];
+    }
+    
+    /** Clears the value of the '${this.mangle($field.name(), $schema.isError())}' field */
+    public #if ($schema.getNamespace())$schema.getNamespace().#end${this.mangle($schema.getName())}.Builder ${this.generateClearMethod($schema, $field)}() {
+#if (${this.isUnboxedJavaTypeNullable($field.schema())})
+      ${this.mangle($field.name(), $schema.isError())} = null;
+#end
+      fieldSetFlags()[$field.pos()] = false;
+      return this;
+    }
+    
+#end
+#end
+    @Override
+    public ${this.mangle($schema.getName())} build() {
+      try {
+        ${this.mangle($schema.getName())} record = new ${this.mangle($schema.getName())}(#if ($schema.isError())getValue(), getCause()#end);
+#foreach ($field in $schema.getFields())
+        record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : (${this.javaType($field.schema())}) ${this.generateAppropriateWrapper($schema,$field)};
+#end
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+  
+  public ${this.mangle($schema.getName())}.Tombstone getTombstone(){
+  	return TOMBSTONE;
+  }
+
+  public ${this.mangle($schema.getName())} newInstance(){
+    return newBuilder().build();
+  }
+
+  private static final Tombstone TOMBSTONE = new Tombstone();
+  
+  public static final class Tombstone extends ${this.mangle($schema.getName())} implements org.apache.gora.persistency.Tombstone {
+  
+      private Tombstone() { }
+  
+	  #foreach ($field in $schema.getFields())
+	#if ($this.isNotHiddenField($field.name()))
+	  /**
+	   * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+	#if ($field.doc())   * $field.doc()#end
+	   */
+	  public ${this.javaType($field.schema())} ${this.generateGetMethod($schema, $field)}() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+	#if ($field.doc())   * $field.doc()#end
+	   * @param value the value to set.
+	   */
+	  public void ${this.generateSetMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the '${this.mangle($field.name(), $schema.isError())}' field. A field is dirty if it represents a change that has not yet been written to the database.
+	#if ($field.doc())   * $field.doc()#end
+	   * @param value the value to set.
+	   */
+	  public boolean ${this.generateDirtyMethod($schema, $field)}(${this.javaType($field.schema())} value) {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+	#end
+	#end
+  
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index fd2144d..85957c2 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -99,9 +99,9 @@
                 <version>${build-helper-maven-plugin.version}</version>
                 <executions>
                     <execution>
-                        <phase>generate-sources</phase>
+                        <phase>generate-test-sources</phase>
                         <goals>
-                            <goal>add-source</goal>
+                            <goal>add-test-source</goal>
                         </goals>
                         <configuration>
                             <sources>
@@ -127,14 +127,24 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
+        
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
+        
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
 
         <!-- Logging Dependencies -->
         <dependency>
@@ -158,7 +168,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-core/src/examples/avro/employee.json
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/avro/employee.json b/gora-core/src/examples/avro/employee.json
index e398620..cfbbb26 100644
--- a/gora-core/src/examples/avro/employee.json
+++ b/gora-core/src/examples/avro/employee.json
@@ -1,30 +1,31 @@
   {
     "type": "record",
-    "name": "Employee",
+    "name": "Employee","default":null,
     "namespace": "org.apache.gora.examples.generated",
     "fields" : [
-      {"name": "name", "type": "string"},
-      {"name": "dateOfBirth", "type": "long"},
-      {"name": "ssn", "type": "string"},
-      {"name": "salary", "type": "int"},
-      {"name": "boss", "type":["null","Employee","string"]},
-      {"name": "webpage", "type":["null",
+      {"name": "name", "type": ["null","string"],"default":null},
+      {"name": "dateOfBirth", "type": "long","default":0},
+      {"name": "ssn", "type": "string", "default":""},
+      {"name": "salary", "type": "int","default":0},
+      {"name": "boss", "type":["null","Employee","string"],"default":null},
+      {"name": "webpage","default":null, "type":["null",
         {
       	  "type": "record",
       	  "name": "WebPage",
       	  "namespace": "org.apache.gora.examples.generated",
           "fields" : [
-           {"name": "url", "type": "string"},
-           {"name": "content", "type": ["null","bytes"]},
-           {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
-           {"name": "outlinks", "type": {"type":"map", "values":"string"}},
-           {"name": "metadata", "type": {
+           {"name": "url", "type": ["null","string"], "default":null},
+           {"name": "content", "type": ["null","bytes"],"default":null},
+           {"name": "parsedContent", "type": {"type":"array", "items": "string"},"default":{}},
+           {"name": "outlinks", "type": {"type":"map", "values":["null", "string"]},"default":{}},
+           {"name": "headers", "type": ["null", {"type": "map", "values": ["null", "string"]}],"default":null},
+           {"name": "metadata", "default":null, "type": {
             "name": "Metadata",
             "type": "record",
             "namespace": "org.apache.gora.examples.generated",
             "fields": [
-             {"name": "version", "type": "int"},
-             {"name": "data", "type": {"type": "map", "values": "string"}}
+             {"name": "version", "type": "int","default":0},
+             {"name": "data", "type": {"type": "map", "values": "string"},"default":{}}
             ]
           }}
           ]

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-core/src/examples/avro/immutable_fields.json
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/avro/immutable_fields.json b/gora-core/src/examples/avro/immutable_fields.json
new file mode 100644
index 0000000..76e0bcf
--- /dev/null
+++ b/gora-core/src/examples/avro/immutable_fields.json
@@ -0,0 +1,18 @@
+{
+  "type": "record",
+  "name": "ImmutableFields",
+  "doc": "Record with only immutable or dirtyable fields, used for testing",
+  "namespace": "org.apache.gora.examples.generated",
+  "fields" : [
+    {"name": "v1", "type": "int","default":0},
+    {"name": "v2", "type": [{
+      "name": "V2",
+      "type": "record",
+      "namespace": "org.apache.gora.examples.generated",
+      "fields": [
+        {"name": "v3", "type": "int","default":0}
+      ]
+    },"null"], "default":null
+    }
+  ]
+}