You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by ka...@apache.org on 2012/07/18 17:28:06 UTC

svn commit: r1362980 - in /gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra: query/ serializers/ store/

Author: kazk
Date: Wed Jul 18 15:28:06 2012
New Revision: 1362980

URL: http://svn.apache.org/viewvc?rev=1362980&view=rev
Log:
Fixes GORA-149 to add a serializer for MAP

Added:
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
Modified:
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Wed Jul 18 15:28:06 2012
@@ -38,7 +38,9 @@ import org.apache.avro.generic.GenericAr
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.serializers.GenericArraySerializer;
+import org.apache.gora.cassandra.serializers.StatefulHashMapSerializer;
 import org.apache.gora.cassandra.serializers.TypeUtils;
+import org.apache.gora.persistency.StatefulHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,6 +78,10 @@ public class CassandraSubColumn extends 
       GenericArraySerializer serializer = GenericArraySerializer.get(fieldSchema.getElementType());
       GenericArray genericArray = serializer.fromByteBuffer(byteBuffer);
       value = genericArray;
+    } else if (type == Type.MAP) {
+      StatefulHashMapSerializer serializer = StatefulHashMapSerializer.get(fieldSchema.getValueType());
+      StatefulHashMap map = serializer.fromByteBuffer(byteBuffer);
+      value = map;
     } else {
       value = fromByteBuffer(fieldSchema, byteBuffer);
     }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Wed Jul 18 15:28:06 2012
@@ -37,6 +37,8 @@ import org.apache.avro.generic.GenericAr
 import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
 
+import org.apache.gora.persistency.StatefulHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +81,16 @@ public class GoraSerializerTypeInferer {
         schema = schema.getElementType();
       }
       serializer = GenericArraySerializer.get(schema);
+    } else if (value instanceof StatefulHashMap) {
+      StatefulHashMap map = (StatefulHashMap)value;
+      if (map.size() == 0) {
+        serializer = ByteBufferSerializer.get();
+      }
+      else {
+        Object value0 = map.values().iterator().next();
+        Schema schema = TypeUtils.getSchema(value0);
+        serializer = StatefulHashMapSerializer.get(schema);
+      }
     } else {
       serializer = SerializerTypeInferer.getSerializer(value);
     }
@@ -134,6 +146,8 @@ public class GoraSerializerTypeInferer {
       // serializer = SpecificFixedSerializer.get(schema);
     } else if (type == Type.ARRAY) {
       serializer = GenericArraySerializer.get(schema.getElementType());
+    } else if (type == Type.MAP) {
+      serializer = StatefulHashMapSerializer.get(schema.getValueType());
     } else {
       serializer = null;
     }
@@ -182,6 +196,8 @@ public class GoraSerializerTypeInferer {
 
     if (type == Type.ARRAY) {
       serializer = GenericArraySerializer.get(elementType);
+    } else if (type == Type.MAP) {
+      serializer = StatefulHashMapSerializer.get(elementType);
     } else {
       serializer = null;
     }

Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java?rev=1362980&view=auto
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java (added)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java Wed Jul 18 15:28:06 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StatefulHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StatefulHashMapSerializer translates the byte[] to and from StatefulHashMap of Avro.
+ */
+public class StatefulHashMapSerializer<T> extends AbstractSerializer<StatefulHashMap<Utf8, T>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(StatefulHashMapSerializer.class);
+
+  private static Map<Type, StatefulHashMapSerializer> valueTypeToSerializerMap = new HashMap<Type, StatefulHashMapSerializer>(); // cache filled by get(Type), keyed by the Avro type of the map values
+  private static Map<Class, StatefulHashMapSerializer> fixedClassToSerializerMap = new HashMap<Class, StatefulHashMapSerializer>(); // cache filled by get(Type, Class), keyed by the class of FIXED map values
+
+  /** Returns the shared serializer for maps whose values have the given Avro type, creating and caching it on first use. */
+  public static StatefulHashMapSerializer get(Type valueType) {
+    if (!valueTypeToSerializerMap.containsKey(valueType)) {
+      // First request for this value type: build the serializer once and remember it.
+      valueTypeToSerializerMap.put(valueType, new StatefulHashMapSerializer(valueType));
+    }
+    return valueTypeToSerializerMap.get(valueType);
+  }
+
+  /** Returns the shared serializer for maps of FIXED values of class {@code clazz}; returns null for any other value type. */
+  public static StatefulHashMapSerializer get(Type valueType, Class clazz) {
+    if (valueType != Type.FIXED) {
+      return null;
+    }
+    // Consult the Class-keyed cache: the Type-keyed map never holds a Class key, so looking there missed on every call.
+    if (!fixedClassToSerializerMap.containsKey(clazz)) {
+      fixedClassToSerializerMap.put(clazz, new StatefulHashMapSerializer(clazz));
+    }
+    return fixedClassToSerializerMap.get(clazz);
+  }
+
+  /** Picks the serializer for a map value schema: FIXED schemas are looked up by their class, all others by Avro type. */
+  public static StatefulHashMapSerializer get(Schema valueSchema) {
+    Type valueType = valueSchema.getType();
+    if (valueType != Type.FIXED) {
+      return get(valueType);
+    }
+    return get(Type.FIXED, TypeUtils.getClass(valueSchema));
+  }
+
+  private Schema valueSchema = null; // Avro schema of the map values; stays null when the constructor does not set one
+  private Type valueType = null; // Avro type of the map values; stays null when only a value serializer was supplied
+  private int size = -1; // result of TypeUtils.getFixedSize; a positive value selects the fixed-length value encoding
+  private Class<T> clazz = null; // Java class of the map values, set by the Type and Class constructors
+  private Serializer<T> valueSerializer = null; // serializer applied to every map value
+
+  public StatefulHashMapSerializer(Serializer<T> valueSerializer) { // the caller supplies the value serializer directly
+    this.valueSerializer = valueSerializer; // size keeps its -1 default, so values are written with a length prefix
+  }
+
+  public StatefulHashMapSerializer(Schema valueSchema) { // derives value type, fixed size and value serializer from the schema
+    this.valueSchema = valueSchema;
+    this.valueType = valueSchema.getType();
+    this.size = TypeUtils.getFixedSize(valueSchema);
+    this.valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
+  }
+
+  public StatefulHashMapSerializer(Type valueType) { // builds the serializer from the bare Avro type of the map values
+    this.valueType = valueType;
+    if (valueType != Type.FIXED) {
+      this.valueSchema = Schema.create(valueType); // NOTE(review): Schema.create is meant for primitive types - confirm ARRAY/MAP/RECORD never reach here
+    }
+    this.clazz = TypeUtils.getClass(valueType);
+    this.size = TypeUtils.getFixedSize(valueType);
+    this.valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+  }
+
+  public StatefulHashMapSerializer(Class<T> clazz) { // builds the serializer from the Java class of the map values
+    this.clazz = clazz;
+    Type inferred = TypeUtils.getType(clazz);
+    this.valueType = (inferred == null) ? Type.FIXED : inferred; // a class with no known Avro type is handled as FIXED
+    this.size = TypeUtils.getFixedSize(clazz);
+    if (this.valueType == Type.FIXED) {
+      this.valueSchema = TypeUtils.getSchema(clazz);
+      this.valueSerializer = GoraSerializerTypeInferer.getSerializer(Type.FIXED, clazz);
+    } else {
+      this.valueSerializer = GoraSerializerTypeInferer.getSerializer(this.valueType);
+    }
+  }
+
+  /** Encodes the map, passing null through; values with a positive fixed size are written without a length prefix. */
+  @Override
+  public ByteBuffer toByteBuffer(StatefulHashMap<Utf8, T> map) {
+    if (map == null) {
+      return null;
+    }
+    boolean fixedLengthValues = size > 0;
+    return fixedLengthValues
+        ? toByteBufferWithFixedLengthElements(map)
+        : toByteBufferWithVariableLengthElements(map);
+  }
+
+  /** Writes each entry as: int key length, key bytes, then the value bytes with no length prefix. */
+  private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap<Utf8, T> map) {
+    // First pass: serialize every key and value and add up the space they need.
+    List<byte[]> chunks = new ArrayList<byte[]>((int) map.size());
+    int total = 4 * (int) map.size(); // 4 bytes reserved per entry for the key length prefix
+    for (Utf8 key : map.keySet()) {
+      byte[] keyBytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
+      byte[] valueBytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(map.get(key)));
+      chunks.add(keyBytes);
+      chunks.add(valueBytes);
+      total += keyBytes.length + valueBytes.length;
+    }
+    // Second pass: copy the chunks into one buffer of exactly that size.
+    ByteBuffer out = ByteBuffer.allocate(total);
+    for (int idx = 0; idx < chunks.size(); idx++) {
+      byte[] chunk = chunks.get(idx);
+      // Chunks alternate key, value, key, value, ...; only keys (even positions) get a length prefix.
+      if (idx % 2 == 0) {
+        out.put(IntegerSerializer.get().toByteBuffer(chunk.length));
+      }
+      out.put(BytesArraySerializer.get().toByteBuffer(chunk));
+    }
+    out.rewind();
+    return out;
+  }
+
+  /** Writes each entry as: int key length, key bytes, int value length, value bytes. */
+  private ByteBuffer toByteBufferWithVariableLengthElements(StatefulHashMap<Utf8, T> map) {
+    // First pass: serialize every key and value and add up the space they need.
+    List<byte[]> chunks = new ArrayList<byte[]>((int) map.size());
+    int total = 8 * (int) map.size(); // 8 bytes reserved per entry: one length prefix for the key, one for the value
+    for (Utf8 key : map.keySet()) {
+      byte[] keyBytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
+      byte[] valueBytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(map.get(key)));
+      chunks.add(keyBytes);
+      chunks.add(valueBytes);
+      total += keyBytes.length + valueBytes.length;
+    }
+    ByteBuffer out = ByteBuffer.allocate(total);
+    for (byte[] chunk : chunks) {
+      // Every chunk, key or value alike, is written behind its own length.
+      out.put(IntegerSerializer.get().toByteBuffer(chunk.length));
+      out.put(BytesArraySerializer.get().toByteBuffer(chunk));
+    }
+    out.rewind();
+    return out;
+  }
+
+  /** Decodes a buffer produced by toByteBuffer, consuming it as it reads; a null buffer yields null. */
+  @Override
+  public StatefulHashMap<Utf8, T> fromByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null) {
+      return null;
+    }
+    StatefulHashMap<Utf8, T> map = new StatefulHashMap<Utf8, T>();
+    while (true) {
+      Utf8 key = null;
+      T value = null;
+      try {
+        // Key: an int length prefix followed by that many bytes.
+        int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+        byte[] bytes = new byte[n];
+        byteBuffer.get(bytes, 0, n);
+        key = Utf8Serializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+
+        if (size > 0) {
+          // Fixed-size values carry no length prefix; the value serializer reads straight from the buffer.
+          value = valueSerializer.fromByteBuffer(byteBuffer);
+        } else {
+          // Variable-size values are framed like keys: an int length, then the bytes.
+          n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+          bytes = new byte[n];
+          byteBuffer.get(bytes, 0, n);
+          value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+        }
+      } catch (BufferUnderflowException e) {
+        // Running out of bytes is how the end of the encoded map is detected.
+        break;
+      }
+      if (key == null || value == null) {
+        break;
+      }
+      map.put(key, value);
+    }
+    return map;
+  }
+
+  @Override
+  public ComparatorType getComparatorType() { // reports the comparator type of the value serializer
+    return valueSerializer.getComparatorType();
+  }
+
+}

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Wed Jul 18 15:28:06 2012
@@ -56,6 +56,7 @@ import org.apache.gora.cassandra.seriali
 import org.apache.gora.cassandra.serializers.TypeUtils;
 import org.apache.gora.mapreduce.GoraRecordReader;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.query.Query;
 import org.apache.gora.util.ByteUtils;
 import org.slf4j.Logger;
@@ -107,7 +108,7 @@ public class CassandraClient<K, T extend
       List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();      
       keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);      
       this.cluster.addKeyspace(keyspaceDefinition, true);
-      LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+      // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
       
       // Create a customized Consistency Level
       ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
@@ -200,6 +201,10 @@ public class CassandraClient<K, T extend
           if (((GenericArray)itemValue).size() == 0) {
             continue;
           }
+        } else if (itemValue instanceof StatefulHashMap<?,?>) {
+          if (((StatefulHashMap)itemValue).size() == 0) {
+            continue;
+          }
         }
 
         addSubColumn(key, fieldName, i++, itemValue);
@@ -210,6 +215,32 @@ public class CassandraClient<K, T extend
     }
   }
 
+  /** Stores a map field: one sub-column per entry when isSuper accepts the field's family, otherwise one column for the whole map. */
+  @SuppressWarnings("unchecked")
+  public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
+    if (!isSuper( cassandraMapping.getFamily(fieldName) )) {
+      // Not a super family: the whole map goes into a single column.
+      addColumn(key, fieldName, map);
+      return;
+    }
+    for (Utf8 mapKey: map.keySet()) {
+      Object mapValue = map.get(mapKey);
+
+      // TODO: hack, do not store empty arrays (empty nested maps are skipped the same way)
+      if (mapValue instanceof GenericArray<?>) {
+        if (((GenericArray)mapValue).size() == 0) {
+          continue;
+        }
+      } else if (mapValue instanceof StatefulHashMap<?,?>) {
+        if (((StatefulHashMap)mapValue).size() == 0) {
+          continue;
+        }
+      }
+
+      addSubColumn(key, fieldName, mapKey.toString(), mapValue);
+    }
+  }
+
   /**
    * Serialize value to ByteBuffer.
    * @param value the member value

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Wed Jul 18 15:28:06 2012
@@ -148,7 +148,7 @@ public class CassandraMapping {
       if (superAttribute != null) {
     	// LOG.info("Located super column family");
         this.superFamilies.add(familyName);
-        LOG.info("Added super column family: '" + familyName + "'");
+        // LOG.info("Added super column family: '" + familyName + "'");
         cfDef.setColumnType(ColumnType.SUPER);
         cfDef.setSubComparatorType(ComparatorType.BYTESTYPE);
       }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Wed Jul 18 15:28:06 2012
@@ -104,13 +104,13 @@ public class CassandraMappingManager {
       LOG.warn("Error locating Cassandra Keyspace element!");
     }
     else {
-      LOG.info("Located Cassandra Keyspace: '" + KEYSPACE_ELEMENT + "'");
+      // LOG.info("Located Cassandra Keyspace: '" + KEYSPACE_ELEMENT + "'");
       for (Element keyspace : keyspaces) {
         String keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
         if (keyspaceName == null) {
     	    LOG.warn("Error locating Cassandra Keyspace name attribute!");
         }
-    	LOG.info("Located Cassandra Keyspace name: '" + NAME_ATTRIBUTE + "'");
+    	// LOG.info("Located Cassandra Keyspace name: '" + NAME_ATTRIBUTE + "'");
         keyspaceMap.put(keyspaceName, keyspace);
       }
     }
@@ -121,14 +121,14 @@ public class CassandraMappingManager {
       LOG.warn("Error locating Cassandra Mapping element!");
     }
     else {
-      LOG.info("Located Cassandra Mapping: '" + MAPPING_ELEMENT + "'");
+      // LOG.info("Located Cassandra Mapping: '" + MAPPING_ELEMENT + "'");
       for (Element mapping : mappings) {
         String className = mapping.getAttributeValue(NAME_ATTRIBUTE);
         if (className == null) {
     	    LOG.warn("Error locating Cassandra Mapping class name attribute!");
     	    continue;
         }
-    	LOG.info("Located Cassandra Mapping class name: '" + NAME_ATTRIBUTE + "'");
+    	// LOG.info("Located Cassandra Mapping class name: '" + NAME_ATTRIBUTE + "'");
         mappingMap.put(className, mapping);
       }
     }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1362980&r1=1362979&r2=1362980&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Jul 18 15:28:06 2012
@@ -278,8 +278,9 @@ public class CassandraStore<K, T extends
     T p = (T) value.newInstance(new StateManagerImpl());
     Schema schema = value.getSchema();
     for (Field field: schema.getFields()) {
-      if (value.isDirty(field.pos())) {
-        Object fieldValue = value.get(field.pos());
+      int fieldPos = field.pos();
+      if (value.isDirty(fieldPos)) {
+        Object fieldValue = value.get(fieldPos);
         
         // check if field has a nested structure (array, map, or record)
         Schema fieldSchema = field.schema();
@@ -309,7 +310,7 @@ public class CassandraStore<K, T extends
             break;
         }
         
-        p.put(field.pos(), fieldValue);
+        p.put(fieldPos, fieldValue);
       }
     }
     
@@ -349,6 +350,10 @@ public class CassandraStore<K, T extends
                 if (((GenericArray)memberValue).size() == 0) {
                   continue;
                 }
+              } else if (memberValue instanceof StatefulHashMap<?,?>) {
+                if (((StatefulHashMap)memberValue).size() == 0) {
+                  continue;
+                }
               }
 
               this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
@@ -362,20 +367,7 @@ public class CassandraStore<K, T extends
       case MAP:
         if (value != null) {
           if (value instanceof StatefulHashMap<?, ?>) {
-            //TODO cast to stateful map and only write dirty keys
-            Map<Utf8, Object> map = (Map<Utf8, Object>) value;
-            for (Utf8 mapKey: map.keySet()) {
-              
-              // TODO: hack, do not store empty arrays
-              Object keyValue = map.get(mapKey);
-              if (keyValue instanceof GenericArray<?>) {
-                if (((GenericArray)keyValue).size() == 0) {
-                  continue;
-                }
-              }
-
-              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
-            }
+            this.cassandraClient.addStatefulHashMap(key, field.name(), (StatefulHashMap<Utf8,Object>)value);
           } else {
             LOG.info("Map not supported: " + value.toString());
           }