You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:29 UTC

[12/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
index b03afce..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.ListGenericArray;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A GenericArraySerializer translates the byte[] to and from GenericArray of Avro.
- */
-public class GenericArraySerializer<T> extends AbstractSerializer<GenericArray<T>> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(GenericArraySerializer.class);
-
-  private static Map<Type, GenericArraySerializer> elementTypeToSerializerMap = new HashMap<Type, GenericArraySerializer>();
-  private static Map<Class, GenericArraySerializer> fixedClassToSerializerMap = new HashMap<Class, GenericArraySerializer>();
-
-  public static GenericArraySerializer get(Type elementType) {
-    GenericArraySerializer serializer = elementTypeToSerializerMap.get(elementType);
-    if (serializer == null) {
-      serializer = new GenericArraySerializer(elementType);
-      elementTypeToSerializerMap.put(elementType, serializer);
-    }
-    return serializer;
-  }
-
-  public static GenericArraySerializer get(Type elementType, Class clazz) {
-    if (elementType != Type.FIXED) {
-      return null;
-    }
-    GenericArraySerializer serializer = elementTypeToSerializerMap.get(clazz);
-    if (serializer == null) {
-      serializer = new GenericArraySerializer(clazz);
-      fixedClassToSerializerMap.put(clazz, serializer);
-    }
-    return serializer;
-  }
-
-  public static GenericArraySerializer get(Schema elementSchema) {
-    Type type = elementSchema.getType();
-    if (type == Type.FIXED) {
-      return get(Type.FIXED, TypeUtils.getClass(elementSchema));
-    } else {
-      return get(type);
-    }
-  }
-
-  private Schema elementSchema = null;
-  private Type elementType = null;
-  private int size = -1;
-  private Class<T> clazz = null;
-  private Serializer<T> elementSerializer = null;
-
-  public GenericArraySerializer(Serializer<T> elementSerializer) {
-    this.elementSerializer = elementSerializer;
-  }
-
-  public GenericArraySerializer(Schema elementSchema) {
-    this.elementSchema = elementSchema;
-    elementType = elementSchema.getType();
-    size = TypeUtils.getFixedSize(elementSchema);
-    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
-  }
-
-  public GenericArraySerializer(Type elementType) {
-    this.elementType = elementType;
-    if (elementType != Type.FIXED) {
-      elementSchema = Schema.create(elementType);
-    }
-    clazz = TypeUtils.getClass(elementType);
-    size = TypeUtils.getFixedSize(elementType);
-    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
-  }
-
-  public GenericArraySerializer(Class<T> clazz) {
-    this.clazz = clazz;
-    elementType = TypeUtils.getType(clazz);
-    size = TypeUtils.getFixedSize(clazz);
-    if (elementType == null || elementType == Type.FIXED) {
-      elementType = Type.FIXED;
-      elementSchema = TypeUtils.getSchema(clazz);
-      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
-    } else {
-      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
-    }
-  }
-
-  @Override
-  public ByteBuffer toByteBuffer(GenericArray<T> array) {
-    if (array == null) {
-      return null;
-    }
-    if (size > 0) {
-      return toByteBufferWithFixedLengthElements(array);
-    } else {
-      return toByteBufferWithVariableLengthElements(array);
-    }
-  }
-
-  private ByteBuffer toByteBufferWithFixedLengthElements(GenericArray<T> array) {
-    ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size);
-    for (T element : array) {
-      byteBuffer.put(elementSerializer.toByteBuffer(element));
-    }
-    byteBuffer.rewind();
-    return byteBuffer;
-  }
-
-  private ByteBuffer toByteBufferWithVariableLengthElements(GenericArray<T> array) {
-    int n = (int) array.size();
-    List<byte[]> list = new ArrayList<byte[]>(n);
-    n *= 4;
-    for (T element : array) {
-      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
-      list.add(bytes);
-      n += bytes.length;
-    }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
-    for (byte[] bytes : list) {
-      byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
-      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
-    }
-    byteBuffer.rewind();
-    return byteBuffer;
-  }
-
-  @Override
-  public GenericArray<T> fromByteBuffer(ByteBuffer byteBuffer) {
-    if (byteBuffer == null) {
-      return null;
-    }
-    GenericArray<T> array = new ListGenericArray<T>(elementSchema);
-int i = 0;
-    while (true) {
-      T element = null;
-      try {
-        if (size > 0) {
-          element = elementSerializer.fromByteBuffer(byteBuffer);
-        }
-        else {
-          int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
-          byte[] bytes = new byte[n];
-          byteBuffer.get(bytes, 0, n);
-          element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
-        }
-      } catch (BufferUnderflowException e) {
-        break;
-      }
-      if (element == null) {
-        break;
-      }
-      array.add(element);
-    }
-    return array;
-  }
-
-  @Override
-  public ComparatorType getComparatorType() {
-    return elementSerializer.getComparatorType();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
index 15ebf23..c95d51c 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
@@ -19,16 +19,18 @@
 package org.apache.gora.cassandra.serializers;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
 import me.prettyprint.cassandra.serializers.BooleanSerializer;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
 import me.prettyprint.cassandra.serializers.DoubleSerializer;
 import me.prettyprint.cassandra.serializers.FloatSerializer;
 import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.ObjectSerializer;
 import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.Serializer;
 
 import org.apache.avro.Schema;
@@ -36,9 +38,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
-
-import org.apache.gora.persistency.StatefulHashMap;
-
+import org.apache.gora.persistency.Persistent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +56,7 @@ public class GoraSerializerTypeInferer {
     if (value == null) {
       serializer = ByteBufferSerializer.get();
     } else if (value instanceof Utf8) {
-      serializer = Utf8Serializer.get();
+      serializer = CharSequenceSerializer.get();
     } else if (value instanceof Boolean) {
       serializer = BooleanSerializer.get();
     } else if (value instanceof ByteBuffer) {
@@ -80,18 +80,21 @@ public class GoraSerializerTypeInferer {
       if (schema.getType() == Type.ARRAY) {
         schema = schema.getElementType();
       }
-      serializer = GenericArraySerializer.get(schema);
-    } else if (value instanceof StatefulHashMap) {
-      StatefulHashMap map = (StatefulHashMap)value;
+      serializer = ListSerializer.get(schema);
+    } else if (value instanceof Map) {
+      Map map = (Map)value;
       if (map.size() == 0) {
         serializer = ByteBufferSerializer.get();
       }
       else {
         Object value0 = map.values().iterator().next();
         Schema schema = TypeUtils.getSchema(value0);
-        serializer = StatefulHashMapSerializer.get(schema);
+        serializer = MapSerializer.get(schema);
       }
-    } else {
+    } else if (value instanceof Persistent){
+      serializer = ObjectSerializer.get();
+    }
+    else {
       serializer = SerializerTypeInferer.getSerializer(value);
     }
     return serializer;
@@ -101,7 +104,7 @@ public class GoraSerializerTypeInferer {
   public static <T> Serializer<T> getSerializer(Class<?> valueClass) {
     Serializer serializer = null;
     if (valueClass.equals(Utf8.class)) {
-      serializer = Utf8Serializer.get();
+      serializer = CharSequenceSerializer.get();
     } else if (valueClass.equals(Boolean.class) || valueClass.equals(boolean.class)) {
       serializer = BooleanSerializer.get();
     } else if (valueClass.equals(ByteBuffer.class)) {
@@ -126,30 +129,32 @@ public class GoraSerializerTypeInferer {
   public static <T> Serializer<T> getSerializer(Schema schema) {
     Serializer serializer = null;
     Type type = schema.getType();
-    if (type == Type.STRING) {
-      serializer = Utf8Serializer.get();
-    } else if (type == Type.BOOLEAN) {
+    if (type.equals(Type.STRING)) {
+      serializer = CharSequenceSerializer.get();
+    } else if (type.equals(Type.BOOLEAN)) {
       serializer = BooleanSerializer.get();
-    } else if (type == Type.BYTES) {
+    } else if (type.equals(Type.BYTES)) {
       serializer = ByteBufferSerializer.get();
-    } else if (type == Type.DOUBLE) {
+    } else if (type.equals(Type.DOUBLE)) {
       serializer = DoubleSerializer.get();
-    } else if (type == Type.FLOAT) {
+    } else if (type.equals(Type.FLOAT)) {
       serializer = FloatSerializer.get();
-    } else if (type == Type.INT) {
+    } else if (type.equals(Type.INT)) {
       serializer = IntegerSerializer.get();
-    } else if (type == Type.LONG) {
+    } else if (type.equals(Type.LONG)) {
       serializer = LongSerializer.get();
-    } else if (type == Type.FIXED) {
+    } else if (type.equals(Type.FIXED)) {
       Class clazz = TypeUtils.getClass(schema);
       serializer = SpecificFixedSerializer.get(clazz);
       // serializer = SpecificFixedSerializer.get(schema);
-    } else if (type == Type.ARRAY) {
-      serializer = GenericArraySerializer.get(schema.getElementType());
-    } else if (type == Type.MAP) {
-      serializer = StatefulHashMapSerializer.get(schema.getValueType());
-    } else if (type == Type.UNION){
+    } else if (type.equals(Type.ARRAY)) {
+      serializer = ListSerializer.get(schema.getElementType());
+    } else if (type.equals(Type.MAP)) {
+    	serializer = MapSerializer.get(schema.getValueType());
+    } else if (type.equals(Type.UNION)){
       serializer = ByteBufferSerializer.get();
+    } else if (type.equals(Type.RECORD)){
+      serializer = BytesArraySerializer.get();
     } else {
       serializer = null;
     }
@@ -160,7 +165,7 @@ public class GoraSerializerTypeInferer {
   public static <T> Serializer<T> getSerializer(Type type) {
     Serializer serializer = null;
     if (type == Type.STRING) {
-      serializer = Utf8Serializer.get();
+      serializer = CharSequenceSerializer.get();
     } else if (type == Type.BOOLEAN) {
       serializer = BooleanSerializer.get();
     } else if (type == Type.BYTES) {
@@ -197,9 +202,9 @@ public class GoraSerializerTypeInferer {
     }
 
     if (type == Type.ARRAY) {
-      serializer = GenericArraySerializer.get(elementType);
+      serializer = ListSerializer.get(elementType);
     } else if (type == Type.MAP) {
-      serializer = StatefulHashMapSerializer.get(elementType);
+      serializer = MapSerializer.get(elementType);
     } else {
       serializer = null;
     }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
new file mode 100644
index 0000000..5745be9
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ListSerializer translates the byte[] to and from a List of Avro values.
+ */
+public class ListSerializer<T> extends AbstractSerializer<List<T>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ListSerializer.class);
+
+  private static Map<Type, ListSerializer> elementTypeToSerializerMap = new HashMap<Type, ListSerializer>();
+  private static Map<Class, ListSerializer> fixedClassToSerializerMap = new HashMap<Class, ListSerializer>();
+
+  public static ListSerializer get(Type elementType) {
+    ListSerializer serializer = elementTypeToSerializerMap.get(elementType);
+    if (serializer == null) {
+      serializer = new ListSerializer(elementType);
+      elementTypeToSerializerMap.put(elementType, serializer);
+    }
+    return serializer;
+  }
+
+  public static ListSerializer get(Type elementType, Class clazz) { // cached serializer for a FIXED class, else null
+    if (elementType != Type.FIXED) {
+      return null;
+    }
+    ListSerializer serializer = fixedClassToSerializerMap.get(clazz); // per-class cache; the Type-keyed map never holds a Class key
+    if (serializer == null) {
+      serializer = new ListSerializer(clazz);
+      fixedClassToSerializerMap.put(clazz, serializer);
+    }
+    return serializer;
+  }
+
+  public static ListSerializer get(Schema elementSchema) {
+    Type type = elementSchema.getType();
+    if (type == Type.FIXED) {
+      return get(Type.FIXED, TypeUtils.getClass(elementSchema));
+    } else {
+      return get(type);
+    }
+  }
+
+  private Schema elementSchema = null;
+  private Type elementType = null;
+  private int size = -1;
+  private Class<T> clazz = null;
+  private Serializer<T> elementSerializer = null;
+
+  public ListSerializer(Serializer<T> elementSerializer) {
+    this.elementSerializer = elementSerializer;
+  }
+
+  public ListSerializer(Schema elementSchema) {
+    this.elementSchema = elementSchema;
+    elementType = elementSchema.getType();
+    size = TypeUtils.getFixedSize(elementSchema);
+    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public ListSerializer(Type elementType) {
+    this.elementType = elementType;
+    if (elementType != Type.FIXED) {
+      elementSchema = Schema.create(elementType);
+    }
+    clazz = (Class<T>) TypeUtils.getClass(elementType);
+    size = TypeUtils.getFixedSize(elementType);
+    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+  }
+
+  public ListSerializer(Class<T> clazz) {
+    this.clazz = clazz;
+    elementType = TypeUtils.getType(clazz);
+    size = TypeUtils.getFixedSize(clazz);
+    if (elementType == null || elementType == Type.FIXED) {
+      elementType = Type.FIXED;
+      elementSchema = TypeUtils.getSchema(clazz);
+      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
+    } else {
+      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+    }
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(List<T> array) {
+    if (array == null) {
+      return null;
+    }
+    if (size > 0) {
+      return toByteBufferWithFixedLengthElements(array);
+    } else {
+      return toByteBufferWithVariableLengthElements(array);
+    }
+  }
+
+  private ByteBuffer toByteBufferWithFixedLengthElements(List<T> array) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size);
+    for (T element : array) {
+      byteBuffer.put(elementSerializer.toByteBuffer(element));
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  private ByteBuffer toByteBufferWithVariableLengthElements(List<T> array) {
+    int n = (int) array.size();
+    List<byte[]> list = new ArrayList<byte[]>(n);
+    n *= 4;
+    for (T element : array) {
+      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
+      list.add(bytes);
+      n += bytes.length;
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+    for (byte[] bytes : list) {
+      byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  @Override
+  public List<T> fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    ArrayList<T> array = new ArrayList<T>();
+    while (true) {
+      T element = null;
+      try {
+        if (size > 0) {
+          element = elementSerializer.fromByteBuffer(byteBuffer);
+        }
+        else {
+          int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+          byte[] bytes = new byte[n];
+          byteBuffer.get(bytes, 0, n);
+          element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+        }
+      } catch (BufferUnderflowException e) {
+        break;
+      }
+      if (element == null) {
+        break;
+      }
+      array.add(element);
+    }
+    return array;
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return elementSerializer.getComparatorType();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
new file mode 100644
index 0000000..59af1d1
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A MapSerializer translates the byte[] to and from Map of Avro.
+ */
+public class MapSerializer<T> extends AbstractSerializer<Map<CharSequence, T>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(MapSerializer.class);
+
+  private static Map<Type, MapSerializer> valueTypeToSerializerMap = new HashMap<Type, MapSerializer>();
+  private static Map<Class, MapSerializer> fixedClassToSerializerMap = new HashMap<Class, MapSerializer>();
+
+  public static MapSerializer get(Type valueType) {
+    MapSerializer serializer = valueTypeToSerializerMap.get(valueType);
+    if (serializer == null) {
+      serializer = new MapSerializer(valueType);
+      valueTypeToSerializerMap.put(valueType, serializer);
+    }
+    return serializer;
+  }
+
+  public static MapSerializer get(Type valueType, Class clazz) { // cached serializer for a FIXED value class, else null
+    if (valueType != Type.FIXED) {
+      return null;
+    }
+    MapSerializer serializer = fixedClassToSerializerMap.get(clazz); // per-class cache; the Type-keyed map never holds a Class key
+    if (serializer == null) {
+      serializer = new MapSerializer(clazz);
+      fixedClassToSerializerMap.put(clazz, serializer);
+    }
+    return serializer;
+  }
+
+  public static MapSerializer get(Schema valueSchema) {
+    Type type = valueSchema.getType();
+    if (type == Type.FIXED) {
+      return get(Type.FIXED, TypeUtils.getClass(valueSchema));
+    } else {
+      return get(type);
+    }
+  }
+
+  private Schema valueSchema = null;
+  private Type valueType = null;
+  private int size = -1;
+  private Class<T> clazz = null;
+  private Serializer<T> valueSerializer = null;
+
+  public MapSerializer(Serializer<T> valueSerializer) {
+    this.valueSerializer = valueSerializer;
+  }
+
+  public MapSerializer(Schema valueSchema) {
+    this.valueSchema = valueSchema;
+    valueType = valueSchema.getType();
+    size = TypeUtils.getFixedSize(valueSchema);
+    valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public MapSerializer(Type valueType) {
+    this.valueType = valueType;
+    if (valueType != Type.FIXED) {
+      valueSchema = Schema.create(valueType);
+    }
+    clazz = (Class<T>) TypeUtils.getClass(valueType);
+    size = TypeUtils.getFixedSize(valueType);
+    valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+  }
+
+  public MapSerializer(Class<T> clazz) {
+    this.clazz = clazz;
+    valueType = TypeUtils.getType(clazz);
+    size = TypeUtils.getFixedSize(clazz);
+    if (valueType == null || valueType == Type.FIXED) {
+      valueType = Type.FIXED;
+      valueSchema = TypeUtils.getSchema(clazz);
+      valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz);
+    } else {
+      valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+    }
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(Map<CharSequence, T> map) {
+    if (map == null) {
+      return null;
+    }
+    if (size > 0) {
+      return toByteBufferWithFixedLengthElements(map);
+    } else {
+      return toByteBufferWithVariableLengthElements(map);
+    }
+  }
+
+  private ByteBuffer toByteBufferWithFixedLengthElements(Map<CharSequence, T> map) {
+    int n = (int) map.size();
+    List<byte[]> list = new ArrayList<byte[]>(n);
+    n *= 4;
+    for (CharSequence key : map.keySet()) {
+      T value = map.get(key);
+      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key));
+      list.add(bytes);
+      n += bytes.length;
+      bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+      list.add(bytes);
+      n += bytes.length;
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+    int i = 0;
+    for (byte[] bytes : list) {
+      if (i % 2 == 0) {
+        byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+      }
+      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+      i += 1;
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  private ByteBuffer toByteBufferWithVariableLengthElements(Map<CharSequence, T> map) {
+    int n = (int) map.size();
+    List<byte[]> list = new ArrayList<byte[]>(n);
+    n *= 8;
+    for (CharSequence key : map.keySet()) {
+      T value = map.get(key);
+      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key));
+      list.add(bytes);
+      n += bytes.length;
+      bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+      list.add(bytes);
+      n += bytes.length;
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+    for (byte[] bytes : list) {
+      byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  @Override
+  public Map<CharSequence, T> fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    Map<CharSequence, T> map = new HashMap<CharSequence, T>();
+    while (true) {
+      CharSequence key = null;
+      T value = null;
+      try {
+        int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+        byte[] bytes = new byte[n];
+        byteBuffer.get(bytes, 0, n);
+        key = CharSequenceSerializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+
+        if (size > 0) {
+          value = valueSerializer.fromByteBuffer(byteBuffer);
+        }
+        else {
+          n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+          bytes = new byte[n];
+          byteBuffer.get(bytes, 0, n);
+          value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+        }
+      } catch (BufferUnderflowException e) {
+        break;
+      }
+      if (key == null) {
+        break;
+      }
+      if (value == null) {
+        break;
+      }
+      map.put(key, value);
+    }
+    return map;
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return valueSerializer.getComparatorType();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
index b981fbf..2232c08 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
@@ -25,15 +25,10 @@ import java.util.Map;
 
 import me.prettyprint.cassandra.serializers.AbstractSerializer;
 import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.hector.api.Serializer;
 import me.prettyprint.hector.api.ddl.ComparatorType;
 import static me.prettyprint.hector.api.ddl.ComparatorType.BYTESTYPE;
 
 import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.util.Utf8;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
index 4922220..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StatefulHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StatefulHashMapSerializer translates the byte[] to and from StatefulHashMap of Avro.
- */
-public class StatefulHashMapSerializer<T> extends AbstractSerializer<StatefulHashMap<Utf8, T>> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(StatefulHashMapSerializer.class);
-
-  private static Map<Type, StatefulHashMapSerializer> valueTypeToSerializerMap = new HashMap<Type, StatefulHashMapSerializer>();
-  private static Map<Class, StatefulHashMapSerializer> fixedClassToSerializerMap = new HashMap<Class, StatefulHashMapSerializer>();
-
-  public static StatefulHashMapSerializer get(Type valueType) {
-    StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(valueType);
-    if (serializer == null) {
-      serializer = new StatefulHashMapSerializer(valueType);
-      valueTypeToSerializerMap.put(valueType, serializer);
-    }
-    return serializer;
-  }
-
-  public static StatefulHashMapSerializer get(Type valueType, Class clazz) {
-    if (valueType != Type.FIXED) {
-      return null;
-    }
-    StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(clazz);
-    if (serializer == null) {
-      serializer = new StatefulHashMapSerializer(clazz);
-      fixedClassToSerializerMap.put(clazz, serializer);
-    }
-    return serializer;
-  }
-
-  public static StatefulHashMapSerializer get(Schema valueSchema) {
-    Type type = valueSchema.getType();
-    if (type == Type.FIXED) {
-      return get(Type.FIXED, TypeUtils.getClass(valueSchema));
-    } else {
-      return get(type);
-    }
-  }
-
-  private Schema valueSchema = null;
-  private Type valueType = null;
-  private int size = -1;
-  private Class<T> clazz = null;
-  private Serializer<T> valueSerializer = null;
-
-  public StatefulHashMapSerializer(Serializer<T> valueSerializer) {
-    this.valueSerializer = valueSerializer;
-  }
-
-  public StatefulHashMapSerializer(Schema valueSchema) {
-    this.valueSchema = valueSchema;
-    valueType = valueSchema.getType();
-    size = TypeUtils.getFixedSize(valueSchema);
-    valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
-  }
-
-  public StatefulHashMapSerializer(Type valueType) {
-    this.valueType = valueType;
-    if (valueType != Type.FIXED) {
-      valueSchema = Schema.create(valueType);
-    }
-    clazz = TypeUtils.getClass(valueType);
-    size = TypeUtils.getFixedSize(valueType);
-    valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
-  }
-
-  public StatefulHashMapSerializer(Class<T> clazz) {
-    this.clazz = clazz;
-    valueType = TypeUtils.getType(clazz);
-    size = TypeUtils.getFixedSize(clazz);
-    if (valueType == null || valueType == Type.FIXED) {
-      valueType = Type.FIXED;
-      valueSchema = TypeUtils.getSchema(clazz);
-      valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz);
-    } else {
-      valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
-    }
-  }
-
-  @Override
-  public ByteBuffer toByteBuffer(StatefulHashMap<Utf8, T> map) {
-    if (map == null) {
-      return null;
-    }
-    if (size > 0) {
-      return toByteBufferWithFixedLengthElements(map);
-    } else {
-      return toByteBufferWithVariableLengthElements(map);
-    }
-  }
-
-  private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap<Utf8, T> map) {
-    List<byte[]> list = new ArrayList<byte[]>(map.size());
-    int n = 0;
-    for (Utf8 key : map.keySet()) {
-      if (map.getState(key) == State.DELETED) {
-        continue;
-      }
-      T value = map.get(key);
-      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
-      list.add(bytes);
-      n += 4;
-      n += bytes.length;
-      bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
-      list.add(bytes);
-      n += bytes.length;
-    }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
-    int i = 0;
-    for (byte[] bytes : list) {
-      if (i % 2 == 0) {
-        byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
-      }
-      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
-      i += 1;
-    }
-    byteBuffer.rewind();
-    return byteBuffer;
-  }
-
-  private ByteBuffer toByteBufferWithVariableLengthElements(StatefulHashMap<Utf8, T> map) {
-    List<byte[]> list = new ArrayList<byte[]>(map.size());
-    int n = 0;
-    for (Utf8 key : map.keySet()) {
-      if (map.getState(key) == State.DELETED) {
-        continue;
-      }
-      T value = map.get(key);
-      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
-      list.add(bytes);
-      n += 4;
-      n += bytes.length;
-      bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
-      list.add(bytes);
-      n += 4;
-      n += bytes.length;
-    }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
-    for (byte[] bytes : list) {
-      byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
-      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
-    }
-    byteBuffer.rewind();
-    return byteBuffer;
-  }
-
-  @Override
-  public StatefulHashMap<Utf8, T> fromByteBuffer(ByteBuffer byteBuffer) {
-    if (byteBuffer == null) {
-      return null;
-    }
-    StatefulHashMap<Utf8, T> map = new StatefulHashMap<Utf8, T>();
-int i = 0;
-    while (true) {
-      Utf8 key = null;
-      T value = null;
-      try {
-        int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
-        byte[] bytes = new byte[n];
-        byteBuffer.get(bytes, 0, n);
-        key = Utf8Serializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
-
-        if (size > 0) {
-          value = valueSerializer.fromByteBuffer(byteBuffer);
-        }
-        else {
-          n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
-          bytes = new byte[n];
-          byteBuffer.get(bytes, 0, n);
-          value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
-        }
-      } catch (BufferUnderflowException e) {
-        break;
-      }
-      if (key == null) {
-        break;
-      }
-      if (value == null) {
-        break;
-      }
-      map.put(key, value);
-    }
-    return map;
-  }
-
-  @Override
-  public ComparatorType getComparatorType() {
-    return valueSerializer.getComparatorType();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
index 48973d0..c5db72b 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
@@ -19,16 +19,15 @@
 package org.apache.gora.cassandra.serializers;
 
 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.StatefulHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,13 +40,13 @@ public class TypeUtils {
   public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);
 
   // @SuppressWarnings({ "rawtypes", "unchecked" })
-  public static Class getClass(Object value) {
+  public static Class<? extends Object> getClass(Object value) {
     return value.getClass();
   }
 
   public static Schema getSchema(Object value) {
     if (value instanceof GenericArray) {
-      return Schema.createArray( getElementSchema((GenericArray)value) );
+      return Schema.createArray( getElementSchema((GenericArray<?>)value) );
     } else {
       return getSchema( getClass(value) );
     }
@@ -72,9 +71,9 @@ public class TypeUtils {
       return Type.INT;
     } else if (clazz.equals(Long.class) || clazz.equals(long.class)) {
       return Type.LONG;
-    } else if (clazz.equals(ListGenericArray.class)) {
+    } else if (clazz.isAssignableFrom(List.class)) {
       return Type.ARRAY;
-    } else if (clazz.equals(StatefulHashMap.class)) {
+    } else if (clazz.isAssignableFrom(Map.class)) {
       return Type.MAP;
     } else if (clazz.equals(Persistent.class)) {
       return Type.RECORD;
@@ -85,7 +84,7 @@ public class TypeUtils {
     }
   }
 
-  public static Class getClass(Type type) {
+  public static Class<?> getClass(Type type) {
     if (type == Type.STRING) {
       return Utf8.class;
     } else if (type == Type.BOOLEAN) {
@@ -101,9 +100,9 @@ public class TypeUtils {
     } else if (type == Type.LONG) {
       return Long.class;
     } else if (type == Type.ARRAY) {
-      return ListGenericArray.class;
+      return List.class;
     } else if (type == Type.MAP) {
-      return StatefulHashMap.class;
+      return Map.class;
     } else if (type == Type.RECORD) {
       return Persistent.class;
     } else if (type == Type.FIXED) {
@@ -114,7 +113,7 @@ public class TypeUtils {
     }
   }
 
-  public static Schema getSchema(Class clazz) {
+  public static Schema getSchema(Class<?> clazz) {
     Type type = getType(clazz);
     if (type == null) {
       return null;
@@ -157,7 +156,7 @@ public class TypeUtils {
     }
   }
 
-  public static Class getClass(Schema schema) {
+  public static Class<?> getClass(Schema schema) {
     Type type = schema.getType();
     if (type == null) {
       return null;
@@ -198,7 +197,7 @@ public class TypeUtils {
     }
   }
 
-  public static int getFixedSize(Class clazz) {
+  public static int getFixedSize(Class<?> clazz) {
     Type type = getType(clazz);
     if (type == Type.FIXED) {
       try {
@@ -215,15 +214,11 @@ public class TypeUtils {
     }
   }
 
-  public static Schema getElementSchema(GenericArray array) {
+  public static Schema getElementSchema(GenericArray<?> array) {
     Schema schema = array.getSchema();
     return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema;
   }
 
-  public static Type getElementType(ListGenericArray array) {
-    return getElementSchema(array).getType();
-  }
-
   /*
   public static Schema getValueSchema(StatefulHashMap map) {
     return map.getSchema().getValueType();

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
index 19e3668..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.ByteBuffer;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.util.Utf8;
-
-/**
- * A Utf8Serializer translates the byte[] to and from Utf8 object of Avro.
- */
-public final class Utf8Serializer extends AbstractSerializer<Utf8> {
-
-  private static final Utf8Serializer instance = new Utf8Serializer();
-
-  public static Utf8Serializer get() {
-    return instance;
-  }
-
-  @Override
-  public ByteBuffer toByteBuffer(Utf8 obj) {
-    if (obj == null) {
-      return null;
-    }
-    return StringSerializer.get().toByteBuffer(obj.toString());
-  }
-
-  @Override
-  public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
-    if (byteBuffer == null) {
-      return null;
-    }
-    return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
-  }
-
-  @Override
-  public ComparatorType getComparatorType() {
-    return UTF8TYPE;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index ddf3de0..416c017 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -18,16 +18,11 @@
 
 package org.apache.gora.cassandra.store;
 
-import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -52,62 +47,27 @@ import me.prettyprint.hector.api.HConsistencyLevel;
 import me.prettyprint.hector.api.Serializer;
 
 import org.apache.avro.generic.GenericArray;
-import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.apache.gora.mapreduce.GoraRecordReader;
 import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * CassandraClient is where all of the primary datastore functionality is 
- * executed. Typically CassandraClient is invoked by calling 
- * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
- * CassandraClient deals with Cassandra data model definition, mutation, 
- * and general/specific mappings.
- * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} 
- *
- * @param <K>
- * @param <T>
- */
 public class CassandraClient<K, T extends PersistentBase> {
-
-  /** The logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
-
+  
   private Cluster cluster;
   private Keyspace keyspace;
   private Mutator<K> mutator;
   private Class<K> keyClass;
   private Class<T> persistentClass;
-
-  /** Object containing the XML mapping for Cassandra. */
+  
   private CassandraMapping cassandraMapping = null;
 
-  /** Hector client default column family consistency level. */
-  public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
-
-  /** Cassandra serializer to be used for serializing Gora's keys. */
   private Serializer<K> keySerializer;
-
-  /**
-   * Given our key, persistentClass from 
-   * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
-   * we make best efforts to dictate our data model. 
-   * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String)
-   * to see if our keyspace has already been invented, this simple check prevents us from 
-   * recreating the keyspace if it already exists. 
-   * We then simple specify (based on the input keyclass) an appropriate serializer
-   * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
-   * defining a mutator from and by which we can mutate this object.
-   * @param keyClass the Key by which we wish o assign a record object
-   * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data.
-   * @throws Exception
-   */
+  
   public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception {
     this.keyClass = keyClass;
 
@@ -115,16 +75,14 @@ public class CassandraClient<K, T extends PersistentBase> {
     this.persistentClass = persistentClass;
     this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
 
-    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), 
-        new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
-
+    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+    
     // add keyspace to cluster
     checkKeyspace();
-
-    // Just create a Keyspace object on the client side, corresponding to an 
-    // already existing keyspace with already created column families.
+    
+    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
-
+    
     this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
     this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
   }
@@ -136,21 +94,13 @@ public class CassandraClient<K, T extends PersistentBase> {
     KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
     return (keyspaceDefinition != null);
   }
-
+  
   /**
    * Check if keyspace already exists. If not, create it.
-   * In this method, we also utilize Hector's 
-   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel}
-   * logic. 
-   * It is set by passing a 
-   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right 
-   * when the {@link me.prettyprint.hector.api.Keyspace} is created. 
-   * If we cannot find a consistency level within <code>gora.properites</code>, 
-   * then column family consistency level is set to QUORUM (by default) which permits 
-   * consistency to wait for a quorum of replicas to respond regardless of data center.
-   * QUORUM is Hector Client's default setting and we respect that here as well.
-   * 
-   * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html
+   * In this method, we also utilise Hector's {@link ConfigurableConsistencyLevel}
+   * logic. It is set by passing a ConfigurableConsistencyLevel object right
+   * when the Keyspace is created. Currently the consistency level is ONE, which
+   * means an operation waits until one replica has responded.
    */
   public void checkKeyspace() {
     // "describe keyspace <keyspaceName>;" query
@@ -164,25 +114,24 @@ public class CassandraClient<K, T extends PersistentBase> {
       }
 
       keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), 
-          "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
+          "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);      
       this.cluster.addKeyspace(keyspaceDefinition, true);
+      // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+      
+      // Create a customized Consistency Level
+      ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
+      Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
 
-      // GORA-167 Create a customized Consistency Level
-      ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
-      Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
-      // Column family consistency levels
-      ccl.setReadCfConsistencyLevels(clmap);
-      ccl.setWriteCfConsistencyLevels(clmap);
-      // Operations consistency levels
-      String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-      ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
-      LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
-      opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-      ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
-      LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
+      // Define CL.ONE for ColumnFamily "ColumnFamily"
+      clmap.put("ColumnFamily", HConsistencyLevel.ONE);
+
+      // Here we use CL.ONE for reads and writes, but you can use different CLs if needed.
+      configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
+      configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
 
       // Then let the keyspace know
-      HFactory.createKeyspace("Keyspace", this.cluster, ccl);
+      HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
+
       keyspaceDefinition = null;
     }
     else {
@@ -196,10 +145,11 @@ public class CassandraClient<K, T extends PersistentBase> {
           if (! comparatorType.equals(ComparatorType.BYTESTYPE)) {
             // GORA-197
             LOG.warn("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName()
-                + ", not BytesType. It may cause a fatal error on column validation later.");
+              + ", not BytesType. It may cause a fatal error on column validation later.");
           }
           else {
-            // LOG.info("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName() + ".");
+            LOG.debug("The comparator type of " + cfDef.getName() + " column family is " 
+              + comparatorType.getTypeName() + ".");
           }
         }
       }
@@ -207,26 +157,9 @@ public class CassandraClient<K, T extends PersistentBase> {
   }
 
   /**
-   * Method in charge of setting the consistency level for defined column families.
-   * @param pColFams  Column families
-   * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level.
-   */
-  private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
-    Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>();
-    // Get columnFamily consistency level.
-    String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-    LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'.");
-    // Define consistency for ColumnFamily "ColumnFamily"
-    for (ColumnFamilyDefinition colFamDef : pColFams)
-      clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl));
-    return clMap;
-  }
-
-  /**
    * Drop keyspace.
    */
   public void dropKeyspace() {
-    // "drop keyspace <keyspaceName>;" query
     this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
   }
 
@@ -245,21 +178,40 @@ public class CassandraClient<K, T extends PersistentBase> {
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String columnName = this.cassandraMapping.getColumn(fieldName);
     String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(fieldName);
-    if (ttlAttr == null) {
+    if (ttlAttr == null)
       ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
-    }
 
     if (columnName == null) {
       LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
       return;
     }
-
     synchronized(mutator) {
       HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer, ttlAttr);
     }
   }
 
   /**
+   * Delete a row within the keyspace.
+   * @param key
+   * @param familyName
+   * @param columnName
+   */
+  public void deleteColumn(K key, String familyName, ByteBuffer columnName) {
+    synchronized(mutator) {
+      HectorUtils.deleteColumn(mutator, key, familyName, columnName);
+    }
+  }
+
+  /**
+   * Deletes an entry based on its key.
+   * @param key
+   */
+  public void deleteByKey(K key) {
+    Map<String, String> familyMap = this.cassandraMapping.getFamilyMap();
+    deleteColumn(key, familyMap.values().iterator().next().toString(), null);
+  }
+
+  /**
    * Insert a member in a super column. This is used for map and record Avro types.
    * @param key the row key
    * @param fieldName the field name
@@ -272,14 +224,12 @@ public class CassandraClient<K, T extends PersistentBase> {
     }
 
     ByteBuffer byteBuffer = toByteBuffer(value);
-
+    
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
     String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(fieldName);
-    if (ttlAttr  == null) {
+    if (ttlAttr == null)
       ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
-    }
-
     synchronized(mutator) {
       HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer, ttlAttr);
     }
@@ -318,43 +268,39 @@ public class CassandraClient<K, T extends PersistentBase> {
 
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
-
+    
     synchronized(mutator) {
       HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName);
     }
   }
 
   /**
-   * Deletes a subColumn which is a field inside a column.
-   * @param key Identifying the row.
-   * @param fieldName The field's name.
-   * @param columnName The column's name.
+   * Deletes a subColumn identified by a String column name.
+   * @param key the row key.
+   * @param fieldName the field name.
+   * @param columnName the subcolumn name.
    */
   public void deleteSubColumn(K key, String fieldName, String columnName) {
     deleteSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName));
   }
 
   /**
-   * Delete a row within the keyspace.
-    * @param key
-    * @param fieldName
-    * @param columnName
-    */
-  public void deleteColumn(K key, String familyName, ByteBuffer columnName) {
+   * Deletes all subcolumns from a super column.
+   * @param key the row key.
+   * @param fieldName the field name.
+   */
+  public void deleteSubColumn(K key, String fieldName) {
+    String columnFamily = this.cassandraMapping.getFamily(fieldName);
+    String superColumnName = this.cassandraMapping.getColumn(fieldName);
     synchronized(mutator) {
-      HectorUtils.deleteColumn(mutator, key, familyName, columnName);
-      }
+      HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, null);
     }
-
-  /**
-   * Delete all content related to a key.
-   * @param key
-   */
-  public void deleteByKey(K key) {
-    Map<String, String> familyMap = this.cassandraMapping.getFamilyMap();
-    deleteColumn(key, familyMap.values().iterator().next().toString(), null);
   }
 
+  public void deleteGenericArray(K key, String fieldName) {
+    //TODO Verify this. Everything that goes inside a genericArray will go inside a column so let's just delete that.
+    deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
+  }
   public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
     if (isSuper( cassandraMapping.getFamily(fieldName) )) {
       int i= 0;
@@ -362,14 +308,15 @@ public class CassandraClient<K, T extends PersistentBase> {
 
         // TODO: hack, do not store empty arrays
         if (itemValue instanceof GenericArray<?>) {
-          if (((GenericArray)itemValue).size() == 0) {
+          if (((List<?>)itemValue).size() == 0) {
             continue;
           }
-        } else if (itemValue instanceof StatefulHashMap<?,?>) {
-          if (((StatefulHashMap)itemValue).size() == 0) {
+        } else if (itemValue instanceof Map<?,?>) {
+          if (((Map<?, ?>)itemValue).size() == 0) {
             continue;
           }
         }
+
         addSubColumn(key, fieldName, i++, itemValue);
       }
     }
@@ -378,27 +325,35 @@ public class CassandraClient<K, T extends PersistentBase> {
     }
   }
 
-  public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
+  public void deleteStatefulHashMap(K key, String fieldName) {
     if (isSuper( cassandraMapping.getFamily(fieldName) )) {
-      int i= 0;
-      for (Utf8 mapKey: map.keySet()) {
-        if (map.getState(mapKey) == State.DELETED) {
-          deleteSubColumn(key, fieldName, mapKey.toString());
-          continue;
-        }
+      deleteSubColumn(key, fieldName);
+    } else {
+      deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
+    }
+  }
 
-        // TODO: hack, do not store empty arrays
-        Object mapValue = map.get(mapKey);
-        if (mapValue instanceof GenericArray<?>) {
-          if (((GenericArray)mapValue).size() == 0) {
-            continue;
-          }
-        } else if (mapValue instanceof StatefulHashMap<?,?>) {
-          if (((StatefulHashMap)mapValue).size() == 0) {
-            continue;
+  public void addStatefulHashMap(K key, String fieldName, Map<CharSequence,Object> map) {
+    if (isSuper( cassandraMapping.getFamily(fieldName) )) {
+      // We don't know what changed inside the map (and it may now be empty), so delete its existing content first.
+      deleteSubColumn(key, fieldName);
+      // update if there is anything to update.
+      if (!map.isEmpty()) {
+        // If it's not empty, then update its content.
+        for (CharSequence mapKey: map.keySet()) {
+          // TODO: hack, do not store empty arrays
+          Object mapValue = map.get(mapKey);
+          if (mapValue instanceof GenericArray<?>) {
+            if (((List<?>)mapValue).size() == 0) {
+              continue;
+            }
+          } else if (mapValue instanceof Map<?,?>) {
+            if (((Map<?, ?>)mapValue).size() == 0) {
+              continue;
+            }
           }
+          addSubColumn(key, fieldName, mapKey.toString(), mapValue);
         }
-        addSubColumn(key, fieldName, mapKey.toString(), mapValue);
       }
     }
     else {
@@ -435,7 +390,7 @@ public class CassandraClient<K, T extends PersistentBase> {
    * @return a list of family rows
    */
   public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
-
+    
     String[] columnNames = cassandraQuery.getColumns(family);
     ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
     for (int i = 0; i < columnNames.length; i++) {
@@ -448,87 +403,98 @@ public class CassandraClient<K, T extends PersistentBase> {
     }
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
-
-    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = 
-        HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+    
+    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSlicesQuery.setColumnFamily(family);
     rangeSlicesQuery.setKeys(startKey, endKey);
     rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
     rangeSlicesQuery.setRowCount(limit);
     rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-
+    
     QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
     OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
-
+    
     return orderedRows.getList();
   }
-
+  
   private String getMappingFamily(String pField){
     String family = null;
-    // TODO checking if it was a UNION field the one we are retrieving
-    family = this.cassandraMapping.getFamily(pField);
-    return family;
-  }
-
+    // Check whether the field being retrieved is a UNION field.
+    if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
+      family = this.cassandraMapping.getFamily(pField.substring(0,pField.indexOf(CassandraStore.UNION_COL_SUFIX)));
+    else
+      family = this.cassandraMapping.getFamily(pField);
+     return family;
+   }
+ 
   private String getMappingColumn(String pField){
     String column = null;
-    // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
-    column = this.cassandraMapping.getColumn(pField);
-    return column;
-  }
+    if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
+      column = pField;
+    else
+      column = this.cassandraMapping.getColumn(pField);
+      return column;
+    }
 
   /**
    * Select the families that contain at least one column mapped to a query field.
    * @param query indicates the columns to select
-   * @return a map which keys are the family names and values the corresponding column 
-   * names required to get all the query fields.
+   * @return a map whose keys are the family names and whose values are the
+   * corresponding column names required to get all the query fields.
    */
   public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
     Map<String, List<String>> map = new HashMap<String, List<String>>();
     for (String field: query.getFields()) {
       String family = this.getMappingFamily(field);
       String column = this.getMappingColumn(field);
-
+      
       // check if the family value was already initialized 
       List<String> list = map.get(family);
       if (list == null) {
         list = new ArrayList<String>();
         map.put(family, list);
       }
-
       if (column != null) {
         list.add(column);
       }
-
     }
-
+    
     return map;
   }
 
   /**
-   * Retrieves the cassandraMapping which holds whatever was mapped from the gora-cassandra-mapping.xml
-   * @return
+   * Retrieves the cassandraMapping, which holds whatever was mapped
+   * from the gora-cassandra-mapping.xml file.
+   * @return the CassandraMapping held by this client
    */
   public CassandraMapping getCassandraMapping(){
     return this.cassandraMapping;
   }
-
+  
   /**
-   * Select the field names according to the column names, which format if fully qualified: "family:column"
+   * Select the field names according to the column names, whose format
+   * is fully qualified: "family:column"
    * @param query
-   * @return a map which keys are the fully qualified column names and values the query fields
+   * @return a map whose keys are the fully qualified column
+   * names and whose values are the query fields
    */
   public Map<String, String> getReverseMap(Query<K, T> query) {
     Map<String, String> map = new HashMap<String, String>();
     for (String field: query.getFields()) {
       String family = this.getMappingFamily(field);
       String column = this.getMappingColumn(field);
-
+      
       map.put(family + ":" + column, field);
     }
+    
     return map;
   }
 
+  /**
+   * Determines whether a column family is a super column family.
+   * @param family
+   * @return boolean
+   */
   public boolean isSuper(String family) {
     return this.cassandraMapping.isSuper(family);
   }
@@ -542,19 +508,20 @@ public class CassandraClient<K, T extends PersistentBase> {
     }
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
-
-    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = 
-        HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), 
-            ByteBufferSerializer.get(), ByteBufferSerializer.get());
+    
+    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSuperSlicesQuery.setColumnFamily(family);    
     rangeSuperSlicesQuery.setKeys(startKey, endKey);
     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
     rangeSuperSlicesQuery.setRowCount(limit);
     rangeSuperSlicesQuery.setColumnNames(columnNames);
-
+    
+    
     QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
     OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
     return orderedRows.getList();
+
+
   }
 
   /**
@@ -562,6 +529,6 @@ public class CassandraClient<K, T extends PersistentBase> {
    * @return Keyspace
    */
   public String getKeyspaceName() {
-    return this.cassandraMapping.getKeyspaceName();
+	return this.cassandraMapping.getKeyspaceName();
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 9a0f492..f8d5315 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -53,6 +53,7 @@ public class CassandraMapping {
   private String clusterName;
   private String keyspaceName;
   
+  
   /**
    * List of the super column families.
    */
@@ -168,9 +169,10 @@ public class CassandraMapping {
         LOG.warn("Using default set to: " + DEFAULT_GCGRACE_SECONDS);
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" );
+        LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" );
         }
       }
+
       String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
       if (superAttribute != null) {
         if (LOG.isDebugEnabled()) {
@@ -183,11 +185,12 @@ public class CassandraMapping {
         cfDef.setColumnType(ColumnType.SUPER);
         cfDef.setSubComparatorType(ComparatorType.BYTESTYPE);
       }
-      
+
       cfDef.setKeyspaceName(this.keyspaceName);
       cfDef.setName(familyName);
       cfDef.setComparatorType(ComparatorType.BYTESTYPE);
       cfDef.setDefaultValidationClass(ComparatorType.BYTESTYPE.getClassName());
+
       cfDef.setGcGraceSeconds(gcgrace_scs!=null?Integer.parseInt(gcgrace_scs):DEFAULT_GCGRACE_SECONDS);
       this.columnFamilyDefinitions.put(familyName, cfDef);
 
@@ -200,7 +203,6 @@ public class CassandraMapping {
       String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
       String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
       String ttlValue = element.getAttributeValue(COLUMNS_TTL_ATTRIBUTE);
-
       if (fieldName == null) {
        LOG.error("Field name is not declared.");
         continue;
@@ -221,55 +223,37 @@ public class CassandraMapping {
       if (columnFamilyDefinition == null) {
         LOG.warn("Family " + familyName + " was not declared in the keyspace.");
       }
-      
+
       this.familyMap.put(fieldName, familyName);
       this.columnMap.put(fieldName, columnName);
-      // TODO we should find a way of storing more values into this map i.e. more column attributes
+      // TODO we should find a way of storing more values into this map
       this.columnAttrMap.put(columnName, ttlValue!=null?ttlValue:DEFAULT_COLUMNS_TTL);
     }
   }
 
   /**
-   * Add new column to CassandraMapping using the self-explanatory parameters
-   * @param pFamilyName
-   * @param pFieldName
-   * @param pColumnName
+   * Add a new column to the CassandraMapping using the parameters below.
+   * @param pFamilyName the column family name
+   * @param pFieldName the Avro field from the Schema
+   * @param pColumnName the column name within the column family.
    */
   public void addColumn(String pFamilyName, String pFieldName, String pColumnName){
     this.familyMap.put(pFieldName, pFamilyName);
     this.columnMap.put(pFieldName, pColumnName);
   }
 
-  /**
-   * Gets the columnFamily related to the column name.
-   * @param name
-   * @return
-   */
   public String getFamily(String name) {
     return this.familyMap.get(name);
   }
 
-  /**
-   * Gets the column related to a field.
-   * @param name
-   * @return
-   */
   public String getColumn(String name) {
     return this.columnMap.get(name);
   }
 
-  /**
-   * Gets all the columnFamilies defined.
-   * @return
-   */
   public Map<String,String> getFamilyMap(){
     return this.familyMap;
   }
 
-  /**
-   * Gets all attributes related to a column.
-   * @return
-   */
   public Map<String, String> getColumnsAttribs(){
     return this.columnAttrMap;
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
index 0c77abc..536c8ce 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
@@ -49,7 +49,7 @@ public class CassandraMappingManager {
     return manager;
   }
 
-  /**
+ /**
   * Objects to maintain mapped keyspaces
   */
   private Map<String, Element> keyspaceMap = null;
@@ -78,7 +78,7 @@ public class CassandraMappingManager {
     }
     String keyspaceName = mappingElement.getAttributeValue(KEYSPACE_ELEMENT);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("className=" + className + " -> keyspaceName=" + keyspaceName);
+      LOG.debug("persistentClassName=" + className + " -> keyspaceName=" + keyspaceName);
     }
     Element keyspaceElement = keyspaceMap.get(keyspaceName);
     if (keyspaceElement == null) {