You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2012/06/01 15:24:52 UTC

svn commit: r1345164 - in /gora/trunk: ./ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/

Author: lewismc
Date: Fri Jun  1 13:24:51 2012
New Revision: 1345164

URL: http://svn.apache.org/viewvc?rev=1345164&view=rev
Log:
Commit to address GORA-138 and GORA-81, and update CHANGES.txt

Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jun  1 13:24:51 2012
@@ -6,13 +6,15 @@ Gora Change Log
 
 0.3 (trunk) Current Development:
 
+* GORA-138 gora-cassandra array type support: Double fix for GORA-81 Replace CassandraStore#addOrUpdateField with TypeInferringSerializer to take advantage of when the value is already of type ByteBuffer. (Kazuomi Kashii via lewismc)
+
 * GORA-139  Creates Cassandra column family with BytesType for column value validator (and comparators), instead of UTF8Type (Kazuomi Kashii via lewismc)
 
 * GORA-131 gora-cassandra should support other key types than String (Kazuomi Kashii via lewismc)
 
 * GORA-132 Uses ByteBufferSerializer for column value to support various data types rather than StringSerializer (Kazuomi Kashii via lewismc)
 
-* GORA-77 Replace commons logging with Log4j (Renato Javier Marroquín Mogrovejo via lewismc)
+* GORA-77 Replace commons logging with Slf4j (Renato Javier Marroquín Mogrovejo via lewismc)
 
 * GORA-134 ListGenericArray's hashCode causes StackOverflowError (Kazuomi Kashii via lewismc)
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jun  1 13:24:51 2012
@@ -18,13 +18,29 @@
 
 package org.apache.gora.cassandra.query;
 
-import org.apache.avro.Schema.Field;
+import java.nio.ByteBuffer;
 
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents a unit of data: a key value pair tagged by a family name
  */
 public abstract class CassandraColumn {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraColumn.class);
+
   public static final int SUB = 0;
   public static final int SUPER = 1;
   
@@ -52,8 +68,37 @@ public abstract class CassandraColumn {
     return this.field;
   }
   
-  public abstract String getName();
+  public abstract ByteBuffer getName();
   public abstract Object getValue();
   
 
+  protected Object fromByteBuffer(Type type, ByteBuffer byteBuffer) {
+    Object value = null;
+    switch (type) {
+      case STRING:
+        value = new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
+        break;
+      case BYTES:
+        value = byteBuffer;
+        break;
+      case INT:
+        value = IntegerSerializer.get().fromByteBuffer(byteBuffer);
+        break;
+      case LONG:
+        value = LongSerializer.get().fromByteBuffer(byteBuffer);
+        break;
+      case FLOAT:
+        value = FloatSerializer.get().fromByteBuffer(byteBuffer);
+        break;
+      case DOUBLE:
+        value = DoubleSerializer.get().fromByteBuffer(byteBuffer);
+        break;
+
+      default:
+        LOG.info("Type is not supported: " + type);
+
+    }
+    return value;
+  }
+
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jun  1 13:24:51 2012
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import me.prettyprint.cassandra.serializers.StringSerializer;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.gora.persistency.Persistent;
@@ -76,7 +78,7 @@ public class CassandraResult<K, T extend
       
       // get field name
       String family = cassandraColumn.getFamily();
-      String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
+      String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
       
       // get field
       int pos = this.persistent.getFieldIndex(fieldName);

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jun  1 13:24:51 2012
@@ -51,9 +51,9 @@ public class CassandraSubColumn extends 
   /**
    * Key-value pair containing the raw data.
    */
-  private HColumn<String, ByteBuffer> hColumn;
+  private HColumn<ByteBuffer, ByteBuffer> hColumn;
 
-  public String getName() {
+  public ByteBuffer getName() {
     return hColumn.getName();
   }
 
@@ -65,56 +65,35 @@ public class CassandraSubColumn extends 
     Field field = getField();
     Schema fieldSchema = field.schema();
     Type type = fieldSchema.getType();
-    ByteBuffer valueByteBuffer = hColumn.getValue();
+    ByteBuffer byteBuffer = hColumn.getValue();
     Object value = null;
-    
-    switch (type) {
-      case STRING:
-        value = new Utf8(StringSerializer.get().fromByteBuffer(valueByteBuffer));
-        break;
-      case BYTES:
-        value = valueByteBuffer;
-        break;
-      case INT:
-        value = IntegerSerializer.get().fromByteBuffer(valueByteBuffer);
-        break;
-      case LONG:
-        value = LongSerializer.get().fromByteBuffer(valueByteBuffer);
-        break;
-      case FLOAT:
-        value = FloatSerializer.get().fromByteBuffer(valueByteBuffer);
-        break;
-      case DOUBLE:
-        value = DoubleSerializer.get().fromByteBuffer(valueByteBuffer);
-        break;
-      case ARRAY:
-        // convert string to array
-        String valueString = StringSerializer.get().fromByteBuffer(valueByteBuffer);
-        valueString = valueString.substring(1, valueString.length()-1);
-        String[] elements = valueString.split(", ");
-        
-        Type elementType = fieldSchema.getElementType().getType();
-        if (elementType == Schema.Type.STRING) {
-          // the array type is String
-          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
-          for (String element: elements) {
-            genericArray.add(element);
-          }
-          
-          value = genericArray;
-        } else {
-          LOG.info("Element type not supported: " + elementType);
+    if (type == Type.ARRAY) {
+      // convert string to array
+      String valueString = StringSerializer.get().fromByteBuffer(byteBuffer);
+      valueString = valueString.substring(1, valueString.length()-1);
+      String[] elements = valueString.split(", ");
+
+      Type elementType = fieldSchema.getElementType().getType();
+      if (elementType == Schema.Type.STRING) {
+        // the array type is String
+        GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
+        for (String element: elements) {
+          genericArray.add(element);
         }
-        break;
-      default:
-        LOG.info("Type not supported: " + type);
+
+        value = genericArray;
+      } else {
+        LOG.info("Element type not supported: " + elementType);
+      }
+    }
+    else {
+      value = fromByteBuffer(type, byteBuffer);
     }
-    
-    return value;
 
+    return value;
   }
 
-  public void setValue(HColumn<String, ByteBuffer> hColumn) {
+  public void setValue(HColumn<ByteBuffer, ByteBuffer> hColumn) {
     this.hColumn = hColumn;
   }
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jun  1 13:24:51 2012
@@ -21,10 +21,7 @@ package org.apache.gora.cassandra.query;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.DoubleSerializer;
 import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -32,7 +29,9 @@ import me.prettyprint.hector.api.beans.H
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.slf4j.Logger;
@@ -41,10 +40,10 @@ import org.slf4j.LoggerFactory;
 public class CassandraSuperColumn extends CassandraColumn {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
 
-  private HSuperColumn<String, String, ByteBuffer> hSuperColumn;
+  private HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn;
   
-  public String getName() {
-    return hSuperColumn.getName();
+  public ByteBuffer getName() {
+    return StringSerializer.get().toByteBuffer(hSuperColumn.getName());
   }
 
   public Object getValue() {
@@ -55,37 +54,27 @@ public class CassandraSuperColumn extend
     Object value = null;
     
     switch (type) {
+      case ARRAY:
+        Type elementType = fieldSchema.getElementType().getType();
+        GenericArray array = new ListGenericArray(Schema.create(elementType));
+        
+        for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+          ByteBuffer memberByteBuffer = hColumn.getValue();
+          Object memberValue = fromByteBuffer(elementType, hColumn.getValue());
+          // int i = IntegerSerializer().get().fromByteBuffer(hColumn.getName());
+          array.add(memberValue);      
+        }
+        value = array;
+        
+        break;
       case MAP:
         Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
         Type valueType = fieldSchema.getValueType().getType();
         
-        for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+        for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
           ByteBuffer memberByteBuffer = hColumn.getValue();
-          Object memberValue = null;
-          switch (valueType) {
-            case STRING:
-              memberValue = new Utf8(StringSerializer.get().fromByteBuffer(memberByteBuffer));
-              break;
-            case BYTES:
-              memberValue = memberByteBuffer;
-              break;
-            case INT:
-              memberValue = IntegerSerializer.get().fromByteBuffer(memberByteBuffer);
-              break;
-            case LONG:
-              memberValue = LongSerializer.get().fromByteBuffer(memberByteBuffer);
-              break;
-            case FLOAT:
-              memberValue = FloatSerializer.get().fromByteBuffer(memberByteBuffer);
-              break;
-            case DOUBLE:
-              memberValue = DoubleSerializer.get().fromByteBuffer(memberByteBuffer);
-              break;
-            default:
-              LOG.info("Type for the map value is not supported: " + valueType);
-                
-          }
-          map.put(new Utf8(hColumn.getName()), memberValue);      
+          Object memberValue = fromByteBuffer(valueType, hColumn.getValue());
+          map.put(new Utf8(StringSerializer.get().fromByteBuffer(hColumn.getName())), memberValue);      
         }
         value = map;
         
@@ -115,12 +104,13 @@ public class CassandraSuperColumn extend
         if (value instanceof PersistentBase) {
           PersistentBase record = (PersistentBase) value;
 
-          for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
-            Field memberField = fieldSchema.getField(hColumn.getName());
+          for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+            String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+            Field memberField = fieldSchema.getField(memberName);
             CassandraSubColumn cassandraColumn = new CassandraSubColumn();
             cassandraColumn.setField(memberField);
             cassandraColumn.setValue(hColumn);
-            record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
+            record.put(record.getFieldIndex(memberName), cassandraColumn.getValue());
           }
         }
         break;
@@ -130,8 +120,8 @@ public class CassandraSuperColumn extend
     
     return value;
   }
-  
-  public void setValue(HSuperColumn<String, String, ByteBuffer> hSuperColumn) {
+
+  public void setValue(HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn) {
     this.hSuperColumn = hSuperColumn;
   }
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jun  1 13:24:51 2012
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
 import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.DoubleSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
@@ -67,8 +68,6 @@ public class CassandraClient<K, T extend
   
   private CassandraMapping cassandraMapping = new CassandraMapping();
 
-  private StringSerializer stringSerializer = new StringSerializer();
-  private ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer();
   private Serializer<K> keySerializer;
   
   public void initialize(Class<K> keyClass) throws Exception {
@@ -118,7 +117,6 @@ public class CassandraClient<K, T extend
 
       keyspaceDefinition = null;
     }
-    
 
   }
   
@@ -141,52 +139,53 @@ public class CassandraClient<K, T extend
       return;
     }
 
-    ByteBuffer byteBuffer = null;
-    if (value instanceof ByteBuffer) {
-      byteBuffer = (ByteBuffer) value;
-    }
-    else if (value instanceof Utf8) {
-      byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
-    }
-    else if (value instanceof Float) {
-      // workaround for hector-core-1.0-1.jar
-      // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
-      byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
-    }
-    else if (value instanceof Double) {
-      // workaround for hector-core-1.0-1.jar
-      // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
-      byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
-    }
-    else {
-      byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
-    }
+    ByteBuffer byteBuffer = toByteBuffer(value);
     
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String columnName = this.cassandraMapping.getColumn(fieldName);
     
-    this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, stringSerializer, byteBufferSerializer));
+    this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer, StringSerializer.get(), ByteBufferSerializer.get()));
   }
 
   /**
    * Insert a member in a super column. This is used for map and record Avro types.
    * @param key the row key
    * @param fieldName the field name
-   * @param memberName the member name
+   * @param columnName the column name (the member name, or the index of array)
    * @param value the member value
    */
   @SuppressWarnings("unchecked")
-public void addSubColumn(K key, String fieldName, String memberName, Object value) {
+  public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) {
     if (value == null) {
       return;
     }
     
+    ByteBuffer byteBuffer = toByteBuffer(value);
+    
+    String columnFamily = this.cassandraMapping.getFamily(fieldName);
+    String superColumnName = this.cassandraMapping.getColumn(fieldName);
+    
+    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(columnName, byteBuffer, ByteBufferSerializer.get(), ByteBufferSerializer.get())), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()));
+    
+  }
+
+  /**
+   * Serialize value to ByteBuffer.
+   * @param value the member value
+   * @return ByteBuffer object
+   */
+  @SuppressWarnings("unchecked")
+  public ByteBuffer toByteBuffer(Object value) {
+    if (value == null) {
+      return null;
+    }
+    
     ByteBuffer byteBuffer = null;
     if (value instanceof ByteBuffer) {
       byteBuffer = (ByteBuffer) value;
     }
     else if (value instanceof Utf8) {
-      byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+      byteBuffer = StringSerializer.get().toByteBuffer(((Utf8)value).toString());
     }
     else if (value instanceof Float) {
       // workaround for hector-core-1.0-1.jar
@@ -202,22 +201,22 @@ public void addSubColumn(K key, String f
       byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
     }
     
-    String columnFamily = this.cassandraMapping.getFamily(fieldName);
-    String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    
-    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(memberName, byteBuffer, stringSerializer, byteBufferSerializer)), this.stringSerializer, this.stringSerializer, this.byteBufferSerializer));
-    
+    return byteBuffer;
   }
-  
+
   /**
    * Select a family column in the keyspace.
    * @param cassandraQuery a wrapper of the query
    * @param family the family name to be queried
    * @return a list of family rows
    */
-  public List<Row<K, String, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
+  public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
     
     String[] columnNames = cassandraQuery.getColumns(family);
+    ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
+    for (int i = 0; i < columnNames.length; i++) {
+      columnNameByteBuffers[i] = StringSerializer.get().toByteBuffer(columnNames[i]);
+    }
     Query<K, T> query = cassandraQuery.getQuery();
     int limit = (int) query.getLimit();
     if (limit < 1) {
@@ -226,16 +225,15 @@ public void addSubColumn(K key, String f
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
     
-    RangeSlicesQuery<K, String, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, stringSerializer, byteBufferSerializer);
+    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSlicesQuery.setColumnFamily(family);
     rangeSlicesQuery.setKeys(startKey, endKey);
-    rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+    rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
     rangeSlicesQuery.setRowCount(limit);
-    rangeSlicesQuery.setColumnNames(columnNames);
+    rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
     
-
-    QueryResult<OrderedRows<K, String, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
-    OrderedRows<K, String, ByteBuffer> orderedRows = queryResult.get();
+    QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
+    OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
     
     
     return orderedRows.getList();
@@ -290,7 +288,7 @@ public void addSubColumn(K key, String f
     return this.cassandraMapping.isSuper(family);
   }
 
-  public List<SuperRow<K, String, String, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
+  public List<SuperRow<K, String, ByteBuffer, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
     String[] columnNames = cassandraQuery.getColumns(family);
     Query<K, T> query = cassandraQuery.getQuery();
     int limit = (int) query.getLimit();
@@ -300,7 +298,7 @@ public void addSubColumn(K key, String f
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
     
-    RangeSuperSlicesQuery<K, String, String, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, this.stringSerializer, this.stringSerializer, this.byteBufferSerializer);
+    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSuperSlicesQuery.setColumnFamily(family);    
     rangeSuperSlicesQuery.setKeys(startKey, endKey);
     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -308,8 +306,8 @@ public void addSubColumn(K key, String f
     rangeSuperSlicesQuery.setColumnNames(columnNames);
     
     
-    QueryResult<OrderedSuperRows<K, String, String, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
-    OrderedSuperRows<K, String, String, ByteBuffer> orderedRows = queryResult.get();
+    QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
+    OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
     return orderedRows.getList();
 
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1345164&r1=1345163&r2=1345164&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jun  1 13:24:51 2012
@@ -21,12 +21,15 @@ package org.apache.gora.cassandra.store;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -45,6 +48,7 @@ import org.apache.gora.cassandra.query.C
 import org.apache.gora.cassandra.query.CassandraRow;
 import org.apache.gora.cassandra.query.CassandraSubColumn;
 import org.apache.gora.cassandra.query.CassandraSuperColumn;
+import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.impl.PersistentBase;
@@ -131,29 +135,28 @@ public class CassandraStore<K, T extends
     
     // We query Cassandra keyspace by families.
     for (String family : familyMap.keySet()) {
+      if (family == null) {
+        continue;
+      }
       if (this.cassandraClient.isSuper(family)) {
         addSuperColumns(family, cassandraQuery, cassandraResultSet);
          
       } else {
         addSubColumns(family, cassandraQuery, cassandraResultSet);
-      
       }
-      
     }
     
     cassandraResult.setResultSet(cassandraResultSet);
     
-    
     return cassandraResult;
-
   }
 
   private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
       CassandraResultSet cassandraResultSet) {
     // select family columns that are included in the query
-    List<Row<K, String, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
+    List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
     
-    for (Row<K, String, ByteBuffer> row : rows) {
+    for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
       K key = row.getKey();
       
       // find associated row in the resultset
@@ -164,9 +167,9 @@ public class CassandraStore<K, T extends
         cassandraRow.setKey(key);
       }
       
-      ColumnSlice<String, ByteBuffer> columnSlice = row.getColumnSlice();
+      ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
       
-      for (HColumn<String, ByteBuffer> hColumn : columnSlice.getColumns()) {
+      for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) {
         CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
         cassandraSubColumn.setValue(hColumn);
         cassandraSubColumn.setFamily(family);
@@ -179,8 +182,8 @@ public class CassandraStore<K, T extends
   private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, 
       CassandraResultSet cassandraResultSet) {
     
-    List<SuperRow<K, String, String, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
-    for (SuperRow<K, String, String, ByteBuffer> superRow: superRows) {
+    List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
+    for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
       K key = superRow.getKey();
       CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
@@ -189,8 +192,8 @@ public class CassandraStore<K, T extends
         cassandraRow.setKey(key);
       }
       
-      SuperSlice<String, String, ByteBuffer> superSlice = superRow.getSuperSlice();
-      for (HSuperColumn<String, String, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
+      SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = superRow.getSuperSlice();
+      for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) {
         CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
         cassandraSuperColumn.setValue(hSuperColumn);
         cassandraSuperColumn.setFamily(family);
@@ -277,7 +280,7 @@ public class CassandraStore<K, T extends
       if (value.isDirty(field.pos())) {
         Object fieldValue = value.get(field.pos());
         
-        // check if field has a nested structure (map or record)
+        // check if field has a nested structure (array, map, or record)
         Schema fieldSchema = field.schema();
         Type type = fieldSchema.getType();
         switch(type) {
@@ -294,6 +297,16 @@ public class CassandraStore<K, T extends
             StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
             fieldValue = newMap;
             break;
+          case ARRAY:
+            GenericArray array = (GenericArray) fieldValue;
+            Type elementType = fieldSchema.getElementType().getType();
+            GenericArray newArray = new ListGenericArray(Schema.create(elementType));
+            Iterator iter = array.iterator();
+            while (iter.hasNext()) {
+              newArray.add(iter.next());
+            }
+            fieldValue = newArray;
+            break;
         }
         
         p.put(field.pos(), fieldValue);
@@ -340,7 +353,7 @@ public class CassandraStore<K, T extends
               if (memberValue instanceof Utf8) {
                 memberValue = memberValue.toString();
               }
-              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+              this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(member.name()), memberValue);
             }
           } else {
             LOG.info("Record not supported: " + value.toString());
@@ -367,13 +380,29 @@ public class CassandraStore<K, T extends
               if (keyValue instanceof Utf8) {
                 keyValue = keyValue.toString();
               }
-              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);              
+              this.cassandraClient.addSubColumn(key, field.name(), StringSerializer.get().toByteBuffer(mapKey.toString()), keyValue);              
             }
           } else {
             LOG.info("Map not supported: " + value.toString());
           }
         }
         break;
+      case ARRAY:
+        if (value != null) {
+          if (value instanceof GenericArray<?>) {
+            GenericArray<Object> array = (GenericArray<Object>) value;
+            int i= 0;
+            for (Object itemValue: array) {
+              if (itemValue instanceof Utf8) {
+                itemValue = itemValue.toString();
+              }
+              this.cassandraClient.addSubColumn(key, field.name(), IntegerSerializer.get().toByteBuffer(i++), itemValue);              
+            }
+          } else {
+            LOG.info("Array not supported: " + value.toString());
+          }
+        }
+        break;
       default:
         LOG.info("Type not considered: " + type.name());      
     }