You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2017/08/23 20:55:29 UTC

[31/37] gora git commit: Fix code review comments

Fix code review comments


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/0de528ef
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/0de528ef
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/0de528ef

Branch: refs/heads/master
Commit: 0de528effcfa595ff9f9c5cb3d0c7eaf18392554
Parents: 2695207
Author: madhawa-gunasekara <ma...@wso2.com>
Authored: Sun Aug 13 15:52:43 2017 +0530
Committer: madhawa-gunasekara <ma...@wso2.com>
Committed: Sun Aug 13 19:28:18 2017 +0530

----------------------------------------------------------------------
 .../compiler/GoraCassandraNativeCompiler.java   |   2 +-
 .../gora/cassandra/query/CassandraQuery.java    |  27 +++++
 .../cassandra/query/CassandraResultSet.java     |   5 +-
 .../serializers/AvroCassandraUtils.java         |   2 +-
 .../cassandra/serializers/AvroSerializer.java   |  94 ++++++++++++++-
 .../serializers/CassandraQueryFactory.java      |  34 ++++--
 .../serializers/CassandraSerializer.java        | 102 ++++++++--------
 .../cassandra/serializers/NativeSerializer.java |  73 ++++++++++-
 .../gora/cassandra/store/CassandraClient.java   |   6 +-
 .../gora/cassandra/store/CassandraMapping.java  |   7 +-
 .../gora/cassandra/store/CassandraStore.java    | 120 ++++++++++++++++++-
 .../gora/cassandra/GoraCassandraTestDriver.java |   3 -
 .../store/TestAvroSerializationWithUDT.java     |   3 +
 .../cassandra/store/TestCassandraStore.java     |  44 ++-----
 .../store/TestNativeSerializationWithUDT.java   |   3 +
 15 files changed, 405 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
index 77d6777..369ff31 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
@@ -64,7 +64,7 @@ public class GoraCassandraNativeCompiler {
   }
 
   /**
-   * Generates Java classes for a schema.
+   * Generates Java classes for a mapping.
    */
   private static void compileSchema(File src, File dest) throws Exception {
     log.info("Compiling {} to {}", src, dest);

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
index 1479686..919e946 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
@@ -40,34 +40,61 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
     super(dataStore);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public Filter<K, T> getFilter() {
     return filter;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void setFilter(Filter<K, T> filter) {
     this.filter = filter;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public boolean isLocalFilterEnabled() {
     return localFilterEnabled;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void setLocalFilterEnabled(boolean enable) {
     localFilterEnabled = enable;
   }
 
+  /**
+   * Registers a field to be updated, together with the new value it should take.
+   *
+   * @param field    field Name
+   * @param newValue New Value of the field
+   */
   public void addUpdateField(String field, Object newValue) {
     updateFields.put(field, newValue);
   }
 
+  /**
+   * Returns the new value registered for the given update field.
+   *
+   * @param key Field Name
+   * @return Object value
+   */
   public Object getUpdateFieldValue(String key) {
     return updateFields.get(key);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public String[] getFields() {
     if (updateFields.size() == 0) {

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
index 7ad106d..f176350 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
@@ -42,8 +42,9 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
   private int position = 0;
 
   /**
-   * @param dataStore
-   * @param query
+   * Constructs a Cassandra result set.
+   * @param dataStore Cassandra Data Store
+   * @param query Cassandra Query
    */
   public CassandraResultSet(DataStore<K, T> dataStore, Query<K, T> query) {
     super(dataStore, query);

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
index 82a68ab..9d3dc89 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -254,7 +254,7 @@ class AvroCassandraUtils {
           try {
             result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr);
           } catch (IOException e) {
-            LOG.error("");
+            LOG.error("Error occurred while deserialize the Record. :" + e.getMessage());
             result = null;
           }
         } else {

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index a04974f..893de91 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -55,11 +56,75 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
 
   private DataStore<K, T> cassandraDataStore;
 
+  private Schema persistentSchema;
+
   AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping, Schema schema) {
-    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema);
+    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
     this.cassandraDataStore = dataStore;
+    persistentSchema = schema;
+    try {
+      analyzePersistent();
+    } catch (Exception e) {
+      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
+    }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @throws Exception
+   */
+  protected void analyzePersistent() throws Exception {
+    userDefineTypeMaps = new HashMap<>();
+    for (Field field : mapping.getFieldList()) {
+      String fieldType = field.getType();
+      if (fieldType.contains("frozen")) {
+        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
+        if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+          Schema fieldSchema = persistentSchema.getField(field.getFieldName()).schema();
+          if (fieldSchema.getType().equals(Schema.Type.UNION)) {
+            for (Schema currentSchema : fieldSchema.getTypes()) {
+              if (currentSchema.getType().equals(Schema.Type.RECORD)) {
+                fieldSchema = currentSchema;
+                break;
+              }
+            }
+          }
+          String createQuery = CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema);
+          userDefineTypeMaps.put(udtType, createQuery);
+        } else {
+          throw new RuntimeException("Unsupported Class for User Define Types, Please use PersistentBase class. field : " + udtType);
+        }
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param query
+   * @return
+   */
+  @Override
+  public boolean updateByQuery(Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getUpdateByQueryForAvro(mapping, query, objectArrayList, persistentSchema);
+    ResultSet results;
+    if (objectArrayList.size() == 0) {
+      results = client.getSession().execute(cqlQuery);
+    } else {
+      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    }
+    return results.wasApplied();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @param fields
+   * @return
+   */
   @Override
   public Persistent get(Object key, String[] fields) {
     if (fields == null) {
@@ -81,6 +146,12 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     return obj;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @param persistent
+   */
   @Override
   public void put(Object key, Persistent persistent) {
     if (persistent instanceof PersistentBase) {
@@ -142,6 +213,12 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @return
+   */
   @Override
   public Persistent get(Object key) {
     ArrayList<String> cassandraKeys = new ArrayList<>();
@@ -274,7 +351,12 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     return paramValue;
   }
 
-
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @return
+   */
   @Override
   public boolean delete(Object key) {
     ArrayList<String> cassandraKeys = new ArrayList<>();
@@ -285,7 +367,13 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     return resultSet.wasApplied();
   }
 
-
+  /**
+   * {@inheritDoc}
+   *
+   * @param dataStore
+   * @param query
+   * @return
+   */
   @Override
   public Result execute(DataStore dataStore, Query query) {
     List<Object> objectArrayList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index af53d2a..928370c 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -604,17 +604,11 @@ class CassandraQueryFactory {
    * @param objects        field Objects list
    * @return CQL Query
    */
-  static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
+  static String getUpdateByQueryForAvro(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
     Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
     Update.Assignments updateAssignments = null;
     if (cassandraQuery instanceof CassandraQuery) {
       String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-      if (((CassandraStore) cassandraQuery.getDataStore()).getSerializationType().equalsIgnoreCase("NATIVE")) {
-        for (String column : columnNames) {
-          updateAssignments = update.with(QueryBuilder.set(column, "?"));
-          objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
-        }
-      } else {
         for (String column : columnNames) {
           updateAssignments = update.with(QueryBuilder.set(column, "?"));
           Field field = mapping.getFieldFromColumnName(column);
@@ -626,14 +620,38 @@ class CassandraQueryFactory {
             throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".");
           }
         }
-      }
     } else {
       throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
     }
+    return processQuery(cassandraQuery, updateAssignments, mapping, objects);
+  }
 
+
+  /**
+   * This method returns the CQL query for the updateByQuery method when native serialization is used.
+   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
+   *
+   * @param mapping        Cassandra mapping {@link CassandraMapping}
+   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
+   * @param objects        field Objects list
+   * @return CQL Query
+   */
+  static String getUpdateByQueryForNative(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
+    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
+    Update.Assignments updateAssignments = null;
+    if (cassandraQuery instanceof CassandraQuery) {
+      String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
+        for (String column : columnNames) {
+          updateAssignments = update.with(QueryBuilder.set(column, "?"));
+          objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
+        }
+    } else {
+      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
+    }
     return processQuery(cassandraQuery, updateAssignments, mapping, objects);
   }
 
+
   private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception {
     switch (schema.getType()) {
       case INT:

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index 6871b6c..afddaef 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -26,7 +26,6 @@ import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -48,21 +46,14 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   protected Class<T> persistentClass;
   protected CassandraMapping mapping;
-  CassandraClient client;
-  private Map<String, String> userDefineTypeMaps;
-  private Schema persistentSchema;
+  protected CassandraClient client;
+  protected Map<String, String> userDefineTypeMaps;
 
-  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping, Schema schema) {
+  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) {
     this.keyClass = keyClass;
     this.persistentClass = persistantClass;
     this.client = cc;
     this.mapping = mapping;
-    persistentSchema = schema;
-    try {
-      analyzePersistent();
-    } catch (Exception e) {
-      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
-    }
   }
 
   /**
@@ -85,40 +76,17 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
         break;
       case NATIVE:
       default:
-        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema);
+        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
     }
     return serializer;
   }
 
-  private void analyzePersistent() throws Exception {
-    userDefineTypeMaps = new HashMap<>();
-    for (Field field : mapping.getFieldList()) {
-      String fieldType = field.getType();
-      if (fieldType.contains("frozen")) {
-        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
-        if (this instanceof AvroSerializer) {
-          if (PersistentBase.class.isAssignableFrom(persistentClass)) {
-            Schema fieldSchema = persistentSchema.getField(field.getFieldName()).schema();
-            if (fieldSchema.getType().equals(Schema.Type.UNION)) {
-              for (Schema currentSchema : fieldSchema.getTypes()) {
-                if (currentSchema.getType().equals(Schema.Type.RECORD)) {
-                  fieldSchema = currentSchema;
-                  break;
-                }
-              }
-            }
-            String createQuery = CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema);
-            userDefineTypeMaps.put(udtType, createQuery);
-          } else {
-            throw new RuntimeException("Unsupported Class for User Define Types, Please use PersistentBase class. field : " + udtType);
-          }
-        } else {
-          String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName());
-          userDefineTypeMaps.put(udtType, createQuery);
-        }
-      }
-    }
-  }
+  /**
+   * Analyzes the persistent class to find inner records that use user-defined types (UDTs). Every Cassandra serializer constructor should call this method.
+   *
+   * @throws Exception
+   */
+  protected abstract void analyzePersistent() throws Exception;
 
 
   public void createSchema() {
@@ -166,27 +134,55 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
     return fields.toArray(new String[0]);
   }
 
+  /**
+   * Inserts the persistent Object
+   *
+   * @param key   key value
+   * @param value persistent value
+   */
   public abstract void put(K key, T value);
 
+  /**
+   * Retrieves the persistent value according to the key
+   *
+   * @param key key value
+   * @return persistent value
+   */
   public abstract T get(K key);
 
+  /**
+   * Deletes persistent value according to the key
+   *
+   * @param key key value
+   * @return isDeleted
+   */
   public abstract boolean delete(K key);
 
+  /**
+   * Retrieves the persistent value according to the key and fields
+   *
+   * @param key    key value
+   * @param fields fields
+   * @return persistent value
+   */
   public abstract T get(K key, String[] fields);
 
+  /**
+   * Executes the given query and returns the results.
+   *
+   * @param dataStore Cassandra data store
+   * @param query     Cassandra Query
+   * @return Cassandra Result
+   */
   public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query);
 
-  public boolean updateByQuery(Query query) {
-    List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList, persistentSchema);
-    ResultSet results;
-    if (objectArrayList.size() == 0) {
-      results = client.getSession().execute(cqlQuery);
-    } else {
-      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
-    }
-    return results.wasApplied();
-  }
+  /**
+   * Updates persistent objects according to the given query.
+   *
+   * @param query Cassandra Query
+   * @return isUpdated
+   */
+  public abstract boolean updateByQuery(Query query);
 
   public long deleteByQuery(Query query) {
     List<Object> objectArrayList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index f98c910..adb5c34 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -21,7 +21,6 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.datastax.driver.mapping.Result;
-import org.apache.avro.Schema;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -47,19 +47,34 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
 
   private Mapper<T> mapper;
 
-  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping, Schema schema) {
-    super(cassandraClient, keyClass, persistentClass, mapping, schema);
+  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+    super(cassandraClient, keyClass, persistentClass, mapping);
+    try {
+      analyzePersistent();
+    } catch (Exception e) {
+      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
+    }
     this.createSchema();
     MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
     mapper = mappingManager.mapper(persistentClass);
   }
 
+  /**
+   * {@inheritDoc}
+   * @param key
+   * @param value
+   */
   @Override
   public void put(Object key, Persistent value) {
     LOG.debug("Object is saved with key : {} and value : {}", key, value);
     mapper.save((T) value);
   }
 
+  /**
+   * {@inheritDoc}
+   * @param key
+   * @return
+   */
   @Override
   public T get(Object key) {
     T object = mapper.get(key);
@@ -71,6 +86,11 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
     return object;
   }
 
+  /**
+   * {@inheritDoc}
+   * @param key
+   * @return
+   */
   @Override
   public boolean delete(Object key) {
     LOG.debug("Object is deleted for key : {}", key);
@@ -78,6 +98,12 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
     return true;
   }
 
+  /**
+   * {@inheritDoc}
+   * @param key
+   * @param fields
+   * @return
+   */
   @Override
   public Persistent get(Object key, String[] fields) {
     if (fields == null) {
@@ -95,6 +121,47 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
     return null;
   }
 
+  /**
+   * {@inheritDoc}
+   * @throws Exception
+   */
+  @Override
+  protected void analyzePersistent() throws Exception {
+    userDefineTypeMaps = new HashMap<>();
+    for (Field field : mapping.getFieldList()) {
+      String fieldType = field.getType();
+      if (fieldType.contains("frozen")) {
+        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
+        String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName());
+        userDefineTypeMaps.put(udtType, createQuery);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @param query
+   * @return
+   */
+  @Override
+  public boolean updateByQuery(Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, objectArrayList);
+    ResultSet results;
+    if (objectArrayList.size() == 0) {
+      results = client.getSession().execute(cqlQuery);
+    } else {
+      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    }
+    return results.wasApplied();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @param dataStore
+   * @param query
+   * @return
+   */
   @Override
   public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
     List<Object> objectArrayList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index c973fe4..9a49a38 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -68,9 +68,10 @@ public class CassandraClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
 
-
   private Cluster cluster;
+
   private Session session;
+
   private CassandraMapping mapping;
 
   public Session getSession() {
@@ -258,7 +259,6 @@ public class CassandraClient {
     return builder;
   }
 
-
   private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) {
     String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY);
     if (loadBalancingProp != null) {
@@ -484,7 +484,6 @@ public class CassandraClient {
     return builder.withSocketOptions(options);
   }
 
-
   private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException {
     String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE);
     if (filename != null) {
@@ -500,7 +499,6 @@ public class CassandraClient {
     return null;
   }
 
-
   public void close() {
     this.session.close();
     this.cluster.close();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 807a99d..911e782 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -121,9 +121,9 @@ public class CassandraMapping {
   }
 
   /**
-   * This method returns
+   * This method returns the names of all fields, including the key fields.
    *
-   * @return
+   * @return field names, including keys
    */
   public String[] getAllFieldsIncludingKeys() {
     List<String> fieldNames = new ArrayList<>(fieldList.size());
@@ -140,7 +140,8 @@ public class CassandraMapping {
   }
 
   /**
-   * @return
+   * This method returns all the fields that are part of the partition keys, including composite keys.
+   * @return field Names
    */
   public String[] getAllKeys() {
     List<String> fieldNames = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 5e31fcc..65d29b7 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -54,8 +54,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
 
   private Class<K> keyClass;
 
-  private Schema persistentSchema;
-
   private Class<T> persistentClass;
 
   private CassandraMapping mapping;
@@ -79,8 +77,10 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
    * @param persistentClass persistent class
    * @param properties      properties
    */
+  @Override
   public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
     LOG.debug("Initializing Cassandra store");
+    Schema persistentSchema;
     try {
       this.keyClass = keyClass;
       this.persistentClass = persistentClass;
@@ -104,6 +104,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @SuppressWarnings("all")
   @Override
   public Class<T> getPersistentClass() {
@@ -123,32 +128,58 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     this.persistentClass = persistentClass;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @Override
   public String getSchemaName() {
     return mapping.getCoreName();
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void createSchema() {
     cassandraSerializer.createSchema();
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void deleteSchema() {
     cassandraSerializer.deleteSchema();
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @SuppressWarnings("all")
   @Override
   public Class<K> getKeyClass() {
     return this.keyClass;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param keyClass the class of keys
+   */
   @Override
   public void setKeyClass(Class<K> keyClass) {
     this.keyClass = keyClass;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @Override
   public K newKey() {
     try {
@@ -162,6 +193,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @SuppressWarnings("all")
   @Override
   public T newPersistent() {
@@ -176,47 +212,96 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     }
   }
 
-
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @Override
   public BeanFactory<K, T> getBeanFactory() {
     return this.beanFactory;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param beanFactory the BeanFactory to use
+   */
   @Override
   public void setBeanFactory(BeanFactory<K, T> beanFactory) {
     this.beanFactory = beanFactory;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void close() {
     this.cassandraSerializer.close();
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key the key of the object
+   * @return
+   */
   @Override
   public T get(K key) {
     return (T) cassandraSerializer.get(key);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key    the key of the object
+   * @param fields the fields required in the object. Pass null, to retrieve all fields
+   * @return
+   */
   @Override
   public T get(K key, String[] fields) {
     return (T) cassandraSerializer.get(key, fields);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key key value
+   * @param obj object value
+   */
   @Override
   public void put(K key, T obj) {
     cassandraSerializer.put(key, obj);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param key the key of the object
+   * @return
+   */
   @Override
   public boolean delete(K key) {
     return cassandraSerializer.delete(key);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param query matching records to this query will be deleted
+   * @return
+   */
   @Override
   public long deleteByQuery(Query<K, T> query) {
     return cassandraSerializer.deleteByQuery(query);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param query the query to execute.
+   * @return
+   */
   @Override
   public Result<K, T> execute(Query<K, T> query) {
     return (Result<K, T>) cassandraSerializer.execute(this, query);
@@ -232,12 +317,24 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     return cassandraSerializer.updateByQuery(query);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @Override
   public Query<K, T> newQuery() {
     Query<K, T> query = new CassandraQuery(this);
     return query;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param query cassandra Query
+   * @return
+   * @throws IOException
+   */
   @Override
   public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
     List<PartitionQuery<K, T>> partitions = new ArrayList<>();
@@ -247,21 +344,38 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     return partitions;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void flush() {
     // ignore since caching has been disabled
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param obj
+   * @return
+   */
   @Override
   public boolean equals(Object obj) {
     return super.equals(obj);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void truncateSchema() {
     cassandraSerializer.truncateSchema();
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return
+   */
   @Override
   public boolean schemaExists() {
     return cassandraSerializer.schemaExists();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
index 1d454b4..e4c1ec4 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/GoraCassandraTestDriver.java
@@ -35,8 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Properties;
 
-// Logging imports
-
 /**
  * Helper class for third party tests using gora-cassandra backend.
  *
@@ -45,7 +43,6 @@ import java.util.Properties;
  * server. In this case we draw on Hector's @see EmbeddedServerHelper.
  * It starts (setUp) and stops (tearDown) embedded Cassandra server.
  */
-
 public class GoraCassandraTestDriver extends GoraTestDriver {
   private static Logger log = LoggerFactory.getLogger(GoraCassandraTestDriver.class);
 

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
index e37bf43..71b2b1d 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
@@ -34,6 +34,9 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
 
+/**
+ * This class contains test cases that verify the behaviour of Avro serialization with the UDT data type.
+ */
 public class TestAvroSerializationWithUDT {
 
   private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver();

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
index 958629a..caf375e 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
@@ -3,8 +3,8 @@
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
@@ -14,41 +14,8 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
- * <p>
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora.
  */
 
-/**
- * Testing class for all standard gora-cassandra functionality.
- * We extend DataStoreTestBase enabling us to run the entire base test
- * suite for Gora. 
- */
 package org.apache.gora.cassandra.store;
 
 import org.apache.gora.cassandra.GoraCassandraTestDriver;
@@ -76,7 +43,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 /**
- * Test for CassandraStore.
+ *
+ * Testing class for all standard gora-cassandra functionality.
+ * We extend DataStoreTestBase enabling us to run the entire base test
+ * suite for Gora.
+ *
+ * Test Avro Serialization for CassandraStore.
  */
 public class TestCassandraStore extends DataStoreTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestCassandraStore.class);

http://git-wip-us.apache.org/repos/asf/gora/blob/0de528ef/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
index 8d8ee23..f9b5df4 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
@@ -29,6 +29,9 @@ import org.junit.Test;
 
 import java.util.Properties;
 
+/**
+ * This class contains test cases that verify the behaviour of native serialization with the UDT data type.
+ */
 public class TestNativeSerializationWithUDT {
 
   private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver();