You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by ka...@apache.org on 2012/07/13 19:11:38 UTC

svn commit: r1361302 - in /gora/trunk: ./ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/

Author: kazk
Date: Fri Jul 13 17:11:38 2012
New Revision: 1361302

URL: http://svn.apache.org/viewvc?rev=1361302&view=rev
Log:
Adds serializer packages based on Hector's ones with GORA-142 spec

Added:
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jul 13 17:11:38 2012
@@ -6,6 +6,8 @@ Gora Change Log
 
 0.3 (trunk) Current Development:
 
+* GORA-142 Creates org.apache.gora.cassandra.serializers package in order to clean the code of store and query packages and to support additional types in future. (kazk)
+
 * GORA-148 CassandraMapping supports only (first) keyspace and class in gora-cassandra-mapping.xml (kazk)
 
 * GORA-143 GoraCompiler needs to add "import FixedSize" statement for FIXED type (kazk)

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 13 17:11:38 2012
@@ -20,18 +20,12 @@ package org.apache.gora.cassandra.query;
 
 import java.nio.ByteBuffer;
 
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Serializer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,32 +65,13 @@ public abstract class CassandraColumn {
   public abstract ByteBuffer getName();
   public abstract Object getValue();
   
-
-  protected Object fromByteBuffer(Type type, ByteBuffer byteBuffer) {
+  protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
-    switch (type) {
-      case STRING:
-        value = new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
-        break;
-      case BYTES:
-        value = byteBuffer;
-        break;
-      case INT:
-        value = IntegerSerializer.get().fromByteBuffer(byteBuffer);
-        break;
-      case LONG:
-        value = LongSerializer.get().fromByteBuffer(byteBuffer);
-        break;
-      case FLOAT:
-        value = FloatSerializer.get().fromByteBuffer(byteBuffer);
-        break;
-      case DOUBLE:
-        value = DoubleSerializer.get().fromByteBuffer(byteBuffer);
-        break;
-
-      default:
-        LOG.info("Type is not supported: " + type);
-
+    Serializer serializer = GoraSerializerTypeInferer.getSerializer(schema);
+    if (serializer == null) {
+      LOG.info("Schema is not supported: " + schema.toString());
+    } else {
+      value = serializer.fromByteBuffer(byteBuffer);
     }
     return value;
   }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 13 17:11:38 2012
@@ -26,6 +26,7 @@ import me.prettyprint.cassandra.serializ
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.specific.SpecificFixed;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 13 17:11:38 2012
@@ -37,6 +37,8 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.TypeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,28 +68,16 @@ public class CassandraSubColumn extends 
     Schema fieldSchema = field.schema();
     Type type = fieldSchema.getType();
     ByteBuffer byteBuffer = hColumn.getValue();
+    if (byteBuffer == null) {
+      return null;
+    }
     Object value = null;
     if (type == Type.ARRAY) {
-      // convert string to array
-      String valueString = StringSerializer.get().fromByteBuffer(byteBuffer);
-      valueString = valueString.substring(1, valueString.length()-1);
-      String[] elements = valueString.split(", ");
-
-      Type elementType = fieldSchema.getElementType().getType();
-      if (elementType == Schema.Type.STRING) {
-        // the array type is String
-        GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
-        for (String element: elements) {
-          genericArray.add(element);
-        }
-
-        value = genericArray;
-      } else {
-        LOG.info("Element type not supported: " + elementType);
-      }
-    }
-    else {
-      value = fromByteBuffer(type, byteBuffer);
+      GenericArraySerializer serializer = GenericArraySerializer.get(fieldSchema.getElementType());
+      GenericArray genericArray = serializer.fromByteBuffer(byteBuffer);
+      value = genericArray;
+    } else {
+      value = fromByteBuffer(fieldSchema, byteBuffer);
     }
 
     return value;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 13 17:11:38 2012
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.serializers.Utf8Serializer;
 import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.impl.PersistentBase;
@@ -55,12 +56,11 @@ public class CassandraSuperColumn extend
     
     switch (type) {
       case ARRAY:
-        Type elementType = fieldSchema.getElementType().getType();
-        GenericArray array = new ListGenericArray(Schema.create(elementType));
+        ListGenericArray array = new ListGenericArray(fieldSchema.getElementType());
         
         for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
           ByteBuffer memberByteBuffer = hColumn.getValue();
-          Object memberValue = fromByteBuffer(elementType, hColumn.getValue());
+          Object memberValue = fromByteBuffer(fieldSchema.getElementType(), hColumn.getValue());
           // int i = IntegerSerializer().get().fromByteBuffer(hColumn.getName());
           array.add(memberValue);      
         }
@@ -69,12 +69,12 @@ public class CassandraSuperColumn extend
         break;
       case MAP:
         Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
-        Type valueType = fieldSchema.getValueType().getType();
         
         for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
           ByteBuffer memberByteBuffer = hColumn.getValue();
-          Object memberValue = fromByteBuffer(valueType, hColumn.getValue());
-          map.put(new Utf8(StringSerializer.get().fromByteBuffer(hColumn.getName())), memberValue);      
+          Object memberValue = null;
+          memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
+          map.put(Utf8Serializer.get().fromByteBuffer(hColumn.getName()), memberValue);      
         }
         value = map;
         
@@ -106,6 +106,10 @@ public class CassandraSuperColumn extend
 
           for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
             String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+            if (memberName == null || memberName.length() == 0) {
+              LOG.warn("member name is null or empty.");
+              continue;
+            }
             Field memberField = fieldSchema.getField(memberName);
             CassandraSubColumn cassandraColumn = new CassandraSubColumn();
             cassandraColumn.setField(memberField);

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A GenericArraySerializer translates the byte[] to and from GenericArray of Avro.
+ */
+public class GenericArraySerializer<T> extends AbstractSerializer<GenericArray<T>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(GenericArraySerializer.class);
+
+  private static Map<Type, GenericArraySerializer> elementTypeToSerializerMap = new HashMap<Type, GenericArraySerializer>();
+  private static Map<Class, GenericArraySerializer> fixedClassToSerializerMap = new HashMap<Class, GenericArraySerializer>();
+
+  public static GenericArraySerializer get(Type elementType) {
+    GenericArraySerializer serializer = elementTypeToSerializerMap.get(elementType);
+    if (serializer == null) {
+      serializer = new GenericArraySerializer(elementType);
+      elementTypeToSerializerMap.put(elementType, serializer);
+    }
+    return serializer;
+  }
+
+  public static GenericArraySerializer get(Type elementType, Class clazz) {
+    if (elementType != Type.FIXED) { // class-keyed serializers exist only for FIXED elements
+      return null;
+    }
+    GenericArraySerializer serializer = fixedClassToSerializerMap.get(clazz); // same cache the put below fills
+    if (serializer == null) {
+      serializer = new GenericArraySerializer(clazz);
+      fixedClassToSerializerMap.put(clazz, serializer);
+    }
+    return serializer;
+  }
+
+  public static GenericArraySerializer get(Schema elementSchema) {
+    Type type = elementSchema.getType();
+    if (type == Type.FIXED) {
+      return get(Type.FIXED, TypeUtils.getClass(elementSchema));
+    } else {
+      return get(type);
+    }
+  }
+
+  private Schema elementSchema = null;
+  private Type elementType = null;
+  private int size = -1;
+  private Class<T> clazz = null;
+  private Serializer<T> elementSerializer = null;
+
+  public GenericArraySerializer(Serializer<T> elementSerializer) {
+    this.elementSerializer = elementSerializer;
+  }
+
+  public GenericArraySerializer(Schema elementSchema) {
+    this.elementSchema = elementSchema;
+    elementType = elementSchema.getType();
+    size = TypeUtils.getFixedSize(elementSchema);
+    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
+  }
+
+  public GenericArraySerializer(Type elementType) {
+    this.elementType = elementType;
+    if (elementType != Type.FIXED) {
+      elementSchema = Schema.create(elementType);
+    }
+    clazz = TypeUtils.getClass(elementType);
+    size = TypeUtils.getFixedSize(elementType);
+    elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+  }
+
+  public GenericArraySerializer(Class<T> clazz) {
+    this.clazz = clazz;
+    elementType = TypeUtils.getType(clazz);
+    size = TypeUtils.getFixedSize(clazz);
+    if (elementType == null || elementType == Type.FIXED) {
+      elementType = Type.FIXED;
+      elementSchema = TypeUtils.getSchema(clazz);
+      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
+    } else {
+      elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+    }
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(GenericArray<T> array) {
+    if (array == null) {
+      return null;
+    }
+    if (size > 0) {
+      return toByteBufferWithFixedLengthElements(array);
+    } else {
+      return toByteBufferWithVariableLengthElements(array);
+    }
+  }
+
+  private ByteBuffer toByteBufferWithFixedLengthElements(GenericArray<T> array) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size);
+    for (T element : array) {
+      byteBuffer.put(elementSerializer.toByteBuffer(element));
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  private ByteBuffer toByteBufferWithVariableLengthElements(GenericArray<T> array) {
+    int n = (int) array.size();
+    List<byte[]> list = new ArrayList<byte[]>(n);
+    n *= 4;
+    for (T element : array) {
+      byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
+      list.add(bytes);
+      n += bytes.length;
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+    for (byte[] bytes : list) {
+      byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+      byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+    }
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  @Override
+  public GenericArray<T> fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    GenericArray<T> array = new ListGenericArray<T>(elementSchema);
+int i = 0;
+    while (true) {
+      T element = null;
+      try {
+        if (size > 0) {
+          element = elementSerializer.fromByteBuffer(byteBuffer);
+        }
+        else {
+          int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+          byte[] bytes = new byte[n];
+          byteBuffer.get(bytes, 0, n);
+          element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+        }
+      } catch (BufferUnderflowException e) {
+        break;
+      }
+      if (element == null) {
+        break;
+      }
+      array.add(element);
+    }
+    return array;
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return elementSerializer.getComparatorType();
+  }
+
+}

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BooleanSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
+import me.prettyprint.hector.api.Serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that infers the concrete Serializer needed to turn a value into
+ * its binary representation
+ */
+public class GoraSerializerTypeInferer {
+
+  public static final Logger LOG = LoggerFactory.getLogger(GoraSerializerTypeInferer.class);
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Object value) {
+    Serializer serializer = null;
+    if (value == null) {
+      serializer = ByteBufferSerializer.get();
+    } else if (value instanceof Utf8) {
+      serializer = Utf8Serializer.get();
+    } else if (value instanceof Boolean) {
+      serializer = BooleanSerializer.get();
+    } else if (value instanceof ByteBuffer) {
+      serializer = ByteBufferSerializer.get();
+    } else if (value instanceof byte[]) {
+      serializer = BytesArraySerializer.get();
+    } else if (value instanceof Double) {
+      serializer = DoubleSerializer.get();
+    } else if (value instanceof Float) {
+      serializer = FloatSerializer.get();
+    } else if (value instanceof Integer) {
+      serializer = IntegerSerializer.get();
+    } else if (value instanceof Long) {
+      serializer = LongSerializer.get();
+    } else if (value instanceof String) {
+      serializer = StringSerializer.get();
+    } else if (value instanceof SpecificFixed) {
+      serializer = SpecificFixedSerializer.get(value.getClass());
+    } else if (value instanceof GenericArray) {
+      Schema schema = ((GenericArray)value).getSchema();
+      if (schema.getType() == Type.ARRAY) {
+        schema = schema.getElementType();
+      }
+      serializer = GenericArraySerializer.get(schema);
+    } else {
+      serializer = SerializerTypeInferer.getSerializer(value);
+    }
+    return serializer;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Class<?> valueClass) {
+    Serializer serializer = null;
+    if (valueClass.equals(Utf8.class)) {
+      serializer = Utf8Serializer.get();
+    } else if (valueClass.equals(Boolean.class) || valueClass.equals(boolean.class)) {
+      serializer = BooleanSerializer.get();
+    } else if (valueClass.equals(ByteBuffer.class)) {
+      serializer = ByteBufferSerializer.get();
+    } else if (valueClass.equals(Double.class) || valueClass.equals(double.class)) {
+      serializer = DoubleSerializer.get();
+    } else if (valueClass.equals(Float.class) || valueClass.equals(float.class)) {
+      serializer = FloatSerializer.get();
+    } else if (valueClass.equals(Integer.class) || valueClass.equals(int.class)) {
+      serializer = IntegerSerializer.get();
+    } else if (valueClass.equals(Long.class) || valueClass.equals(long.class)) {
+      serializer = LongSerializer.get();
+    } else if (valueClass.equals(String.class)) {
+      serializer = StringSerializer.get();
+    } else {
+      serializer = SerializerTypeInferer.getSerializer(valueClass);
+    }
+    return serializer;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Schema schema) {
+    Serializer serializer = null;
+    Type type = schema.getType();
+    if (type == Type.STRING) {
+      serializer = Utf8Serializer.get();
+    } else if (type == Type.BOOLEAN) {
+      serializer = BooleanSerializer.get();
+    } else if (type == Type.BYTES) {
+      serializer = ByteBufferSerializer.get();
+    } else if (type == Type.DOUBLE) {
+      serializer = DoubleSerializer.get();
+    } else if (type == Type.FLOAT) {
+      serializer = FloatSerializer.get();
+    } else if (type == Type.INT) {
+      serializer = IntegerSerializer.get();
+    } else if (type == Type.LONG) {
+      serializer = LongSerializer.get();
+    } else if (type == Type.FIXED) {
+      Class clazz = TypeUtils.getClass(schema);
+      serializer = SpecificFixedSerializer.get(clazz);
+      // serializer = SpecificFixedSerializer.get(schema);
+    } else if (type == Type.ARRAY) {
+      serializer = GenericArraySerializer.get(schema.getElementType());
+    } else {
+      serializer = null;
+    }
+    return serializer;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Type type) {
+    Serializer serializer = null;
+    if (type == Type.STRING) {
+      serializer = Utf8Serializer.get();
+    } else if (type == Type.BOOLEAN) {
+      serializer = BooleanSerializer.get();
+    } else if (type == Type.BYTES) {
+      serializer = ByteBufferSerializer.get();
+    } else if (type == Type.DOUBLE) {
+      serializer = DoubleSerializer.get();
+    } else if (type == Type.FLOAT) {
+      serializer = FloatSerializer.get();
+    } else if (type == Type.INT) {
+      serializer = IntegerSerializer.get();
+    } else if (type == Type.LONG) {
+      serializer = LongSerializer.get();
+    } else if (type == Type.FIXED) {
+      serializer = SpecificFixedSerializer.get();
+    } else {
+      serializer = null;
+    }
+    return serializer;
+  }
+
+  /**
+   * Resolves a serializer from a type and an optional array element type.
+   * With a null type the element type is resolved by itself, with a null element
+   * type the type is resolved by itself, and ARRAY yields a GenericArraySerializer.
+   *
+   * @return the matching serializer, or null for any other combination
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Type type, Type elementType) {
+    Serializer serializer = null;
+    if (type == null) {
+      if (elementType != null) {
+        serializer = getSerializer(elementType);
+      }
+    } else if (elementType == null) {
+      // getSerializer(Type) has no ARRAY case, so ARRAY without an element type stays null.
+      serializer = getSerializer(type);
+    } else if (type == Type.ARRAY) {
+      serializer = GenericArraySerializer.get(elementType);
+    }
+    return serializer;
+  }
+
+  /**
+   * Returns the SpecificFixedSerializer registered for clazz.
+   * @return the serializer, or null unless type is FIXED and clazz is non-null
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <T> Serializer<T> getSerializer(Type type, Class<T> clazz) {
+    Serializer serializer = null;
+    // Both checks must pass; a non-FIXED type must not fall through to the class lookup.
+    if (type == Type.FIXED && clazz != null) {
+      serializer = SpecificFixedSerializer.get(clazz);
+    }
+    return serializer;
+  }
+
+}

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.BYTESTYPE;
+
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SpecificFixedSerializer translates the byte[] to and from SpecificFixed of Avro.
+ */
+public class SpecificFixedSerializer extends AbstractSerializer<SpecificFixed> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(SpecificFixedSerializer.class);
+
+  // for toByteBuffer
+  private static SpecificFixedSerializer serializer = new SpecificFixedSerializer(SpecificFixed.class);
+
+  // Returns the shared default instance; fromByteBuffer instantiates its class, so prefer get(Class) for reads.
+  public static SpecificFixedSerializer get() {
+    return serializer;
+  }
+
+  private static Map<Class, SpecificFixedSerializer> classToSerializerMap = new HashMap<Class, SpecificFixedSerializer>();
+
+  public static SpecificFixedSerializer get(Class clazz) {
+    SpecificFixedSerializer serializer = classToSerializerMap.get(clazz);
+    if (serializer == null) {
+      serializer = new SpecificFixedSerializer(clazz);
+      classToSerializerMap.put(clazz, serializer);
+    }
+    return serializer;
+  }
+
+  private Class<? extends SpecificFixed> clazz;
+
+  public SpecificFixedSerializer(Class<? extends SpecificFixed> clazz) {
+    this.clazz = clazz;
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(SpecificFixed fixed) {
+    if (fixed == null) {
+      return null;
+    }
+    byte[] bytes = fixed.bytes();
+    if (bytes.length < 1) {
+      return null;
+    }
+    return BytesArraySerializer.get().toByteBuffer(bytes);
+  }
+
+  @Override
+  public SpecificFixed fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+
+    Object value = null;
+    try {
+      value = clazz.newInstance();
+    } catch (InstantiationException ie) {
+      LOG.warn("Instantiation error for class=" + clazz, ie);
+      return null;
+    } catch (IllegalAccessException iae) {
+      LOG.warn("Illegal access error for class=" + clazz, iae);
+      return null;
+    }
+
+    if (! (value instanceof SpecificFixed)) {
+      LOG.warn("Not an instance of SpecificFixed");
+      return null;
+    }
+
+    SpecificFixed fixed = (SpecificFixed) value;
+    byte[] bytes = fixed.bytes();
+    try {
+      byteBuffer.get(bytes, 0, bytes.length);
+    }
+    catch (BufferUnderflowException e) {
+      // LOG.info(e.toString() + " : class=" + clazz.getName() + " length=" + bytes.length);
+      throw e;
+    }
+    fixed.bytes(bytes);
+    return fixed;
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return BYTESTYPE;
+  }
+
+}

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.StatefulHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods that map between Java classes, Avro {@link Type}s and Avro
+ * {@link Schema}s for the values handled by the Cassandra serializers.
+ */
+public class TypeUtils {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);
+
+  /**
+   * Returns the runtime class of a value.
+   *
+   * @param value the value to inspect, may be null
+   * @return the class of {@code value}, or null when {@code value} is null
+   */
+  // @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static Class getClass(Object value) {
+    return (value == null) ? null : value.getClass();
+  }
+
+  /**
+   * Derives an Avro schema from a value.
+   *
+   * @param value the value to inspect, may be null
+   * @return an array schema built from the element schema when the value is a
+   *         {@link GenericArray}; otherwise the result of
+   *         {@link #getSchema(Class)} for the value's class (null for a null
+   *         value)
+   */
+  public static Schema getSchema(Object value) {
+    if (value instanceof GenericArray) {
+      return Schema.createArray( getElementSchema((GenericArray)value) );
+    } else {
+      return getSchema( getClass(value) );
+    }
+  }
+
+  /**
+   * Returns the Avro type that corresponds to the class of a value.
+   *
+   * @param value the value to inspect, may be null
+   * @return the matching type, or null when the value is null or its class is
+   *         not recognized
+   */
+  public static Type getType(Object value) {
+    return getType( getClass(value) );
+  }
+
+  /**
+   * Maps a Java class to an Avro type.
+   *
+   * Classes are compared by exact equality, so only the listed classes
+   * themselves (and the primitive counterparts of the boxed types) match.
+   * {@code FIXED} is reported for classes whose direct superclass is
+   * {@link SpecificFixed}.
+   *
+   * @param clazz the class to map, may be null
+   * @return the matching type, or null when {@code clazz} is null or not
+   *         recognized
+   */
+  public static Type getType(Class<?> clazz) {
+    if (clazz == null) {
+      return null;
+    } else if (clazz.equals(Utf8.class)) {
+      return Type.STRING;
+    } else if (clazz.equals(Boolean.class) || clazz.equals(boolean.class)) {
+      return Type.BOOLEAN;
+    } else if (clazz.equals(ByteBuffer.class)) {
+      return Type.BYTES;
+    } else if (clazz.equals(Double.class) || clazz.equals(double.class)) {
+      return Type.DOUBLE;
+    } else if (clazz.equals(Float.class) || clazz.equals(float.class)) {
+      return Type.FLOAT;
+    } else if (clazz.equals(Integer.class) || clazz.equals(int.class)) {
+      return Type.INT;
+    } else if (clazz.equals(Long.class) || clazz.equals(long.class)) {
+      return Type.LONG;
+    } else if (clazz.equals(ListGenericArray.class)) {
+      return Type.ARRAY;
+    } else if (clazz.equals(StatefulHashMap.class)) {
+      return Type.MAP;
+    } else if (clazz.equals(Persistent.class)) {
+      return Type.RECORD;
+    } else if (SpecificFixed.class.equals(clazz.getSuperclass())) {
+      // getSuperclass() is null for Object, interfaces and primitives, so the
+      // comparison is written null-safely.
+      return Type.FIXED;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Maps an Avro type to the Java class used to represent it.
+   *
+   * @param type the Avro type, may be null
+   * @return the representing class, or null for {@code FIXED} (whose concrete
+   *         class is only resolved by {@link #getClass(Schema)}) and for any
+   *         other type without a mapping
+   */
+  public static Class getClass(Type type) {
+    if (type == Type.STRING) {
+      return Utf8.class;
+    } else if (type == Type.BOOLEAN) {
+      return Boolean.class;
+    } else if (type == Type.BYTES) {
+      return ByteBuffer.class;
+    } else if (type == Type.DOUBLE) {
+      return Double.class;
+    } else if (type == Type.FLOAT) {
+      return Float.class;
+    } else if (type == Type.INT) {
+      return Integer.class;
+    } else if (type == Type.LONG) {
+      return Long.class;
+    } else if (type == Type.ARRAY) {
+      return ListGenericArray.class;
+    } else if (type == Type.MAP) {
+      return StatefulHashMap.class;
+    } else if (type == Type.RECORD) {
+      return Persistent.class;
+    } else if (type == Type.FIXED) {
+      // return SpecificFixed.class;
+      return null;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Derives an Avro schema from a Java class.
+   *
+   * For a {@code FIXED} class the schema name and namespace are taken from the
+   * class name and the size from {@link #getFixedSize(Class)}. For an
+   * {@code ARRAY} class an instance is created reflectively and passed to
+   * {@link #getSchema(Object)}. {@code MAP} and {@code RECORD} are not
+   * supported yet.
+   *
+   * @param clazz the class to map, may be null
+   * @return the schema, or null when the class is null, not recognized, not
+   *         supported, or cannot be instantiated
+   */
+  public static Schema getSchema(Class clazz) {
+    Type type = getType(clazz);
+    if (type == null) {
+      return null;
+    } else if (type == Type.FIXED) {
+      int size = getFixedSize(clazz);
+      String name = clazz.getName();
+      String space = null;
+      // Split a qualified class name into namespace and simple name; a class
+      // in the default package has no dot and keeps a null namespace.
+      int n = name.lastIndexOf('.');
+      if (n >= 0) {
+        space = name.substring(0, n);
+        name = name.substring(n + 1);
+      }
+      String doc = null; // ?
+      // LOG.info(Schema.createFixed(name, doc, space, size).toString());
+      return Schema.createFixed(name, doc, space, size);
+    } else if (type == Type.ARRAY) {
+      Object obj = null;
+      try {
+        obj = clazz.newInstance();
+      } catch (InstantiationException e) {
+        LOG.warn("Instantiation error for class=" + clazz, e);
+        return null;
+      } catch (IllegalAccessException e) {
+        LOG.warn("Illegal access error for class=" + clazz, e);
+        return null;
+      }
+      return getSchema(obj);
+    } else if (type == Type.MAP) {
+      // TODO
+      // return Schema.createMap(...);
+      return null;
+    } else if (type == Type.RECORD) {
+      // TODO
+      // return Schema.createRecord(...);
+      return null;
+    } else {
+      return Schema.create(type);
+    }
+  }
+
+  /**
+   * Resolves the Java class that represents a schema.
+   *
+   * @param schema the schema to map
+   * @return for a {@code FIXED} schema, the class loaded by the schema's full
+   *         name (null when it cannot be found); otherwise the result of
+   *         {@link #getClass(Type)}
+   */
+  public static Class getClass(Schema schema) {
+    Type type = schema.getType();
+    if (type == null) {
+      return null;
+    } else if (type == Type.FIXED) {
+      try {
+        return Class.forName( schema.getFullName() );
+      } catch (ClassNotFoundException e) {
+        LOG.warn(e.toString() + " : " + schema);
+        return null;
+      }
+    } else {
+      return getClass(type);
+    }
+  }
+
+  /**
+   * Returns the byte width used for a fixed-width primitive type.
+   *
+   * @param type the Avro type, may be null
+   * @return 1 for BOOLEAN, 4 for FLOAT and INT, 8 for DOUBLE and LONG, and -1
+   *         for every other type
+   */
+  public static int getFixedSize(Type type) {
+    if (type == Type.BOOLEAN) {
+      return 1;
+    } else if (type == Type.DOUBLE) {
+      return 8;
+    } else if (type == Type.FLOAT) {
+      return 4;
+    } else if (type == Type.INT) {
+      return 4;
+    } else if (type == Type.LONG) {
+      return 8;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * Returns the byte width of the values described by a schema.
+   *
+   * @param schema the schema to inspect
+   * @return the declared size of a {@code FIXED} schema, otherwise the result
+   *         of {@link #getFixedSize(Type)}
+   */
+  public static int getFixedSize(Schema schema) {
+    Type type = schema.getType();
+    if (type == Type.FIXED) {
+      return schema.getFixedSize();
+    } else {
+      return getFixedSize(type);
+    }
+  }
+
+  /**
+   * Returns the byte width of the values represented by a class.
+   *
+   * For a {@code FIXED} class an instance is created reflectively and the
+   * length of its byte array is reported.
+   *
+   * @param clazz the class to inspect, may be null
+   * @return the size in bytes, or -1 when the class has no fixed width or
+   *         cannot be instantiated
+   */
+  public static int getFixedSize(Class clazz) {
+    Type type = getType(clazz);
+    if (type == Type.FIXED) {
+      try {
+        return ((SpecificFixed)clazz.newInstance()).bytes().length;
+      } catch (InstantiationException e) {
+        LOG.warn("Instantiation error for class=" + clazz, e);
+        return -1;
+      } catch (IllegalAccessException e) {
+        LOG.warn("Illegal access error for class=" + clazz, e);
+        return -1;
+      }
+    } else {
+      return getFixedSize(type);
+    }
+  }
+
+  /**
+   * Returns the schema of the elements of an array.
+   *
+   * @param array the array to inspect
+   * @return the element type when the array's schema is an {@code ARRAY}
+   *         schema, otherwise the array's schema itself
+   */
+  public static Schema getElementSchema(GenericArray array) {
+    Schema schema = array.getSchema();
+    return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema;
+  }
+
+  /**
+   * Returns the Avro type of the elements of an array.
+   *
+   * @param array the array to inspect
+   * @return the type of the schema returned by
+   *         {@link #getElementSchema(GenericArray)}
+   */
+  public static Type getElementType(ListGenericArray array) {
+    return getElementSchema(array).getType();
+  }
+
+  /*
+  public static Schema getValueSchema(StatefulHashMap map) {
+    return map.getSchema().getValueType();
+  }
+
+  public static Type getValueType(StatefulHashMap map) {
+    return getValueSchema(map).getType();
+  }
+  */
+
+}

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.ByteBuffer;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * A Utf8Serializer translates the byte[] to and from Utf8 object of Avro.
+ */
+public final class Utf8Serializer extends AbstractSerializer<Utf8> {
+
+  private static final Utf8Serializer instance = new Utf8Serializer();
+
+  public static Utf8Serializer get() {
+    return instance;
+  }
+
+  @Override
+  public ByteBuffer toByteBuffer(Utf8 obj) {
+    if (obj == null) {
+      return null;
+    }
+    return StringSerializer.get().toByteBuffer(obj.toString());
+  }
+
+  @Override
+  public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
+  }
+
+  @Override
+  public ComparatorType getComparatorType() {
+    return UTF8TYPE;
+  }
+
+}

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 13 17:11:38 2012
@@ -25,12 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.FloatSerializer;
 import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
 import me.prettyprint.cassandra.service.CassandraHostConfigurator;
 import me.prettyprint.hector.api.Cluster;
 import me.prettyprint.hector.api.Keyspace;
@@ -45,12 +43,17 @@ import me.prettyprint.hector.api.mutatio
 import me.prettyprint.hector.api.query.QueryResult;
 import me.prettyprint.hector.api.query.RangeSlicesQuery;
 import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.hector.api.HConsistencyLevel;
 import me.prettyprint.hector.api.Serializer;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
+import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
+import org.apache.gora.cassandra.serializers.TypeUtils;
 import org.apache.gora.mapreduce.GoraRecordReader;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
@@ -86,7 +89,7 @@ public class CassandraClient<K, T extend
     // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
     
-    this.keySerializer = SerializerTypeInferer.getSerializer(keyClass);
+    this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
     this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
   }
   
@@ -148,8 +151,12 @@ public class CassandraClient<K, T extend
     
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String columnName = this.cassandraMapping.getColumn(fieldName);
+    if (columnName == null) {
+      LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
+      return;
+    }
     
-    this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, StringSerializer.get(), ByteBufferSerializer.get()));
+    HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer);
   }
 
   /**
@@ -164,14 +171,43 @@ public class CassandraClient<K, T extend
     if (value == null) {
       return;
     }
-    
+
     ByteBuffer byteBuffer = toByteBuffer(value);
     
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
     
-    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(columnName, byteBuffer, ByteBufferSerializer.get(), ByteBufferSerializer.get())), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()));
-    
+    HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer);
+  }
+
+  public void addSubColumn(K key, String fieldName, String columnName, Object value) {
+    addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
+  }
+
+  public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
+    addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  public void addGenericArray(K key, String fieldName, GenericArray array) {
+    if (isSuper( cassandraMapping.getFamily(fieldName) )) {
+      int i= 0;
+      for (Object itemValue: array) {
+
+        // TODO: hack, do not store empty arrays
+        if (itemValue instanceof GenericArray<?>) {
+          if (((GenericArray)itemValue).size() == 0) {
+            continue;
+          }
+        }
+
+        addSubColumn(key, fieldName, i++, itemValue);
+      }
+    }
+    else {
+      addColumn(key, fieldName, array);
+    }
   }
 
   /**
@@ -181,29 +217,17 @@ public class CassandraClient<K, T extend
    */
   @SuppressWarnings("unchecked")
   public ByteBuffer toByteBuffer(Object value) {
-    if (value == null) {
-      return null;
-    }
-    
     ByteBuffer byteBuffer = null;
-    if (value instanceof ByteBuffer) {
-      byteBuffer = (ByteBuffer) value;
-    }
-    else if (value instanceof Utf8) {
-      byteBuffer = StringSerializer.get().toByteBuffer(((Utf8)value).toString());
-    }
-    else if (value instanceof Float) {
-      // workaround for hector-core-1.0-1.jar
-      // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
-      byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
-    }
-    else if (value instanceof Double) {
-      // workaround for hector-core-1.0-1.jar
-      // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
-      byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+    Serializer serializer = GoraSerializerTypeInferer.getSerializer(value);
+    if (serializer == null) {
+      LOG.info("Serializer not found for: " + value.toString());
     }
     else {
-      byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
+      byteBuffer = serializer.toByteBuffer(value);
+    }
+
+    if (byteBuffer == null) {
+      LOG.info("value class=" + value.getClass().getName() + " value=" + value + " -> null");
     }
     
     return byteBuffer;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1361302&r1=1361301&r2=1361302&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 13 17:11:38 2012
@@ -41,6 +41,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
@@ -299,8 +300,7 @@ public class CassandraStore<K, T extends
             break;
           case ARRAY:
             GenericArray array = (GenericArray) fieldValue;
-            Type elementType = fieldSchema.getElementType().getType();
-            GenericArray newArray = new ListGenericArray(Schema.create(elementType));
+            ListGenericArray newArray = new ListGenericArray(fieldSchema.getElementType());
             Iterator iter = array.iterator();
             while (iter.hasNext()) {
               newArray.add(iter.next());
@@ -328,11 +328,13 @@ public class CassandraStore<K, T extends
     Type type = schema.getType();
     switch (type) {
       case STRING:
+      case BOOLEAN:
       case INT:
       case LONG:
       case BYTES:
       case FLOAT:
       case DOUBLE:
+      case FIXED:
         this.cassandraClient.addColumn(key, field.name(), value);
         break;
       case RECORD:
@@ -344,16 +346,12 @@ public class CassandraStore<K, T extends
               // TODO: hack, do not store empty arrays
               Object memberValue = persistentBase.get(member.pos());
               if (memberValue instanceof GenericArray<?>) {
-                GenericArray<String> array = (GenericArray<String>) memberValue;
-                if (array.size() == 0) {
+                if (((GenericArray)memberValue).size() == 0) {
                   continue;
                 }
               }
-              
-              if (memberValue instanceof Utf8) {
-                memberValue = memberValue.toString();
-              }
-              this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(member.name()), memberValue);
+
+              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
             }
           } else {
             LOG.info("Record not supported: " + value.toString());
@@ -371,16 +369,12 @@ public class CassandraStore<K, T extends
               // TODO: hack, do not store empty arrays
               Object keyValue = map.get(mapKey);
               if (keyValue instanceof GenericArray<?>) {
-                GenericArray<String> array = (GenericArray<String>) keyValue;
-                if (array.size() == 0) {
+                if (((GenericArray)keyValue).size() == 0) {
                   continue;
                 }
               }
-              
-              if (keyValue instanceof Utf8) {
-                keyValue = keyValue.toString();
-              }
-              this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(mapKey.toString()), keyValue);              
+
+              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
             }
           } else {
             LOG.info("Map not supported: " + value.toString());
@@ -390,14 +384,7 @@ public class CassandraStore<K, T extends
       case ARRAY:
         if (value != null) {
           if (value instanceof GenericArray<?>) {
-            GenericArray<Object> array = (GenericArray<Object>) value;
-            int i= 0;
-            for (Object itemValue: array) {
-              if (itemValue instanceof Utf8) {
-                itemValue = itemValue.toString();
-              }
-              this.cassandraClient.addSubColumn(key, field.name(), IntegerSerializer.get().toByteBuffer(i++), itemValue);              
-            }
+            this.cassandraClient.addGenericArray(key, field.name(), (GenericArray)value);
           } else {
             LOG.info("Array not supported: " + value.toString());
           }

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1361302&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Fri Jul 13 17:11:38 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.store;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.Serializer;
+
+import org.apache.gora.persistency.Persistent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HectorUtils<K,T extends Persistent> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
+  
+  public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
+    mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
+  }
+
+  public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, String columnName, ByteBuffer columnValue) {
+    mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
+  }
+
+
+  public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer name, ByteBuffer value) {
+    return HFactory.createColumn(name, value, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+  }
+
+  public static<K> HColumn<String,ByteBuffer> createColumn(String name, ByteBuffer value) {
+    return HFactory.createColumn(name, value, StringSerializer.get(), ByteBufferSerializer.get());
+  }
+
+  public static<K> HColumn<Integer,ByteBuffer> createColumn(Integer name, ByteBuffer value) {
+    return HFactory.createColumn(name, value, IntegerSerializer.get(), ByteBufferSerializer.get());
+  }
+
+
+  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
+    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+  }
+
+  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, String columnName, ByteBuffer columnValue) {
+    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+  }
+
+  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, Integer columnName, ByteBuffer columnValue) {
+    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue));
+  }
+
+
+  public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue) {
+    return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
+  }
+
+  public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue) {
+    return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
+  }
+
+  public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue) {
+    return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
+  }
+
+}