You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/03/06 00:06:42 UTC

svn commit: r1574715 - in /gora/branches/GORA_94: gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-cassa...

Author: lewismc
Date: Wed Mar  5 23:06:41 2014
New Revision: 1574715

URL: http://svn.apache.org/r1574715
Log:
GORA-245-v6.1

Added:
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
Modified:
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
    gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties
    gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Wed Mar  5 23:06:41 2014
@@ -30,6 +30,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,19 +49,6 @@ public abstract class CassandraColumn {
   private Field field;
   private int unionType;
   
-  public static final ThreadLocal<BinaryDecoder> decoders =
-      new ThreadLocal<BinaryDecoder>();
-
-  /*
-   * Create a threadlocal map for the datum readers and writers, because
-   * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
-   * When they are thread safe, it is possible to maintain a single reader and
-   * writer pair for every schema, instead of one for every thread.
-   */
-  
-  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
-      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
-  
   public void setUnionType(int pUnionType){
     this.unionType = pUnionType;
   }
@@ -92,7 +80,6 @@ public abstract class CassandraColumn {
   public abstract ByteBuffer getName();
   public abstract Object getValue();
   
-  @SuppressWarnings({ "rawtypes" })
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
     Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
@@ -101,34 +88,14 @@ public abstract class CassandraColumn {
           + "could be found. Please report this to dev@gora.apache.org");
     } else {
       value = serializer.fromByteBuffer(byteBuffer);
-      if (schema.getType().equals(Type.RECORD)){
-        String schemaId = schema.getFullName();      
-        
-        SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
-        if (reader == null) {
-          reader = new SpecificDatumReader(schema);// ignore dirty bits
-          SpecificDatumReader localReader=null;
-          if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
-            reader = localReader;
-          }
-        }
-        
-        // initialize a decoder, possibly reusing previous one
-        BinaryDecoder decoderFromCache = decoders.get();
-        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
-        // put in threadlocal cache if the initial get was empty
-        if (decoderFromCache==null) {
-          decoders.set(decoder);
-        }
+      if (schema.getType().equals(Type.RECORD) || schema.getType().equals(Type.MAP) ){
         try {
-          value = reader.read(null, decoder);
+          value = AvroSerializerUtil.deserializer(value, schema);
         } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }        
+          LOG.warn(field.name() + " named field could not be deserialized.");
+        }
       }
     }
     return value;
   }
-
 }

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Wed Mar  5 23:06:41 2014
@@ -52,12 +52,12 @@ public class CassandraSubColumn extends 
       List<?> genericArray = serializer.fromByteBuffer(byteBuffer);
       value = genericArray;
     } else if (type.equals(Type.MAP)) {
-      MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
-      Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
-      value = map;
+//      MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
+//      Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
+//      value = map;
+      value = fromByteBuffer(fieldSchema, byteBuffer);
     } else if (type.equals(Type.RECORD)){
       value = fromByteBuffer(fieldSchema, byteBuffer);
-      //TODO: Avro dan geri getirmek lazim.
     } else if (type.equals(Type.UNION)){
       // the selected union schema is obtained
       Schema unionFieldSchema = getUnionSchema(super.getUnionType(), fieldSchema);

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Wed Mar  5 23:06:41 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -31,6 +32,7 @@ import me.prettyprint.hector.api.beans.H
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.serializers.CharSequenceSerializer;
 import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.impl.PersistentBase;
@@ -63,11 +65,25 @@ public class CassandraSuperColumn extend
         break;
       case MAP:
         Map<CharSequence, Object> map = new HashMap<CharSequence, Object>();
-        
+
         for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
-          Object memberValue = null;
-          memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());
-          map.put(CharSequenceSerializer.get().fromByteBuffer(hColumn.getName()), memberValue);      
+          CharSequence mapKey = CharSequenceSerializer.get().fromByteBuffer(hColumn.getName());
+          if (mapKey.toString().indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+            Object memberValue = null;
+            // We need to detect the real type for UNION fields
+            if (fieldSchema.getValueType().getType().equals(Type.UNION)){
+              
+              HColumn<ByteBuffer, ByteBuffer> cc = getUnionTypeColumn(mapKey
+                  + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns());
+              Integer unionIndex = getUnionIndex(mapKey.toString(), cc);
+              Schema realSchema = fieldSchema.getValueType().getTypes().get(unionIndex);
+              memberValue = fromByteBuffer(realSchema, hColumn.getValue());
+              
+            }else{
+              memberValue = fromByteBuffer(fieldSchema.getValueType(), hColumn.getValue());            
+            }            
+            map.put(mapKey, memberValue);      
+          }
         }
         value = map;
         
@@ -116,17 +132,8 @@ public class CassandraSuperColumn extend
             if (memberType.equals(Type.UNION)){
               HColumn<ByteBuffer, ByteBuffer> hc = getUnionTypeColumn(memberField.name()
                   + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns().toArray());
-              Field unionField = new Field(memberField.name()
-                  + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
-                  null, null);
-              
-              CassandraSubColumn unionColumn = new CassandraSubColumn();
-              
-              // get value of UNION stored type
-              unionColumn.setField(unionField);
-              unionColumn.setValue(hc);
-              Object val = unionColumn.getValue();
-              cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+              Integer unionIndex = getUnionIndex(memberField.name(),hc);
+              cassandraColumn.setUnionType(unionIndex);
             }
             
             record.put(record.getSchema().getField(memberName).pos(), cassandraColumn.getValue());
@@ -152,6 +159,16 @@ public class CassandraSuperColumn extend
     return value;
   }
 
+ private Integer getUnionIndex(String fieldName, HColumn<ByteBuffer, ByteBuffer> uc){
+   Integer val = IntegerSerializer.get().fromByteBuffer(uc.getValue());
+   return Integer.parseInt(val.toString());
+ }
+ 
+  private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName,
+    List<HColumn<ByteBuffer, ByteBuffer>> columns) {
+    return getUnionTypeColumn(fieldName, columns.toArray());
+}
+
   private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName, Object[] hColumns) {
     for (int iCnt = 0; iCnt < hColumns.length; iCnt++){
       @SuppressWarnings("unchecked")

Added: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java?rev=1574715&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java (added)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializerUtil.java Wed Mar  5 23:06:41 2014
@@ -0,0 +1,94 @@
+package org.apache.gora.cassandra.serializers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+public class AvroSerializerUtil {
+
+  /**
+   * Threadlocals maintaining reusable binary decoders and encoders.
+   */
+  private static ThreadLocal<ByteArrayOutputStream> outputStream =
+      new ThreadLocal<ByteArrayOutputStream>();
+  
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
+
+  public static final ThreadLocal<BinaryDecoder> decoders =
+      new ThreadLocal<BinaryDecoder>();
+  
+  /**
+   * Create a {@link java.util.concurrent.ConcurrentHashMap} for the 
+   * datum readers and writers. 
+   * This is necessary because they are not thread safe, at least not before 
+   * Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+   */
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();  
+  
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
+      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <T> byte[] serializer(T value, Schema schema) throws IOException{
+    SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
+    if (writer == null) {
+      writer = new SpecificDatumWriter(schema);// ignore dirty bits
+      writerMap.put(schema.getFullName(),writer);
+    }
+    
+    BinaryEncoder encoderFromCache = encoders.get();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    outputStream.set(bos);
+    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
+    if (encoderFromCache == null) {
+      encoders.set(encoder);
+    }
+    
+    //reset the buffers
+    ByteArrayOutputStream os = outputStream.get();
+    os.reset();
+    
+    writer.write(value, encoder);
+    encoder.flush();
+    byte[] byteValue = os.toByteArray();
+    return byteValue;
+  }
+  
+  public static Object deserializer(Object value, Schema schema) throws IOException{
+    String schemaId = schema.getFullName();      
+    
+    SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+    if (reader == null) {
+      reader = new SpecificDatumReader(schema);// ignore dirty bits
+      SpecificDatumReader localReader=null;
+      if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+        reader = localReader;
+      }
+    }
+    
+    // initialize a decoder, possibly reusing previous one
+    BinaryDecoder decoderFromCache = decoders.get();
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
+    // put in threadlocal cache if the initial get was empty
+    if (decoderFromCache==null) {
+      decoders.set(decoder);
+    }
+
+    Object result = reader.read(null, decoder);
+    return result;
+
+  }
+}

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Mar  5 23:06:41 2014
@@ -18,10 +18,10 @@
 
 package org.apache.gora.cassandra.store;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -44,7 +44,6 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData.Array;
 import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
@@ -63,6 +62,7 @@ import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.PartitionQueryImpl;
 import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +83,7 @@ public class CassandraStore<K, T extends
    * Fixed string with value "UnionIndex" used to generate an extra column based on 
    * the original field's name
    */
-  public static String UNION_COL_SUFIX = "UnionIndex";
+  public static String UNION_COL_SUFIX = "_UnionIndex";
 
   /**
    * Default schema index with value "0" used when AVRO Union data types are stored
@@ -99,12 +99,6 @@ public class CassandraStore<K, T extends
    */
   private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
 
-  /**
-   * Threadlocals maintaining reusable binary decoders and encoders.
-   */
-  private static ThreadLocal<ByteArrayOutputStream> outputStream =
-      new ThreadLocal<ByteArrayOutputStream>();
-  
   public static final ThreadLocal<BinaryEncoder> encoders =
       new ThreadLocal<BinaryEncoder>();
   
@@ -317,7 +311,9 @@ public class CassandraStore<K, T extends
     query.setDataStore(this);
     query.setKeyRange(key, key);
     
-    
+    if (fields == null){
+      fields = this.getFields();
+    }
     // Generating UnionFields
     ArrayList<String> unionFields = new ArrayList<String>();
     for (String field: fields){
@@ -332,13 +328,13 @@ public class CassandraStore<K, T extends
     String[] both = (String[]) ArrayUtils.addAll(fields, arr);
     
     query.setFields(both);
+
     query.setLimit(1);
     Result<K,T> result = execute(query);
     boolean hasResult = false;
     try {
       hasResult = result.next();
     } catch (Exception e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
     return hasResult ? result.get() : null;
@@ -347,9 +343,7 @@ public class CassandraStore<K, T extends
   @Override
   public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
       throws IOException {
-    // TODO right now this just obtains a single partition
-    // we need to obtain the correct splits for partitions in 
-    // order to achieve data locality.
+    // TODO GORA-298 Implement CassandraStore#getPartitions
     List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
     PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K, T>(query);
     pqi.setConf(getConf());
@@ -483,11 +477,11 @@ public class CassandraStore<K, T extends
    * Add a field to Cassandra according to its type.
    * @param key     the key of the row where the field should be added
    * @param field   the Avro field representing a datum
+   * @param schema  the schema belonging to the particular Avro field
    * @param value   the field value
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void addOrUpdateField(K key, Field field, Schema schema, Object value) {
-    //Schema schema = field.schema();
     Type type = schema.getType();
     // checking if the value to be updated is used for saving union schema
     if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
@@ -505,75 +499,13 @@ public class CassandraStore<K, T extends
       case RECORD:
         if (value != null) {
           if (value instanceof PersistentBase) {
-            PersistentBase persistentBase = (PersistentBase) value;
-            
-            SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap.get(schema.getFullName());
-            if (writer == null) {
-              writer = new SpecificDatumWriter(schema);// ignore dirty bits
-              writerMap.put(schema.getFullName(),writer);
-            }
-            
-            BinaryEncoder encoderFromCache = encoders.get();
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            outputStream.set(bos);
-            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bos, null);
-            if (encoderFromCache == null) {
-              encoders.set(encoder);
-            }
-            
-            //reset the buffers
-            ByteArrayOutputStream os = outputStream.get();
-            os.reset();
-            
+            PersistentBase persistentBase = (PersistentBase) value;            
             try {
-              writer.write(persistentBase, encoder);
-              encoder.flush();
+              byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, schema);
+              this.cassandraClient.addColumn(key, field.name(), byteValue);
             } catch (IOException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
+              LOG.warn(field.name() + " named record could not be serialized.");
             }
-            byte[] byteValue = os.toByteArray();
-          
-            //String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
-//            if (this.cassandraClient.isSuper( familyName )){
-//              this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);            
-//            }else{
-//              
-//              
-//            }            
-            this.cassandraClient.addColumn(key, field.name(), byteValue);
-            
-//            for (Field member: schema.getFields()) {
-//              if (member.pos() == 0) {
-//                continue;
-//              }
-//              // TODO: hack, do not store empty arrays
-//              Object memberValue = persistentBase.get(member.pos());
-//              if (memberValue instanceof List<?>) {
-//                if (((List<?>)memberValue).size() == 0) {
-//                  continue;
-//                }
-//              } else if (memberValue instanceof Map<?,?>) {
-//                if (((Map<?, ?>)memberValue).size() == 0) {
-//                  continue;
-//                }
-//              }
-//              if (memberValue == null){
-//                continue;
-//              }
-//              
-//              // Get type for Union Fields
-//              Schema memberSchema = member.schema();
-//              Type fieldType = memberSchema.getType();
-//              if (fieldType.equals(Type.UNION)){
-//                int schemaPos = getUnionSchema(memberValue, memberSchema);
-//                this.cassandraClient.addSubColumn(key, field.name(), 
-//                    member.name()+UNION_COL_SUFIX, schemaPos);
-//              }
-//              
-//              this.cassandraClient.addSubColumn(key, field.name(), 
-//                  member.name(), memberValue);
-//            }
           } else {
             LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name());
           }
@@ -581,8 +513,34 @@ public class CassandraStore<K, T extends
         break;
       case MAP:
         if (value != null) {
-          if (value instanceof Map<?, ?>) {
-            this.cassandraClient.addStatefulHashMap(key, field.name(), (Map<CharSequence,Object>)value);
+          if (value instanceof Map<?, ?>) {            
+            Map<CharSequence,Object> map = (Map<CharSequence,Object>)value;
+            Schema valueSchema = schema.getValueType();
+            Type valueType = valueSchema.getType();
+            if (Type.UNION.equals(valueType)){
+              Map<CharSequence,Object> valueMap = new HashMap<CharSequence, Object>();
+              for (CharSequence mapKey: map.keySet()) {
+                Object mapValue = map.get(mapKey);
+                int valueUnionIndex = getUnionSchema(mapValue, valueSchema);
+                valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex);
+                valueMap.put(mapKey, mapValue);
+              }
+              map = valueMap;
+            }
+            
+            String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
+            
+            // If the map is not stored in a super column family, serialize it with Avro.
+            if (!this.cassandraClient.isSuper( familyName )){
+              try {
+                byte[] byteValue = AvroSerializerUtil.serializer(map, schema);
+                this.cassandraClient.addColumn(key, field.name(), byteValue);
+              } catch (IOException e) {
+                LOG.warn(field.name() + " named map could not be serialized.");
+              }
+            }else{
+              this.cassandraClient.addStatefulHashMap(key, field.name(), map);              
+            }
           } else {
             LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name());
           }
@@ -611,15 +569,15 @@ public class CassandraStore<K, T extends
           String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name());
           this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
           if (this.cassandraClient.isSuper( familyName )){
-            this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);            
+            this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos);
           }else{
             this.cassandraClient.addColumn(key, columnName, schemaPos);
             
           }
 //          this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName);
           // adding union value
-          Schema unioSchema = schema.getTypes().get(schemaPos);
-          addOrUpdateField(key, field, unioSchema, value);
+          Schema unionSchema = schema.getTypes().get(schemaPos);
+          addOrUpdateField(key, field, unionSchema, value);
           //this.cassandraClient.addColumn(key, field.name(), value);
         } else {
           LOG.warn("Union with 'null' value not supported for field: " + field.name());
@@ -661,11 +619,11 @@ public class CassandraStore<K, T extends
       else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
         return unionSchemaPos;
       else if (pValue instanceof Map && schemaType.equals(Type.MAP))
-        return unionSchemaPos;  
+        return unionSchemaPos;
       else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
-        return unionSchemaPos;        
+        return unionSchemaPos;
       else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
-        return unionSchemaPos;          
+        return unionSchemaPos;
       unionSchemaPos ++;
     }
     // if we weren't able to determine which data type it is, then we return the default

Modified: gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Wed Mar  5 23:06:41 2014
@@ -47,6 +47,7 @@
     <field name="content" family="p" qualifier="p:cnt:c"/>
     <field name="parsedContent" family="sc" qualifier="p:parsedContent"/>
     <field name="outlinks" family="sc" qualifier="p:outlinks"/>
+    <field name="headers" family="sc" qualifier="p:headers"/>
     <field name="metadata" family="p" qualifier="c:mt"/>
   </class>
 

Modified: gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/conf/log4j-server.properties Wed Mar  5 23:06:41 2014
@@ -42,3 +42,6 @@ log4j.appender.R.File=/var/log/cassandra
 # Adding this to avoid thrift logging disconnect errors.
 log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR
 
+# Added for gora-cassandra specific logging during tests
+log4j.logger.org.apache.gora.cassandra=DEBUG
+

Modified: gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java Wed Mar  5 23:06:41 2014
@@ -68,24 +68,27 @@ public class TestCassandraStore extends 
   }
 
 
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetWebPageDefaultFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQuery() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryStartKey() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryEndKey() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryKeyRange() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+  @Override
+  public void testQueryWebPageSingleKey() throws IOException {}
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
   @Override
   public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
+  @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));")
+  @Override
+  public void testQueryWebPageQueryEmptyResults() throws IOException {}
   @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0")
   @Override
   public void testDelete() throws IOException {}
@@ -98,17 +101,4 @@ public class TestCassandraStore extends 
   @Ignore("GORA-298 Implement CassandraStore#getPartitions")
   @Override
   public void testGetPartitions() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetRecursive() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetDoubleRecursive() throws IOException{}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGetNested() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
-  @Override
-  public void testGet3UnionField() throws IOException {}
-
 }

Modified: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java (original)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java Wed Mar  5 23:06:41 2014
@@ -39,15 +39,15 @@ public class GoraTestDriver {
 
   protected static final Logger log = LoggerFactory.getLogger(GoraTestDriver.class);
 
-  protected Class<? extends DataStore> dataStoreClass;
+  protected Class<? extends DataStore<?, ?>> dataStoreClass;
   protected Configuration conf = new Configuration();
 
   @SuppressWarnings("rawtypes")
   protected HashSet<DataStore> dataStores;
 
-  @SuppressWarnings("rawtypes")
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   protected GoraTestDriver(Class<? extends DataStore> dataStoreClass) {
-    this.dataStoreClass = dataStoreClass;
+    this.dataStoreClass = (Class<? extends DataStore<?, ?>>) dataStoreClass;
     this.dataStores = new HashSet<DataStore>();
   }
 
@@ -70,7 +70,7 @@ public class GoraTestDriver {
    */
   public void setUp() throws Exception {
     log.info("setting up test");
-    for(DataStore store : dataStores) {
+    for(DataStore<?, ?> store : dataStores) {
       store.truncateSchema();
     }
   }

Modified: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java (original)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java Wed Mar  5 23:06:41 2014
@@ -587,7 +587,8 @@ public class DataStoreTestUtil {
     for (int i = 0; i < urls.length; i++) {
       WebPage webPage = WebPage.newBuilder().build();
       webPage.setUrl(new Utf8(urls[i]));
-      //test put for nullable map field
+      //test put for nullable map field 
+      // we put data to the 'headers' field which is a Map with default value of 'null'
       webPage.setHeaders(new HashMap<CharSequence, CharSequence>());
       for (int j = 0; j < headers.length; j += 2) {
         webPage.getHeaders().put(new Utf8(header + j), new Utf8(headers[j]));
@@ -670,7 +671,6 @@ public class DataStoreTestUtil {
 
     String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
         "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
-    String anchor = "anchor";
     String header = "header";
     String[] headers = { "firstHeader", "secondHeader", "thirdHeader",
         "fourthHeader", "fifthHeader", "sixthHeader" };

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java Wed Mar  5 23:06:41 2014
@@ -21,7 +21,6 @@ package org.apache.gora.hbase.query;
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.hadoop.hbase.client.Get;

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java Wed Mar  5 23:06:41 2014
@@ -23,7 +23,6 @@ import static org.apache.gora.hbase.util
 import java.io.IOException;
 
 import org.apache.gora.hbase.store.HBaseStore;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1574715&r1=1574714&r2=1574715&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Mar  5 23:06:41 2014
@@ -36,7 +36,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.util.Utf8;
-
 import org.apache.gora.hbase.query.HBaseGetResult;
 import org.apache.gora.hbase.query.HBaseQuery;
 import org.apache.gora.hbase.query.HBaseScannerResult;
@@ -51,7 +50,6 @@ import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.PartitionQueryImpl;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -66,11 +64,9 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-
 import org.jdom.Document;
 import org.jdom.Element;
 import org.jdom.input.SAXBuilder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -235,7 +231,6 @@ implements Configurable {
    * @param persistent
    *          Record to be persisted in HBase
    */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void put(K key, T persistent) {
     try {
@@ -297,8 +292,9 @@ implements Configurable {
       }
       break;
     case MAP:
+      @SuppressWarnings({ "rawtypes", "unchecked" })
       Set<Entry> set = ((Map) o).entrySet();
-      for (Entry entry : set) {
+      for (@SuppressWarnings("rawtypes") Entry entry : set) {
         byte[] qual = toBytes(entry.getKey());
         addPutsAndDeletes(put, delete, entry.getValue(), schema.getValueType()
             .getType(), schema.getValueType(), hcol, qual);
@@ -597,7 +593,6 @@ implements Configurable {
     }
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   /**
    * Creates a new Persistent instance with the values in 'result' for the fields listed.
    * @param result result form a HTable#get()
@@ -649,7 +644,7 @@ implements Configurable {
         return;
       }
       Schema valueSchema = fieldSchema.getValueType();
-      Map map = new HashMap();
+      Map<Utf8, Object> map = new HashMap<Utf8, Object>();
       for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
         map.put(new Utf8(Bytes.toString(e.getKey())),
             fromBytes(valueSchema, e.getValue()));
@@ -662,8 +657,8 @@ implements Configurable {
         return;
       }
       valueSchema = fieldSchema.getElementType();
-      ArrayList arrayList = new ArrayList();
-      DirtyListWrapper dirtyListWrapper = new DirtyListWrapper(arrayList);
+      ArrayList<Object> arrayList = new ArrayList<Object>();
+      DirtyListWrapper<Object> dirtyListWrapper = new DirtyListWrapper<Object>(arrayList);
       for (Entry<byte[], byte[]> e : qualMap.entrySet()) {
         dirtyListWrapper.add(fromBytes(valueSchema, e.getValue()));
       }