You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2017/08/23 20:55:22 UTC

[24/37] gora git commit: Refactored the code

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
index 6f8284d..7deb49a 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
@@ -25,43 +25,30 @@ import java.util.Map;
  */
 public class KeySpace {
 
-  public enum PlacementStrategy {
-    SimpleStrategy,
-    NetworkTopologyStrategy,
-  }
-
   private String name;
-
   private PlacementStrategy placementStrategy;
-
   private boolean durableWritesEnabled;
-
   private int replicationFactor;
-
   private Map<String, Integer> dataCenters;
 
   public String getName() {
     return name;
   }
 
-  public boolean isDurableWritesEnabled() {
-    return durableWritesEnabled;
-  }
-
-  public PlacementStrategy getPlacementStrategy() {
-    return placementStrategy;
+  public void setName(String name) {
+    this.name = name;
   }
 
-  public int getReplicationFactor() {
-    return replicationFactor;
+  public boolean isDurableWritesEnabled() {
+    return durableWritesEnabled;
   }
 
-  public Map<String, Integer> getDataCenters() {
-    return dataCenters;
+  public void setDurableWritesEnabled(boolean durableWritesEnabled) {
+    this.durableWritesEnabled = durableWritesEnabled;
   }
 
-  public void addDataCenter(String key, Integer value) {
-    this.dataCenters.put(key, value);
+  public PlacementStrategy getPlacementStrategy() {
+    return placementStrategy;
   }
 
   public void setPlacementStrategy(PlacementStrategy placementStrategy) {
@@ -71,15 +58,24 @@ public class KeySpace {
     }
   }
 
+  public int getReplicationFactor() {
+    return replicationFactor;
+  }
+
   public void setReplicationFactor(int replicationFactor) {
     this.replicationFactor = replicationFactor;
   }
 
-  public void setName(String name) {
-    this.name = name;
+  public Map<String, Integer> getDataCenters() {
+    return dataCenters;
   }
 
-  public void setDurableWritesEnabled(boolean durableWritesEnabled) {
-    this.durableWritesEnabled = durableWritesEnabled;
+  public void addDataCenter(String key, Integer value) {
+    this.dataCenters.put(key, value);
+  }
+
+  public enum PlacementStrategy {
+    SimpleStrategy,
+    NetworkTopologyStrategy,
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
index a0d9c4c..3aa4e36 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
@@ -35,7 +35,7 @@ public class PartitionKeyField extends Field {
 
   public void setComposite(boolean composite) {
     isComposite = composite;
-    if(isComposite && fields == null) {
+    if (isComposite && fields == null) {
       fields = new ArrayList<>();
     }
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
index 5247ecc..3ad9186 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
@@ -5,15 +5,16 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /**
  * This package contains all classes related to the Cassandra datastore.
  */

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
index bd17dcd..9d6e103 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
@@ -21,7 +21,6 @@ import com.datastax.driver.mapping.annotations.Transient;
 import org.apache.avro.Schema;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.Tombstone;
-import org.apache.gora.persistency.impl.PersistentBase;
 
 import java.util.List;
 
@@ -61,12 +60,6 @@ public abstract class CassandraNativePersistent implements Persistent {
 
   @Transient
   @Override
-  public void setDirty(String field) {
-
-  }
-
-  @Transient
-  @Override
   public void clearDirty(int fieldIndex) {
 
   }
@@ -103,6 +96,12 @@ public abstract class CassandraNativePersistent implements Persistent {
 
   @Transient
   @Override
+  public void setDirty(String field) {
+
+  }
+
+  @Transient
+  @Override
   public void clearDirty() {
 
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
index d6ba99c..1479686 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
@@ -70,7 +70,7 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
 
   @Override
   public String[] getFields() {
-    if(updateFields.size() == 0) {
+    if (updateFields.size() == 0) {
       return super.getFields();
     } else {
       String[] updateFieldsArray = new String[updateFields.size()];

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
index 7ab3726..c3b2e59 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,13 @@
 package org.apache.gora.cassandra.query;
 
 import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
 import org.apache.gora.store.DataStore;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * CassandraResult specific implementation of the {@link org.apache.gora.query.Result}
@@ -45,7 +42,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
   private int position = 0;
 
   /**
-   *
    * @param dataStore
    * @param query
    */
@@ -54,34 +50,37 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
   }
 
   /**
-   *{@inheritDoc}
+   * {@inheritDoc}
+   *
    * @return
    * @throws IOException
    */
   @Override
   protected boolean nextInner() throws IOException {
-    if(offset < size) {
+    if (offset < size) {
       persistent = persistentObject.get(position);
       key = persistentKey.get(position);
-      position ++;
+      position++;
       return true;
     }
     return false;
   }
 
   /**
-   *{@inheritDoc}
+   * {@inheritDoc}
+   *
    * @return
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   public float getProgress() throws IOException, InterruptedException {
-    return ((float)position)/size;
+    return ((float) position) / size;
   }
 
   /**
-   *{@inheritDoc}
+   * {@inheritDoc}
+   *
    * @return
    */
   @Override
@@ -91,6 +90,7 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
 
   /**
    * {@inheritDoc}
+   *
    * @return
    */
   @Override
@@ -99,7 +99,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
   }
 
   /**
-   *
    * @param key
    * @param token
    */

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
index 49faefa..275c8d9 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
@@ -5,15 +5,16 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /**
  * This package contains the Cassandra store query representation classes as well as the class representing
  * the result set produced when a query is executed over the Cassandra dataStore.

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
index 5383949..7baa1b1 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -24,6 +24,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.hbase.util.HBaseByteInterface;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.DirtyListWrapper;
 import org.apache.gora.persistency.impl.DirtyMapWrapper;
@@ -31,6 +32,7 @@ import org.apache.gora.persistency.impl.PersistentBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,32 +54,26 @@ class AvroCassandraUtils {
 
   static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) {
     CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
-    if (cassandraMapping.isPartitionKeyDefined()) {
-      if (cassandraKey != null) {
-        if (key instanceof PersistentBase) {
-          PersistentBase keyBase = (PersistentBase) key;
-          for (Schema.Field field : keyBase.getSchema().getFields()) {
-            if (cassandraMapping.getFieldFromFieldName(field.name()) != null) {
-              keys.add(field.name());
-              Object value = keyBase.get(field.pos());
-              value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value);
-              values.add(value);
-            } else {
-              LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
-            }
+    if (cassandraKey != null) {
+      if (key instanceof PersistentBase) {
+        PersistentBase keyBase = (PersistentBase) key;
+        for (Schema.Field field : keyBase.getSchema().getFields()) {
+          Field mappedField = cassandraKey.getFieldFromFieldName(field.name());
+          if (mappedField != null) {
+            keys.add(field.name());
+            Object value = keyBase.get(field.pos());
+            value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value, mappedField);
+            values.add(value);
+          } else {
+            LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
           }
-        } else {
-          LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
         }
       } else {
-        for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) {
-          keys.add(field.getFieldName());
-          values.add(key);
-        }
+        LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
       }
     } else {
-      keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
-      values.add(key.toString());
+      keys.add(cassandraMapping.getInlinedDefinedPartitionKey().getFieldName());
+      values.add(key);
     }
   }
 
@@ -91,21 +87,33 @@ class AvroCassandraUtils {
    * @param fieldValue  the field value.
    * @return field value
    */
-  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) {
+  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue, Field field) {
     switch (type) {
+      // A record can be persisted in two ways: as a UDT ("frozen" column type) or as serialized bytes ("blob" column type)
       case RECORD:
         PersistentBase persistent = (PersistentBase) fieldValue;
-        PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
-        for (Schema.Field member : fieldSchema.getFields()) {
-          if (member.pos() == 0 || !persistent.isDirty()) {
-            continue;
+        if (field.getType().contains("frozen")) {
+          PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
+          for (Schema.Field member : fieldSchema.getFields()) {
+            if (member.pos() == 0 || !persistent.isDirty()) {
+              continue;
+            }
+            Schema memberSchema = member.schema();
+            Schema.Type memberType = memberSchema.getType();
+            Object memberValue = persistent.get(member.pos());
+            newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue, field));
+          }
+          fieldValue = newRecord;
+        } else if (field.getType().contains("blob")) {
+          try {
+            byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema);
+            fieldValue = ByteBuffer.wrap(serializedBytes);
+          } catch (IOException e) {
+            LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage()});
           }
-          Schema memberSchema = member.schema();
-          Schema.Type memberType = memberSchema.getType();
-          Object memberValue = persistent.get(member.pos());
-          newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue));
+        } else {
+          throw new RuntimeException("");
         }
-        fieldValue = newRecord;
         break;
       case MAP:
         Schema valueSchema = fieldSchema.getValueType();
@@ -114,7 +122,7 @@ class AvroCassandraUtils {
         for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) {
           String mapKey = e.getKey().toString();
           Object mapValue = e.getValue();
-          mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue);
+          mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue, field);
           map.put(mapKey, mapValue);
         }
         fieldValue = map;
@@ -124,7 +132,7 @@ class AvroCassandraUtils {
         valuetype = valueSchema.getType();
         ArrayList<Object> list = new ArrayList<>();
         for (Object item : (Collection<?>) fieldValue) {
-          Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item);
+          Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item, field);
           list.add(value);
         }
         fieldValue = list;
@@ -136,7 +144,7 @@ class AvroCassandraUtils {
           int schemaPos = getUnionSchema(fieldValue, fieldSchema);
           Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
           Schema.Type unionType = unionSchema.getType();
-          fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue);
+          fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue, field);
         }
         break;
       case STRING:
@@ -154,7 +162,7 @@ class AvroCassandraUtils {
    * If no data type can be inferred then we return a default value
    * of position 0.
    *
-   * @param pValue Object
+   * @param pValue       Object
    * @param pUnionSchema avro Schema
    * @return the unionSchemaPosition.
    */
@@ -183,74 +191,84 @@ class AvroCassandraUtils {
         return unionSchemaPos;
       else if (pValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD))
         return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.STRING))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.INT))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.LONG))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.DOUBLE))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FLOAT))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.BOOLEAN))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.MAP))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ARRAY))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ENUM))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FIXED))
+        return unionSchemaPos;
+      else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.RECORD))
+        return unionSchemaPos;
       unionSchemaPos++;
     }
     // if we weren't able to determine which data type it is, then we return the default
     return DEFAULT_UNION_SCHEMA;
   }
 
-  static String encodeFieldKey(final String key) {
-    if (key == null) {
-      return null;
-    }
-    return key.replace(".", "\u00B7")
-            .replace(":", "\u00FF")
-            .replace(";", "\u00FE")
-            .replace(" ", "\u00FD")
-            .replace("%", "\u00FC")
-            .replace("=", "\u00FB");
-  }
-
-  static String decodeFieldKey(final String key) {
-    if (key == null) {
-      return null;
-    }
-    return key.replace("\u00B7", ".")
-            .replace("\u00FF", ":")
-            .replace("\u00FE", ";")
-            .replace("\u00FD", " ")
-            .replace("\u00FC", "%")
-            .replace("\u00FB", "=");
-  }
-
   static Object getAvroFieldValue(Object value, Schema schema) {
     Object result;
     switch (schema.getType()) {
 
       case MAP:
         Map<String, Object> rawMap = (Map<String, Object>) value;
-        Map<Utf8, Object> deserializableMap = new HashMap<>();
+        Map<Utf8, Object> utf8ObjectHashMap = new HashMap<>();
         if (rawMap == null) {
-          result = new DirtyMapWrapper(deserializableMap);
+          result = new DirtyMapWrapper(utf8ObjectHashMap);
           break;
         }
         for (Map.Entry<?, ?> e : rawMap.entrySet()) {
           Schema innerSchema = schema.getValueType();
           Object obj = getAvroFieldValue(e.getValue(), innerSchema);
           if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) {
-            deserializableMap.put((Utf8) e.getKey(), obj);
+            utf8ObjectHashMap.put((Utf8) e.getKey(), obj);
           } else {
-            deserializableMap.put(new Utf8((String) e.getKey()), obj);
+            utf8ObjectHashMap.put(new Utf8((String) e.getKey()), obj);
           }
         }
-        result = new DirtyMapWrapper<>(deserializableMap);
+        result = new DirtyMapWrapper<>(utf8ObjectHashMap);
         break;
 
       case ARRAY:
         List<Object> rawList = (List<Object>) value;
-        List<Object> deserializableList = new ArrayList<>();
+        List<Object> objectArrayList = new ArrayList<>();
         if (rawList == null) {
-          return new DirtyListWrapper(deserializableList);
+          return new DirtyListWrapper(objectArrayList);
         }
         for (Object item : rawList) {
           Object obj = getAvroFieldValue(item, schema.getElementType());
-          deserializableList.add(obj);
+          objectArrayList.add(obj);
         }
-        result = new DirtyListWrapper<>(deserializableList);
+        result = new DirtyListWrapper<>(objectArrayList);
         break;
 
       case RECORD:
-        result = (PersistentBase) value;
+        if (value != null && ByteBuffer.class.isAssignableFrom(value.getClass())) {
+          ByteBuffer buffer = (ByteBuffer) value;
+          byte[] arr = new byte[buffer.remaining()];
+          buffer.get(arr);
+          try {
+            result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr);
+          } catch (IOException e) {
+            LOG.error("");
+            result = null;
+          }
+        } else {
+          result = (PersistentBase) value;
+        }
         break;
 
       case UNION:

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 4498caf..57d03f1 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -22,6 +22,8 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import org.apache.avro.Schema;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.query.CassandraResultSet;
 import org.apache.gora.cassandra.store.CassandraClient;
@@ -50,7 +52,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
   private DataStore<K, T> cassandraDataStore;
 
   AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
-    super(cassandraClient, dataStore.getKeyClass(), dataStore.getKeyClass(), mapping);
+    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
     this.cassandraDataStore = dataStore;
   }
 
@@ -70,7 +72,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     if (iterator.hasNext()) {
       obj = cassandraDataStore.newPersistent();
       Row row = iterator.next();
-      populateValuesToPersistent(row, definitions, obj);
+      populateValuesToPersistent(row, definitions, obj, fields);
     }
     return obj;
   }
@@ -85,13 +87,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
         AvroCassandraUtils.processKeys(mapping, key, fields, values);
         for (Schema.Field f : persistentBase.getSchema().getFields()) {
           String fieldName = f.name();
-          if (mapping.getFieldFromFieldName(fieldName) == null) {
+          Field field = mapping.getFieldFromFieldName(fieldName);
+          if (field == null) {
             LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass});
             continue;
           }
-          if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKeys().contains(mapping.getFieldFromFieldName(fieldName))) {
+          if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) {
             Object value = persistentBase.get(f.pos());
-            value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value);
+            value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
             values.add(value);
             fields.add(fieldName);
           }
@@ -120,7 +123,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     if (iterator.hasNext()) {
       obj = cassandraDataStore.newPersistent();
       Row row = iterator.next();
-      populateValuesToPersistent(row, definitions, obj);
+      populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames());
     }
     return obj;
   }
@@ -128,104 +131,129 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
   /**
    * This method wraps result set data in to DataEntry and creates a list of DataEntry.
    **/
-  private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base) {
-
+  private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) {
     Object paramValue;
-    for (Schema.Field avrofield : base.getSchema().getFields()) {
-
-      Field field = mapping.getFieldFromFieldName(avrofield.name());
+    for (String fieldName : fields) {
+      Schema.Field avroField = base.getSchema().getField(fieldName);
+      Field field = mapping.getFieldFromFieldName(fieldName);
       //to ignore unspecified fields in the mapping
-      if (field == null) {
+      if (field == null || avroField == null) {
         continue;
       }
-      Schema fieldSchema = avrofield.schema();
+      Schema fieldSchema = avroField.schema();
       String columnName = field.getColumnName();
-      DataType columnType = columnDefinitions.getType(columnName);
-
-      switch (columnType.getName()) {
-        case ASCII:
-          paramValue = row.getString(columnName);
-          break;
-        case BIGINT:
-          paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
-          break;
-        case BLOB:
-          paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
-          break;
-        case BOOLEAN:
-          paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
-          break;
-        case COUNTER:
-          paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
-          break;
-        case DECIMAL:
-          paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
-          break;
-        case DOUBLE:
-          paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
-          break;
-        case FLOAT:
-          paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
-          break;
-        case INET:
-          paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
-          break;
-        case INT:
-          paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
-          break;
-        case TEXT:
-          paramValue = row.getString(columnName);
-          break;
-        case TIMESTAMP:
-          paramValue = row.isNull(columnName) ? null : row.getDate(columnName);
-          break;
-        case UUID:
-          paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
-          break;
-        case VARCHAR:
-          paramValue = row.getString(columnName);
-          break;
-        case VARINT:
-          paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
-          break;
-        case TIMEUUID:
-          paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
-          break;
-        case LIST:
-          String dataType = field.getType();
-          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
-          paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
-          break;
-        case SET:
-          dataType = field.getType();
-          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
-          paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
-          break;
-        case MAP:
-          dataType = field.getType();
-          dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
-          dataType = dataType.split(",")[1];
-          // Avro supports only String for keys
-          paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType));
-          break;
-        case UDT:
-          paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName).toString();
-          break;
-        case TUPLE:
-          paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
-          break;
-        case CUSTOM:
-          paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
-          break;
-        default:
-          paramValue = row.getString(columnName);
-          break;
-      }
+      paramValue = getValue(row, columnDefinitions, columnName);
       Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema);
-      base.put(avrofield.pos(), value);
+      base.put(avroField.pos(), value);
+    }
+  }
+
+  /**
+   * Reads one column of a result row as the Java value that matches the column's CQL type.
+   * <p>
+   * Text-like columns (and any type without a dedicated branch) are read with {@code getString};
+   * all other branches check {@code isNull} first and yield {@code null} for a null column.
+   * The element types of collection columns are parsed from the CQL type declared for the
+   * field in the mapping, e.g. {@code list<int>} or {@code map<text,int>}.
+   *
+   * @param row               row to read the value from
+   * @param columnDefinitions column metadata used to look up the CQL type of the column
+   * @param columnName        name of the column to read
+   * @return the value read from the column
+   */
+  private Object getValue(Row row, ColumnDefinitions columnDefinitions, String columnName) {
+    Object paramValue;
+    String elementType;
+    Field field = mapping.getFieldFromColumnName(columnName);
+    DataType columnType = columnDefinitions.getType(columnName);
+    switch (columnType.getName()) {
+      case BIGINT:
+      case COUNTER:
+        paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
+        break;
+      case BLOB:
+      case CUSTOM:
+        paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
+        break;
+      case BOOLEAN:
+        paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
+        break;
+      case DECIMAL:
+        paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
+        break;
+      case DOUBLE:
+        paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
+        break;
+      case FLOAT:
+        paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
+        break;
+      case INET:
+        paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
+        break;
+      case INT:
+        paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
+        break;
+      case TIMESTAMP:
+        paramValue = row.isNull(columnName) ? null : row.getDate(columnName);
+        break;
+      case UUID:
+      case TIMEUUID:
+        paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
+        break;
+      case VARINT:
+        paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
+        break;
+      case LIST:
+        elementType = field.getType();
+        elementType = elementType.substring(elementType.indexOf("<") + 1, elementType.indexOf(">"));
+        paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(elementType));
+        break;
+      case SET:
+        elementType = field.getType();
+        elementType = elementType.substring(elementType.indexOf("<") + 1, elementType.indexOf(">"));
+        // A set column has to be read with getSet; the result is copied into a list so that
+        // the caller receives the same collection type as for LIST columns.
+        paramValue = row.isNull(columnName) ? null : new ArrayList<Object>(row.getSet(columnName, getRelevantClassForCassandraDataType(elementType)));
+        break;
+      case MAP:
+        elementType = field.getType();
+        elementType = elementType.substring(elementType.indexOf("<") + 1, elementType.indexOf(">"));
+        // The value type follows the comma; trim it so "map<text, int>" resolves like "map<text,int>".
+        elementType = elementType.split(",")[1].trim();
+        // Avro supports only String for keys
+        paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(elementType));
+        break;
+      case UDT:
+        paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName);
+        break;
+      case TUPLE:
+        paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
+        break;
+      case ASCII:
+      case TEXT:
+      case VARCHAR:
+      default:
+        paramValue = row.getString(columnName);
+        break;
+    }
+    return paramValue;
+  }
 
+/*  public Collection<Object> getFieldValues(Object o) {
+    UDTValue udtValue = (UDTValue) o;
+    UserType type = udtValue.getType();
+
+    Collection<Object> values = new ArrayList<Object>(type.size());
+
+ *//*   for (UserType.Field field : type) {
+      udtValue.
+      ByteBuffer bytes = udtValue.getBytesUnsafe(field.getName());
+      DataType value = field.getType();
+      for(DataType type1 : value.getTypeArguments()) {
+        type1.
+      }
+      values.add(value);
+    }*//*
+
+    return values;
+  }*/
+
+
   private Class getRelevantClassForCassandraDataType(String dataType) {
     switch (dataType) {
       //// TODO: 7/25/17 support all the datatypes 
@@ -254,8 +282,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
   @Override
   public Result execute(DataStore dataStore, Query query) {
     List<Object> objectArrayList = new ArrayList<>();
+    String[] fields = query.getFields();
+    if (fields != null) {
+      fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+    } else {
+      fields = mapping.getAllFieldsIncludingKeys();
+    }
     CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
-    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList);
+    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
     ResultSet results;
     if (objectArrayList.size() == 0) {
       results = client.getSession().execute(cqlQuery);
@@ -266,12 +300,18 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     ColumnDefinitions definitions = results.getColumnDefinitions();
     T obj;
     K keyObject;
+    CassandraKey cassandraKey = mapping.getCassandraKey();
     while (iterator.hasNext()) {
       Row row = iterator.next();
       obj = cassandraDataStore.newPersistent();
       keyObject = cassandraDataStore.newKey();
-      populateValuesToPersistent(row, definitions, obj);
-      populateValuesToPersistent(row, definitions, (PersistentBase) keyObject);
+      populateValuesToPersistent(row, definitions, obj, fields);
+      if (cassandraKey != null) {
+        populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames());
+      } else {
+        Field key = mapping.getInlinedDefinedPartitionKey();
+        keyObject = (K) getValue(row, definitions, key.getColumnName());
+      }
       cassandraResult.addResultElement(keyObject, obj);
     }
     return cassandraResult;

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index 4362a04..10c8f68 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class is used create Cassandra Queries.
@@ -97,29 +97,17 @@ class CassandraQueryFactory {
     stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
     boolean isCommaNeeded = false;
     CassandraKey cassandraKey = mapping.getCassandraKey();
-    // appending Cassandra key columns into db schema
-    for (Field field : mapping.getFieldList()) {
-      if (isCommaNeeded) {
-        stringBuffer.append(", ");
-      }
-      stringBuffer.append(field.getColumnName()).append(" ").append(field.getType());
-      boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
-      boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
-      if (isStaticColumn) {
-        stringBuffer.append(" STATIC");
-      }
-      if (isPrimaryKey) {
-        stringBuffer.append("  PRIMARY KEY ");
-      }
-      isCommaNeeded = true;
-    }
+    // appending Cassandra Persistent columns into db schema
+    processFieldsForCreateTableQuery(mapping.getFieldList(), isCommaNeeded, stringBuffer);
 
     if (cassandraKey != null) {
-      List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields();
-      if (pkey != null) {
+      isCommaNeeded = true;
+      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), isCommaNeeded, stringBuffer);
+      List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields();
+      if (partitionKeys != null) {
         stringBuffer.append(", PRIMARY KEY (");
         boolean isCommaNeededToApply = false;
-        for (PartitionKeyField keyField : pkey) {
+        for (PartitionKeyField keyField : partitionKeys) {
           if (isCommaNeededToApply) {
             stringBuffer.append(",");
           }
@@ -141,11 +129,6 @@ class CassandraQueryFactory {
         }
         stringBuffer.append(")");
       }
-    } else {
-      if (!stringBuffer.toString().toLowerCase(Locale.ENGLISH).contains("primary key")) {
-        Field field = mapping.getDefaultCassandraKey();
-        stringBuffer.append(", ").append(field.getFieldName()).append(" ").append(field.getType()).append("  PRIMARY KEY ");
-      }
     }
 
     stringBuffer.append(")");
@@ -191,6 +174,24 @@ class CassandraQueryFactory {
     return stringBuffer.toString();
   }
 
+  /**
+   * Appends one "column type" definition per field to the CREATE TABLE statement under
+   * construction, adding the STATIC and PRIMARY KEY markers requested by the field properties.
+   *
+   * @param fields        fields whose columns are appended
+   * @param isCommaNeeded whether a separator has to precede the first appended column
+   * @param stringBuilder builder holding the statement text
+   */
+  private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) {
+    boolean separatorPending = isCommaNeeded;
+    for (Field column : fields) {
+      stringBuilder.append(separatorPending ? ", " : "");
+      // Every column after the first appended one is preceded by a separator.
+      separatorPending = true;
+      stringBuilder.append(column.getColumnName());
+      stringBuilder.append(" ");
+      stringBuilder.append(column.getType());
+      final boolean staticColumn = Boolean.parseBoolean(column.getProperty("static"));
+      final boolean primaryKeyColumn = Boolean.parseBoolean(column.getProperty("primarykey"));
+      if (staticColumn) {
+        stringBuilder.append(" STATIC");
+      }
+      if (primaryKeyColumn) {
+        stringBuilder.append("  PRIMARY KEY ");
+      }
+    }
+  }
+
   /**
    * This method returns the CQL query to drop table.
    * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
@@ -221,7 +222,7 @@ class CassandraQueryFactory {
    * @return CQL query
    */
   static String getTruncateTableQuery(CassandraMapping mapping) {
-    return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
+    return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString();
   }
 
   /**
@@ -371,9 +372,7 @@ class CassandraQueryFactory {
    * @param objects        object list
    * @return CQL Query
    */
-  static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
-    String[] fields = cassandraQuery.getFields();
-    fields = fields != null ? fields : mapping.getFieldNames();
+  static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
     Object startKey = cassandraQuery.getStartKey();
     Object endKey = cassandraQuery.getEndKey();
     Object key = cassandraQuery.getKey();
@@ -470,14 +469,17 @@ class CassandraQueryFactory {
     ArrayList<String> columnNames = new ArrayList<>();
     for (String field : fields) {
       Field fieldBean = mapping.getFieldFromFieldName(field);
+      CassandraKey cassandraKey = mapping.getCassandraKey();
+      Field keyBean = null;
+      if (cassandraKey != null) {
+        keyBean = cassandraKey.getFieldFromFieldName(field);
+      }
       if (fieldBean != null) {
         columnNames.add(fieldBean.getColumnName());
+      } else if (keyBean != null) {
+        columnNames.add(keyBean.getColumnName());
       } else {
-        if (mapping.getDefaultCassandraKey().getFieldName().equals(field)) {
-          columnNames.add(field);
-        } else {
-          LOG.warn("{} field is ignored, couldn't find relavant field in the persistent mapping", field);
-        }
+        LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field);
       }
     }
     return columnNames.toArray(new String[0]);
@@ -504,7 +506,7 @@ class CassandraQueryFactory {
    */
   static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
     String[] columns = null;
-    if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) {
+    if (cassandraQuery.getFields() != null) {
       columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
     }
     Object startKey = cassandraQuery.getStartKey();
@@ -549,17 +551,17 @@ class CassandraQueryFactory {
           String[] columnKeys = getColumnNames(mapping, cassandraKeys);
           for (int i = 0; i < cassandraKeys.size(); i++) {
             if (isWhereNeeded) {
-              query = delete.where(QueryBuilder.gte(columnKeys[i], "?"));
+              query = delete.where(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
               objects.add(cassandraValues.get(i));
               isWhereNeeded = false;
             } else {
-              query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+              query = query.and(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
               objects.add(cassandraValues.get(i));
             }
           }
         } else {
           primaryKey = getPKey(mapping.getFieldList());
-          query = delete.where(QueryBuilder.gte(primaryKey, "?"));
+          query = delete.where(QueryBuilder.gte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
           objects.add(startKey);
           isWhereNeeded = false;
         }
@@ -572,20 +574,20 @@ class CassandraQueryFactory {
           String[] columnKeys = getColumnNames(mapping, cassandraKeys);
           for (int i = 0; i < cassandraKeys.size(); i++) {
             if (isWhereNeeded) {
-              query = delete.where(QueryBuilder.lte(columnKeys[i], "?"));
+              query = delete.where(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
               objects.add(cassandraValues.get(i));
               isWhereNeeded = false;
             } else {
-              query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+              query = query.and(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
               objects.add(cassandraValues.get(i));
             }
           }
         } else {
           primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
           if (isWhereNeeded) {
-            query = delete.where(QueryBuilder.lte(primaryKey, "?"));
+            query = delete.where(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
           } else {
-            query = query.and(QueryBuilder.lte(primaryKey, "?"));
+            query = query.and(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
           }
           objects.add(endKey);
         }
@@ -611,7 +613,7 @@ class CassandraQueryFactory {
     Update.Assignments updateAssignments = null;
     if (cassandraQuery instanceof CassandraQuery) {
       String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
-      if(CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) {
+      if (CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) {
         for (String column : columnNames) {
           updateAssignments = update.with(QueryBuilder.set(column, "?"));
           objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
@@ -619,12 +621,12 @@ class CassandraQueryFactory {
       } else {
         for (String column : columnNames) {
           updateAssignments = update.with(QueryBuilder.set(column, "?"));
-          String field = mapping.getFieldFromColumnName(column).getFieldName();
-          Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field);
+          Field field = mapping.getFieldFromColumnName(column);
+          Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName());
           try {
             Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
-            Schema schemaField = schema.getField(field).schema();
-            objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value));
+            Schema schemaField = schema.getField(field.getFieldName()).schema();
+            objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
           } catch (IllegalAccessException | NoSuchFieldException e) {
             throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.");
           } catch (NullPointerException e) {
@@ -717,4 +719,111 @@ class CassandraQueryFactory {
     return query.getQueryString();
   }
 
+  /**
+   * Builds the CQL CREATE TYPE statement for the user defined type that backs an Avro record.
+   * When the given schema is a union, its first record branch is used instead.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlRefcreateType.html
+   *
+   * @param fieldSchema   Avro schema of the field {@link Schema}
+   * @param mapping       Cassandra mapping {@link CassandraMapping}
+   * @param udtQueryStack set passed on to the record processing
+   * @return CQL Query
+   */
+  static String getCreateUDTType(Schema fieldSchema, CassandraMapping mapping, Set<String> udtQueryStack) {
+    Schema recordSchema = fieldSchema;
+    if (Schema.Type.UNION.equals(fieldSchema.getType())) {
+      for (Schema branch : fieldSchema.getTypes()) {
+        if (Schema.Type.RECORD.equals(branch.getType())) {
+          recordSchema = branch;
+          break;
+        }
+      }
+    }
+    StringBuilder createType = new StringBuilder();
+    createType.append("CREATE TYPE IF NOT EXISTS ");
+    createType.append(mapping.getKeySpace().getName()).append(".").append(recordSchema.getName());
+    createType.append(" (");
+    processRecord(recordSchema, createType, mapping, udtQueryStack);
+    return createType.append(")").toString();
+  }
+
+  /**
+   * Appends the comma separated "name type" list of a record's fields to the given builder.
+   * A field whose type cannot be mapped is left out of the list entirely.
+   *
+   * @param recordSchema  Avro record whose fields are appended
+   * @param stringBuilder builder holding the statement text
+   * @param mapping       Cassandra mapping, passed on to the type mapping
+   * @param udtQueryStack set passed on to the type mapping
+   */
+  private static void processRecord(Schema recordSchema, StringBuilder stringBuilder, CassandraMapping mapping, Set<String> udtQueryStack) {
+    boolean isCommaNeeded = false;
+    for (Schema.Field field : recordSchema.getFields()) {
+      // Remember where this field starts, so a failed mapping can be rolled back as a whole:
+      // separator, field name and any partially appended type text.
+      int fieldStart = stringBuilder.length();
+      if (isCommaNeeded) {
+        stringBuilder.append(", ");
+      }
+      String fieldName = field.name();
+      stringBuilder.append(fieldName).append(" ");
+      try {
+        populateFieldsToQuery(field.schema(), stringBuilder, mapping, udtQueryStack);
+        isCommaNeeded = true;
+      } catch (Exception e) {
+        LOG.warn("{} field is left out of the user defined type {}", new Object[]{fieldName, recordSchema.getName(), e});
+        // Searching for the field name could hit an earlier part of the statement,
+        // so the builder is truncated to the recorded position instead.
+        stringBuilder.setLength(fieldStart);
+      }
+    }
+  }
+
+  /**
+   * Appends the CQL type that corresponds to the given Avro schema to the builder.
+   * Records are written as frozen user defined types and the CREATE TYPE query of each such
+   * record is added to {@code udtQueryStack}. For a union, only its first branch that is
+   * not NULL is mapped.
+   *
+   * @param schema        Avro schema to map
+   * @param builder       builder holding the statement text
+   * @param mapping       Cassandra mapping
+   * @param udtQueryStack set collecting the CREATE TYPE queries of referenced records
+   * @throws Exception when a union refers to a record whose name already occurs in the builder
+   */
+  private static void populateFieldsToQuery(Schema schema, StringBuilder builder, CassandraMapping mapping, Set<String> udtQueryStack) throws Exception {
+    switch (schema.getType()) {
+      case BOOLEAN:
+        builder.append("boolean");
+        break;
+      case INT:
+        builder.append("int");
+        break;
+      case LONG:
+        builder.append("bigint");
+        break;
+      case FLOAT:
+        builder.append("float");
+        break;
+      case DOUBLE:
+        builder.append("double");
+        break;
+      case BYTES:
+        builder.append("blob");
+        break;
+      case STRING:
+      case FIXED:
+      case ENUM:
+        builder.append("text");
+        break;
+      case ARRAY:
+        builder.append("list<");
+        populateFieldsToQuery(schema.getElementType(), builder, mapping, udtQueryStack);
+        builder.append(">");
+        break;
+      case MAP:
+        builder.append("map<text,");
+        populateFieldsToQuery(schema.getValueType(), builder, mapping, udtQueryStack);
+        builder.append(">");
+        break;
+      case RECORD:
+        builder.append("frozen<").append(schema.getName()).append(">");
+        udtQueryStack.add(getCreateUDTType(schema, mapping, udtQueryStack));
+        break;
+      case UNION: {
+        // Only the first branch that is not NULL decides the column type.
+        Schema chosen = null;
+        for (Schema branch : schema.getTypes()) {
+          if (!Schema.Type.NULL.equals(branch.getType())) {
+            chosen = branch;
+            break;
+          }
+        }
+        if (chosen == null) {
+          break;
+        }
+        if (!Schema.Type.RECORD.equals(chosen.getType())) {
+          populateFieldsToQuery(chosen, builder, mapping, udtQueryStack);
+          break;
+        }
+        String recordName = chosen.getName();
+        if (builder.toString().contains(recordName)) {
+          LOG.warn("Same Field Type can't be mapped recursively. This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping.");
+          throw new Exception("Same Field Type has mapped recursively");
+        }
+        builder.append("frozen<").append(recordName).append(">");
+        udtQueryStack.add(getCreateUDTType(chosen, mapping, udtQueryStack));
+        break;
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index ac4da42..17e0568 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -20,6 +20,7 @@ package org.apache.gora.cassandra.serializers;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.TableMetadata;
+import org.apache.avro.Schema;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.store.CassandraClient;
 import org.apache.gora.cassandra.store.CassandraMapping;
@@ -32,22 +33,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 
 /**
  * This is the abstract Cassandra Serializer class.
  */
 public abstract class CassandraSerializer<K, T extends Persistent> {
-  CassandraClient client;
-
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
   protected Class<K> keyClass;
 
   protected Class<T> persistentClass;
 
   protected CassandraMapping mapping;
-
-  private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
+  CassandraClient client;
 
   CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) {
     this.keyClass = keyClass;
@@ -56,9 +57,35 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
     this.mapping = mapping;
   }
 
+  /**
+   * Creates the Cassandra serializer selected by the serializer type property.
+   * An empty type selects the native serializer; otherwise the upper-cased type has to name a
+   * {@link CassandraStore.SerializerType} constant.
+   *
+   * @param cc        Cassandra Client
+   * @param type      Serialization type
+   * @param dataStore Cassandra DataStore
+   * @param mapping   Cassandra Mapping
+   * @param <K>       key class
+   * @param <T>       persistent class
+   * @return Serializer
+   */
+  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
+    CassandraStore.SerializerType serType;
+    if (type.isEmpty()) {
+      serType = CassandraStore.SerializerType.NATIVE;
+    } else {
+      serType = CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
+    }
+    if (serType == CassandraStore.SerializerType.AVRO) {
+      return new AvroSerializer(cc, dataStore, mapping);
+    }
+    // NATIVE and every other serializer type use the native serializer.
+    return new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+  }
+
   public void createSchema() {
     LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
     this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
+    processUDTSchemas(); //TODO complete functionality
     LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
     this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
   }
@@ -89,29 +116,30 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
     }
   }
 
-  /**
-   * This method returns the Cassandra Serializer according the Cassandra serializer property.
-   *
-   * @param cc              Cassandra Client
-   * @param type            Serialization type
-   * @param dataStore        Cassandra DataStore
-   * @param mapping         Cassandra Mapping
-   * @param <K>             key class
-   * @param <T>             persistent class
-   * @return Serializer
-   */
-  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K,T> dataStore, CassandraMapping mapping) {
-    CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
-    CassandraSerializer serializer;
-    switch (serType) {
-      case AVRO:
-        serializer = new AvroSerializer(cc, dataStore, mapping);
-        break;
-      case NATIVE:
-      default:
-        serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+  /**
+   * Creates the user defined types required by the mapped fields whose CQL type contains
+   * "frozen". The Avro schema of each such field is read from the static SCHEMA$ field of the
+   * persistent class, and the generated CREATE TYPE queries are executed in insertion order.
+   *
+   * @throws RuntimeException when SCHEMA$ cannot be read or a mapped field is missing from it
+   */
+  private void processUDTSchemas() {
+    Set<String> schemaStack = new LinkedHashSet<>();
+    for (Field field : mapping.getFieldList()) {
+      if (!field.getType().contains("frozen")) {
+        continue;
+      }
+      Schema schema;
+      try {
+        schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
+      } catch (IllegalAccessException | NoSuchFieldException e) {
+        // Keep the cause so the reflective failure stays visible in the stack trace.
+        throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.", e);
+      }
+      // Explicit check instead of catching NullPointerException, which would also hide
+      // unrelated null dereferences raised while the query is generated.
+      Schema.Field avroField = schema.getField(field.getFieldName());
+      if (avroField == null) {
+        throw new RuntimeException(field.getFieldName() + " field couldn't find in the class " + mapping.getPersistentClass() + ".");
+      }
+      String cqlQuery = CassandraQueryFactory.getCreateUDTType(avroField.schema(), mapping, schemaStack);
+      schemaStack.add(cqlQuery);
+    }
+    createUserDefineTypes(schemaStack);
+  }
+
+  private void createUserDefineTypes(Set<String> queries) {
+    for (String cqlQuery : queries) {
+      this.client.getSession().execute(cqlQuery);
     }
-    return serializer;
   }
 
   protected String[] getFields() {
@@ -146,14 +174,22 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   public long deleteByQuery(Query query) {
     List<Object> objectArrayList = new ArrayList<>();
-    String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
-    ResultSet results;
-    if (objectArrayList.size() == 0) {
-      results = client.getSession().execute(cqlQuery);
+    if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) {
+      if (query.getFields() == null) {
+        client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+      } else {
+        LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields.");
+      }
     } else {
-      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+      String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
+      ResultSet results;
+      if (objectArrayList.size() == 0) {
+        results = client.getSession().execute(cqlQuery);
+      } else {
+        results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+      }
+      LOG.debug("Delete by Query was applied : " + results.wasApplied());
     }
-    LOG.debug("Delete by Query was applied : " + results.wasApplied());
     LOG.info("Delete By Query method doesn't return the deleted element count.");
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index 4695498..f8bb066 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.datastax.driver.mapping.Result;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
 import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -46,16 +47,23 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
 
   private Mapper<T> mapper;
 
+  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+    super(cassandraClient, keyClass, persistentClass, mapping);
+    this.createSchema();
+    MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
+    mapper = mappingManager.mapper(persistentClass);
+  }
+
   @Override
   public void put(Object key, Persistent value) {
-    LOG.debug("Object is saved with key : {} and value : {}",key,value);
+    LOG.debug("Object is saved with key : {} and value : {}", key, value);
     mapper.save((T) value);
   }
 
   @Override
   public T get(Object key) {
     T object = mapper.get(key);
-    if(object != null) {
+    if (object != null) {
       LOG.debug("Object is found for key : {}", key);
     } else {
       LOG.debug("Object is not found for key : {}", key);
@@ -72,7 +80,7 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
 
   @Override
   public Persistent get(Object key, String[] fields) {
-    if(fields == null) {
+    if (fields == null) {
       fields = getFields();
     }
     String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields);
@@ -83,15 +91,21 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
       LOG.debug("Object is found for key : {}", key);
       return objectList.get(0);
     }
-    LOG.debug("Object is not found for key : {}" , key);
+    LOG.debug("Object is not found for key : {}", key);
     return null;
   }
 
   @Override
   public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
     List<Object> objectArrayList = new ArrayList<>();
+    String[] fields = query.getFields();
+    if (fields != null) {
+      fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+    } else {
+      fields = mapping.getAllFieldsIncludingKeys();
+    }
     CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
-    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList);
+    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
     ResultSet results;
     if (objectArrayList.size() == 0) {
       results = client.getSession().execute(cqlQuery);
@@ -108,13 +122,6 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
     return cassandraResult;
   }
 
-  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
-    super(cassandraClient, keyClass, persistentClass, mapping);
-    this.createSchema();
-    MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
-    mapper = mappingManager.mapper(persistentClass);
-  }
-
   private K getKey(T object) {
     String keyField = null;
     for (Field field : mapping.getFieldList()) {

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
index 5d22d94..ce1e3e7 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
@@ -5,15 +5,16 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /**
  * This package contains Cassandra store related util classes for serializer.
  */

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index 196f6a3..c973fe4 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.gora.cassandra.store;
 
 import com.datastax.driver.core.Cluster;
@@ -53,6 +70,8 @@ public class CassandraClient {
 
 
   private Cluster cluster;
+  private Session session;
+  private CassandraMapping mapping;
 
   public Session getSession() {
     return session;
@@ -62,11 +81,6 @@ public class CassandraClient {
     return cluster;
   }
 
-  private Session session;
-
-  private CassandraMapping mapping;
-
-
   void initialize(Properties properties, CassandraMapping mapping) throws Exception {
     Cluster.Builder builder = Cluster.builder();
     List<String> codecs = readCustomCodec(properties);

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 5699355..ac46a30 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -31,54 +31,80 @@ import java.util.Map;
  */
 public class CassandraMapping {
 
+  private static final String PRIMARY_KEY = "primarykey";
   private CassandraKey cassandraKey;
-
   private Map<String, String> tableProperties;
-
   private Class keyClass;
-
   private Class persistentClass;
-
   private KeySpace keySpace;
-
   private List<Field> fieldList;
-
-  private List<Field> inlinedDefinedPartitionKeys;
-
-  private static final String PRIMARY_KEY = "primarykey";
-
+  private Field inlinedDefinedPartitionKey;
   private String coreName;
 
+  /**
+   * Constructor of the class
+   */
+  CassandraMapping() {
+    this.fieldList = new ArrayList<>();
+    this.tableProperties = new HashMap<>();
+  }
+
+  /**
+   * This method returns the KeySpace defined in the mapping file.
+   * @return Key space {@link KeySpace}
+   */
   public KeySpace getKeySpace() {
     return keySpace;
   }
 
-  public void setKeySpace(KeySpace keySpace) {
+  /**
+   * This method sets the KeySpace in the Cassandra mapping.
+   * @param keySpace Key space {@link KeySpace}
+   */
+  void setKeySpace(KeySpace keySpace) {
     this.keySpace = keySpace;
   }
 
+  /**
+   * This method returns only the fields which belong to the Persistent Object.
+   * @return List of Fields
+   */
   public List<Field> getFieldList() {
     return fieldList;
   }
 
+  /**
+   * This method returns the Persistent Object's Field from the mapping, according to the FieldName.
+   * @param fieldName Field Name
+   * @return Field {@link Field}
+   */
   public Field getFieldFromFieldName(String fieldName) {
     for (Field field1 : fieldList) {
-      if (field1.getFieldName().equals(fieldName)) {
+      if (field1.getFieldName().equalsIgnoreCase(fieldName)) {
         return field1;
       }
     }
     return null;
   }
 
+  /**
+   * This method returns the Persistent Object's Field from the mapping, according to the ColumnName.
+   * @param columnName Column Name
+   * @return Field {@link Field}
+   */
   public Field getFieldFromColumnName(String columnName) {
     for (Field field1 : fieldList) {
-      if (field1.getColumnName().equals(columnName)) {
+      if (field1.getColumnName().equalsIgnoreCase(columnName)) {
         return field1;
       }
     }
     return null;
   }
 
+  /**
+   * This method returns the Field Names
+   * @return array of Field Names
+   */
   public String[] getFieldNames() {
     List<String> fieldNames = new ArrayList<>(fieldList.size());
     for (Field field : fieldList) {
@@ -88,60 +114,79 @@ public class CassandraMapping {
     return fieldNames.toArray(fieldNameArray);
   }
 
+  /**
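+   * This method returns the names of all fields, followed by the names of the Cassandra key fields when a Cassandra key is defined.
+   * @return array of field names including the key field names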
+   */
+  public String[] getAllFieldsIncludingKeys() {
+    List<String> fieldNames = new ArrayList<>(fieldList.size());
+    for (Field field : fieldList) {
+      fieldNames.add(field.getFieldName());
+    }
+    if (cassandraKey != null) {
+      for (Field field : cassandraKey.getFieldList()) {
+        fieldNames.add(field.getFieldName());
+      }
+    }
+    String[] fieldNameArray = new String[fieldNames.size()];
+    return fieldNames.toArray(fieldNameArray);
+  }
+
+  /**
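+   * This method returns the names of the Cassandra key fields, or the name of the inlined partition key when no Cassandra key is defined.
+   * @return array of key field names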
+   */
+  public String[] getAllKeys() {
+    List<String> fieldNames = new ArrayList<>();
+    Field keyField = getInlinedDefinedPartitionKey();
+    if (cassandraKey != null) {
+      for (Field field : cassandraKey.getFieldList()) {
+        fieldNames.add(field.getFieldName());
+      }
+    } else {
+      fieldNames.add(keyField.getFieldName());
+    }
+    String[] fieldNameArray = new String[fieldNames.size()];
+    return fieldNames.toArray(fieldNameArray);
+  }
+
   public CassandraKey getCassandraKey() {
     return cassandraKey;
   }
 
   void setCassandraKey(CassandraKey cassandraKey) {
     this.cassandraKey = cassandraKey;
-    this.fieldList.addAll(cassandraKey.getFieldList());
   }
 
-  CassandraMapping() {
-    this.fieldList = new ArrayList<>();
-    this.tableProperties = new HashMap<>();
+  public String getCoreName() {
+    return coreName;
   }
 
-  public void setCoreName(String coreName) {
+  void setCoreName(String coreName) {
     this.coreName = coreName;
   }
 
-  public String getCoreName() {
-    return coreName;
-  }
-
-  public void addCassandraField(Field field) {
+  void addCassandraField(Field field) {
     this.fieldList.add(field);
   }
 
-  public void addProperty(String key, String value) {
-        this.tableProperties.put(key,value);
+  void addProperty(String key, String value) {
+    this.tableProperties.put(key, value);
   }
 
   public String getProperty(String key) {
     return this.tableProperties.get(key);
   }
 
-  public Field getDefaultCassandraKey() {
+  private Field getDefaultCassandraKey() {
     Field field = new Field();
     field.setFieldName("defaultId");
     field.setColumnName("defaultId");
-    field.setType("text");
+    field.setType("varchar");
+    field.addProperty("primarykey", "true");
     return field;
   }
 
-  public boolean isPartitionKeyDefined() {
-    if (cassandraKey == null) {
-      for (Field field : fieldList) {
-        if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
-          return true;
-        }
-      }
-      return false;
-    }
-    return true;
-  }
-
   public Class getKeyClass() {
     return keyClass;
   }
@@ -154,21 +199,38 @@ public class CassandraMapping {
     return persistentClass;
   }
 
-  public void setPersistentClass(Class persistentClass) {
+  void setPersistentClass(Class persistentClass) {
     this.persistentClass = persistentClass;
   }
 
-  public List<Field> getInlinedDefinedPartitionKeys() {
-    if(inlinedDefinedPartitionKeys != null) {
-      return inlinedDefinedPartitionKeys;
+  /**
+   * This method returns the inline-defined partition key.
+   * If no inline-defined partition key exists,
+   * this method returns the default predefined partition key "defaultId".
+   *
+   * @return Partition Key {@link Field}
+   */
+  public Field getInlinedDefinedPartitionKey() {
+    if (inlinedDefinedPartitionKey != null) {
+      return inlinedDefinedPartitionKey;
     } else {
-      inlinedDefinedPartitionKeys = new ArrayList<>();
       for (Field field : fieldList) {
         if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
-          inlinedDefinedPartitionKeys.add(field);
+          inlinedDefinedPartitionKey = field;
+          break;
         }
       }
-      return inlinedDefinedPartitionKeys;
+      if (inlinedDefinedPartitionKey == null) {
+        return getDefaultCassandraKey();
+      }
+      return inlinedDefinedPartitionKey;
+    }
+  }
+
+  void finalized() {
+    Field field = getInlinedDefinedPartitionKey();
+    if (!fieldList.contains(field) && cassandraKey == null) {
+      fieldList.add(field);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
index c501cc5..f151458 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.gora.cassandra.store;
 
 import org.apache.gora.cassandra.bean.CassandraKey;
@@ -203,6 +220,7 @@ class CassandraMappingBuilder<K, T extends Persistent> {
     } catch (Exception ex) {
       throw new IOException(ex);
     }
+    cassandraMapping.finalized();
     return cassandraMapping;
   }
 
@@ -219,6 +237,10 @@ class CassandraMappingBuilder<K, T extends Persistent> {
           fieldKey.setColumnName(attributeValue);
           break;
         case "type":
+          // replace UDT into frozen
+          if (attributeValue.contains("udt(")) {
+            attributeValue = attributeValue.replace("udt(", "frozen(");
+          }
           fieldKey.setType(attributeValue.replace("(", "<").replace(")", ">"));
           break;
         default:
@@ -229,8 +251,8 @@ class CassandraMappingBuilder<K, T extends Persistent> {
   }
 
   private int getReplicationFactor(Element element) {
-    String value  = element.getAttributeValue("replication_factor");
-    if(value == null) {
+    String value = element.getAttributeValue("replication_factor");
+    if (value == null) {
       return 1;
     } else {
       return Integer.parseInt(value);

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 74d3862..c481610 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -17,11 +17,13 @@
 
 package org.apache.gora.cassandra.store;
 
+import org.apache.avro.data.RecordBuilder;
 import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.serializers.CassandraSerializer;
 import org.apache.gora.persistency.BeanFactory;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -58,16 +60,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
 
   private CassandraSerializer cassandraSerializer;
 
-  public enum SerializerType {
-    AVRO("AVRO"), NATIVE("NATIVE");
-    String val;
-
-    SerializerType(String v) {
-      this.val = v;
-    }
-  }
-
-
   public CassandraStore() {
     super();
   }
@@ -96,6 +88,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     }
   }
 
+  @SuppressWarnings("all")
+  @Override
+  public Class<T> getPersistentClass() {
+    return (Class<T>) this.persistentClass;
+  }
+
   /**
    * {@inheritDoc}
    * <p>
@@ -110,12 +108,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     this.persistentClass = persistentClass;
   }
 
-  @SuppressWarnings("all")
-  @Override
-  public Class<T> getPersistentClass() {
-    return (Class<T>) this.persistentClass;
-  }
-
   @Override
   public String getSchemaName() {
     return mapping.getCoreName();
@@ -148,7 +140,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
       if (beanFactory != null) {
         return beanFactory.newKey();
       } else {
-        LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory.");
         return keyClass.newInstance();
       }
     } catch (Exception ex) {
@@ -162,8 +153,10 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     try {
       if (beanFactory != null) {
         return this.beanFactory.newPersistent();
+      } else if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+        RecordBuilder builder = (RecordBuilder) persistentClass.getMethod("newBuilder").invoke(null);
+        return (T) RecordBuilder.class.getMethod("build").invoke(builder);
       } else {
-        LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory.");
         return persistentClass.newInstance();
       }
     } catch (Exception ex) {
@@ -171,10 +164,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     }
   }
 
-  @Override
-  public void setBeanFactory(BeanFactory<K, T> beanFactory) {
-    this.beanFactory = beanFactory;
-  }
 
   @Override
   public BeanFactory<K, T> getBeanFactory() {
@@ -182,6 +171,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
   }
 
   @Override
+  public void setBeanFactory(BeanFactory<K, T> beanFactory) {
+    this.beanFactory = beanFactory;
+  }
+
+  @Override
   public void close() {
     this.cassandraSerializer.close();
   }
@@ -229,7 +223,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
   @Override
   public Query<K, T> newQuery() {
     Query<K, T> query = new CassandraQuery(this);
-    query.setFields(mapping.getFieldNames());
     return query;
   }
 
@@ -262,4 +255,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
     return cassandraSerializer.schemaExists();
   }
 
+  public enum SerializerType {
+    AVRO("AVRO"), NATIVE("NATIVE");
+    String val;
+
+    SerializerType(String v) {
+      this.val = v;
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
index 95e1c0f..bb758f6 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
@@ -28,13 +28,13 @@ public class CassandraStoreParameters {
    */
   public static final String CASSANDRA_SERVERS = "gora.cassandrastore.cassandraServers";
   /**
-   *Property pointing to the Cassandra keyspace.
+   * Property pointing to the Cassandra keyspace.
    * string
    */
   public static final String KEYSPACE = "gora.cassandrastore.keyspace";
   /**
-   *  Property pointing to the port to use to connect to the Cassandra hosts.
-   *  integer
+   * Property pointing to the port to use to connect to the Cassandra hosts.
+   * integer
    */
   public static final String PORT = "gora.cassandrastore.port";
 
@@ -97,7 +97,7 @@ public class CassandraStoreParameters {
    * Property pointing to set local host new connection threshold.
    * integer
    */
-  public static final String LOCAL_NEW_CONNECTION_THRESHOLD= "gora.cassandrastore.localNewConnectionThreshold";
+  public static final String LOCAL_NEW_CONNECTION_THRESHOLD = "gora.cassandrastore.localNewConnectionThreshold";
   /**
    * Property pointing to set remote host new connection threshold.
    * integer

http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
index e6d0176..2cd9003 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
@@ -5,15 +5,16 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /**
  * This package contains all the Cassandra store related classes.
  */