You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2014/03/07 11:32:39 UTC

svn commit: r1575224 - in /gora/trunk: gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-cassandra/src/test/conf/ gora-core/src/main/java/org/apache/gora/store/ gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/

Author: rmarroquin
Date: Fri Mar  7 10:32:38 2014
New Revision: 1575224

URL: http://svn.apache.org/r1575224
Log:
GORA-167

Modified:
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/trunk/gora-cassandra/src/test/conf/gora.properties
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
    gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Mar  7 10:32:38 2014
@@ -18,12 +18,16 @@
 
 package org.apache.gora.cassandra.store;
 
+import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -47,53 +51,80 @@ import me.prettyprint.hector.api.query.R
 import me.prettyprint.hector.api.HConsistencyLevel;
 import me.prettyprint.hector.api.Serializer;
 
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.serializers.GenericArraySerializer;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
-import org.apache.gora.cassandra.serializers.TypeUtils;
 import org.apache.gora.mapreduce.GoraRecordReader;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.query.Query;
-import org.apache.gora.util.ByteUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * CassandraClient is where all of the primary datastore functionality is 
+ * executed. Typically CassandraClient is invoked by calling 
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
+ * CassandraClient deals with Cassandra data model definition, mutation, 
+ * and general/specific mappings.
+ * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} 
+ *
+ * @param <K>
+ * @param <T>
+ */
 public class CassandraClient<K, T extends PersistentBase> {
+
+  /** The logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
-  
+
   private Cluster cluster;
   private Keyspace keyspace;
   private Mutator<K> mutator;
   private Class<K> keyClass;
   private Class<T> persistentClass;
-  
+
+  /** Object containing the XML mapping for Cassandra. */
   private CassandraMapping cassandraMapping = null;
 
+  /** Hector client default column family consistency level. */
+  public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
+
+  /** Cassandra serializer to be used for serializing Gora's keys. */
   private Serializer<K> keySerializer;
-  
+
+  /**
+   * Given our key, persistentClass from 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
+   * we make best efforts to dictate our data model. 
+   * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String)
+   * to see if our keyspace has already been invented, this simple check prevents us from 
+   * recreating the keyspace if it already exists. 
+   * We then simple specify (based on the input keyclass) an appropriate serializer
+   * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
+   * defining a mutator from and by which we can mutate this object.
+   * @param keyClass the Key by which we wish o assign a record object
+   * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data.
+   * @throws Exception
+   */
   public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception {
     this.keyClass = keyClass;
 
     // get cassandra mapping with persistent class
     this.persistentClass = persistentClass;
     this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
-    // LOG.info("persistentClass=" + persistentClass.getName() + " -> cassandraMapping=" + cassandraMapping);
 
-    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
-    
+    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), 
+        new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+
     // add keyspace to cluster
     checkKeyspace();
-    
-    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
+
+    // Just create a Keyspace object on the client side, corresponding to an 
+    // already existing keyspace with already created column families.
     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
-    
+
     this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
     this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
   }
@@ -105,43 +136,53 @@ public class CassandraClient<K, T extend
     KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
     return (keyspaceDefinition != null);
   }
-  
+
   /**
    * Check if keyspace already exists. If not, create it.
-   * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel}
-   * logic. It is set by passing a ConfigurableConsistencyLevel object right 
-   * when the Keyspace is created. Currently consistency level is .ONE which 
-   * permits consistency to wait until one replica has responded. 
+   * In this method, we also utilize Hector's 
+   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel}
+   * logic. 
+   * It is set by passing a 
+   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right 
+   * when the {@link me.prettyprint.hector.api.Keyspace} is created. 
+   * If we cannot find a consistency level within <code>gora.properites</code>, 
+   * then column family consistency level is set to QUORUM (by default) which permits 
+   * consistency to wait for a quorum of replicas to respond regardless of data center.
+   * QUORUM is Hector Client's default setting and we respect that here as well.
+   * 
+   * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html
    */
   public void checkKeyspace() {
     // "describe keyspace <keyspaceName>;" query
     KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
     if (keyspaceDefinition == null) {
-      List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();      
+      List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
 
       // GORA-197
       for (ColumnFamilyDefinition cfDef : columnFamilyDefinitions) {
         cfDef.setComparatorType(ComparatorType.BYTESTYPE);
       }
 
-      keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);      
+      keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), 
+          "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
       this.cluster.addKeyspace(keyspaceDefinition, true);
-      // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
-      
-      // Create a customized Consistency Level
-      ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
-      Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
-
-      // Define CL.ONE for ColumnFamily "ColumnFamily"
-      clmap.put("ColumnFamily", HConsistencyLevel.ONE);
-
-      // In this we use CL.ONE for read and writes. But you can use different CLs if needed.
-      configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
-      configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
 
-      // Then let the keyspace know
-      HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
+      // GORA-167 Create a customized Consistency Level
+      ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
+      Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
+      // Column family consistency levels
+      ccl.setReadCfConsistencyLevels(clmap);
+      ccl.setWriteCfConsistencyLevels(clmap);
+      // Operations consistency levels
+      String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+      ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+      LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
+      opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+      ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+      LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
 
+      // Then let the keyspace know
+      HFactory.createKeyspace("Keyspace", this.cluster, ccl);
       keyspaceDefinition = null;
     }
     else {
@@ -155,7 +196,7 @@ public class CassandraClient<K, T extend
           if (! comparatorType.equals(ComparatorType.BYTESTYPE)) {
             // GORA-197
             LOG.warn("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName()
-                   + ", not BytesType. It may cause a fatal error on column validation later.");
+                + ", not BytesType. It may cause a fatal error on column validation later.");
           }
           else {
             // LOG.info("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName() + ".");
@@ -164,7 +205,23 @@ public class CassandraClient<K, T extend
       }
     }
   }
-  
+
+  /**
+   * Method in charge of setting the consistency level for defined column families.
+   * @param pColFams  Column families
+   * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level.
+   */
+  private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
+    Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>();
+    // Get columnFamily consistency level.
+    String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+    LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'.");
+    // Define consistency for ColumnFamily "ColumnFamily"
+    for (ColumnFamilyDefinition colFamDef : pColFams)
+      clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl));
+    return clMap;
+  }
+
   /**
    * Drop keyspace.
    */
@@ -192,7 +249,7 @@ public class CassandraClient<K, T extend
       LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
       return;
     }
-    
+
     synchronized(mutator) {
       HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer);
     }
@@ -205,17 +262,16 @@ public class CassandraClient<K, T extend
    * @param columnName the column name (the member name, or the index of array)
    * @param value the member value
    */
-  @SuppressWarnings("unchecked")
   public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) {
     if (value == null) {
       return;
     }
 
     ByteBuffer byteBuffer = toByteBuffer(value);
-    
+
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    
+
     synchronized(mutator) {
       HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer);
     }
@@ -250,12 +306,11 @@ public class CassandraClient<K, T extend
    * @param fieldName the field name
    * @param columnName the column name (the member name, or the index of array)
    */
-  @SuppressWarnings("unchecked")
   public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) {
 
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    
+
     synchronized(mutator) {
       HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName);
     }
@@ -266,8 +321,7 @@ public class CassandraClient<K, T extend
   }
 
 
-  @SuppressWarnings("unchecked")
-  public void addGenericArray(K key, String fieldName, GenericArray array) {
+  public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
     if (isSuper( cassandraMapping.getFamily(fieldName) )) {
       int i= 0;
       for (Object itemValue: array) {
@@ -291,7 +345,6 @@ public class CassandraClient<K, T extend
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
     if (isSuper( cassandraMapping.getFamily(fieldName) )) {
       int i= 0;
@@ -350,7 +403,7 @@ public class CassandraClient<K, T extend
    * @return a list of family rows
    */
   public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
-    
+
     String[] columnNames = cassandraQuery.getColumns(family);
     ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
     for (int i = 0; i < columnNames.length; i++) {
@@ -363,59 +416,61 @@ public class CassandraClient<K, T extend
     }
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
-    
-    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = 
+        HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSlicesQuery.setColumnFamily(family);
     rangeSlicesQuery.setKeys(startKey, endKey);
     rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
     rangeSlicesQuery.setRowCount(limit);
     rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-    
+
     QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
     OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
-    
-    
+
+
     return orderedRows.getList();
   }
-  
+
   private String getMappingFamily(String pField){
     String family = null;
     // TODO checking if it was a UNION field the one we are retrieving
-      family = this.cassandraMapping.getFamily(pField);
+    family = this.cassandraMapping.getFamily(pField);
     return family;
   }
-  
+
   private String getMappingColumn(String pField){
     String column = null;
     // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
-      column = this.cassandraMapping.getColumn(pField);
+    column = this.cassandraMapping.getColumn(pField);
     return column;
   }
 
   /**
    * Select the families that contain at least one column mapped to a query field.
    * @param query indicates the columns to select
-   * @return a map which keys are the family names and values the corresponding column names required to get all the query fields.
+   * @return a map which keys are the family names and values the corresponding column 
+   * names required to get all the query fields.
    */
   public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
     Map<String, List<String>> map = new HashMap<String, List<String>>();
     for (String field: query.getFields()) {
       String family = this.getMappingFamily(field);
       String column = this.getMappingColumn(field);
-      
+
       // check if the family value was already initialized 
       List<String> list = map.get(family);
       if (list == null) {
         list = new ArrayList<String>();
         map.put(family, list);
       }
-      
+
       if (column != null) {
         list.add(column);
       }
-      
+
     }
-    
+
     return map;
   }
 
@@ -426,7 +481,7 @@ public class CassandraClient<K, T extend
   public CassandraMapping getCassandraMapping(){
     return this.cassandraMapping;
   }
-  
+
   /**
    * Select the field names according to the column names, which format if fully qualified: "family:column"
    * @param query
@@ -437,13 +492,13 @@ public class CassandraClient<K, T extend
     for (String field: query.getFields()) {
       String family = this.getMappingFamily(field);
       String column = this.getMappingColumn(field);
-      
+
       map.put(family + ":" + column, field);
     }
-    
+
     return map;
   }
-  
+
   public boolean isSuper(String family) {
     return this.cassandraMapping.isSuper(family);
   }
@@ -457,20 +512,20 @@ public class CassandraClient<K, T extend
     }
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
-    
-    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = 
+        HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), 
+            ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSuperSlicesQuery.setColumnFamily(family);    
     rangeSuperSlicesQuery.setKeys(startKey, endKey);
     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
     rangeSuperSlicesQuery.setRowCount(limit);
     rangeSuperSlicesQuery.setColumnNames(columnNames);
-    
-    
+
+
     QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
     OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
     return orderedRows.getList();
-
-
   }
 
   /**
@@ -478,6 +533,6 @@ public class CassandraClient<K, T extend
    * @return Keyspace
    */
   public String getKeyspaceName() {
-	return this.cassandraMapping.getKeyspaceName();
+    return this.cassandraMapping.getKeyspaceName();
   }
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Mar  7 10:32:38 2014
@@ -55,6 +55,7 @@ import org.apache.gora.query.PartitionQu
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,13 +71,27 @@ public class CassandraStore<K, T extends
   /** Logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
 
-  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
+  /** Consistency property level for Cassandra column families */
+  private static final String COL_FAM_CL = "cf.consistency.level";
+
+  /** Consistency property level for Cassandra read operations. */
+  private static final String READ_OP_CL = "read.consistency.level";
+
+  /** Consistency property level for Cassandra write operations. */
+  private static final String WRITE_OP_CL = "write.consistency.level";
+
+  /** Variables to hold different consistency levels defined by the properties. */
+  public static String colFamConsLvl;
+  public static String readOpConsLvl;
+  public static String writeOpConsLvl;
+
+  private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
 
   /**
    * Default schema index used when AVRO Union data types are stored
    */
   public static int DEFAULT_UNION_SCHEMA = 0;
-
+  
   /**
    * The values are Avro fields pending to be stored.
    *
@@ -99,6 +114,14 @@ public class CassandraStore<K, T extends
   public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
     try {
       super.initialize(keyClass, persistent, properties);
+      if (autoCreateSchema) {
+        // If this is not set, then each Cassandra client should set its default
+        // column family
+        colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
+        // operations
+        readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
+        writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
+      }
       this.cassandraClient.initialize(keyClass, persistent);
     } catch (Exception e) {
       LOG.error(e.getMessage());
@@ -181,7 +204,7 @@ public class CassandraStore<K, T extends
    * partitioned across nodes based on row Key.
    */
   private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
-      CassandraResultSet cassandraResultSet) {
+      CassandraResultSet<K> cassandraResultSet) {
     // select family columns that are included in the query
     List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
 
@@ -474,6 +497,10 @@ public class CassandraStore<K, T extends
     return 0;
   }
 
+  /**
+   * Checks to see if a Cassandra Keyspace actually exists.
+   * Returns true if it does.
+   */
   @Override
   public boolean schemaExists() {
     LOG.info("schema exists");

Modified: gora/trunk/gora-cassandra/src/test/conf/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora.properties?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora.properties (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora.properties Fri Mar  7 10:32:38 2014
@@ -14,16 +14,10 @@
 # limitations under the License.
 
 gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.keyspace=
-gora.cassandrastore.name=
-gora.cassandrastore.class=
-gora.cassandrastore.qualifier=
-gora.cassandrastore.family=
-gora.cassandrastore.type=
-gora.cassandraStore.cluster=Test Cluster
-gora.cassandraStore.host=localhost
-
-
-
-
-
+gora.cassandrastore.cluster=Test Cluster
+gora.cassandrastore.host=localhost
+# property is annotated in CassandraClient#checkKeyspace()
+# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL. 
+gora.cassandrastore.cf.consistency.level=ONE
+gora.cassandrastore.read.consistency.level=QUORUM
+gora.cassandrastore.write.consistency.level=ONE

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java Fri Mar  7 10:32:38 2014
@@ -333,7 +333,7 @@ public class DataStoreFactory{
     String val = findProperty(properties, store, baseKey, null);
     if(val == null) {
       throw new IOException("Property with base name \""+baseKey+"\" could not be found, make " +
-      		"sure to include this property in gora.properties file");
+          "sure to include this property in gora.properties file");
     }
     return val;
   }

Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java Fri Mar  7 10:32:38 2014
@@ -36,6 +36,7 @@ import org.apache.gora.persistency.Persi
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.ws.impl.WSDataStoreBase;
 import org.apache.gora.util.GoraException;
 import org.slf4j.Logger;
@@ -161,11 +162,11 @@ public class DynamoDBStore<K, T extends 
       LOG.debug("Initializing DynamoDB store");
       getCredentials();
       setWsProvider(wsProvider);
-      preferredSchema = properties.getProperty(PREF_SCH_NAME);
-      dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
-      dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
+      preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
+      dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf());
+      dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null));
       mapping = readMapping();
-      consistency = properties.getProperty(CONSISTENCY_READS);
+      consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
       persistentClass = pPersistentClass;
     }
     catch (Exception e) {