You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by rm...@apache.org on 2014/03/07 11:32:39 UTC
svn commit: r1575224 - in /gora/trunk:
gora-cassandra/src/main/java/org/apache/gora/cassandra/store/
gora-cassandra/src/test/conf/ gora-core/src/main/java/org/apache/gora/store/
gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/
Author: rmarroquin
Date: Fri Mar 7 10:32:38 2014
New Revision: 1575224
URL: http://svn.apache.org/r1575224
Log:
GORA-167
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/trunk/gora-cassandra/src/test/conf/gora.properties
gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Mar 7 10:32:38 2014
@@ -18,12 +18,16 @@
package org.apache.gora.cassandra.store;
+import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -47,53 +51,80 @@ import me.prettyprint.hector.api.query.R
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Serializer;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.serializers.GenericArraySerializer;
import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
-import org.apache.gora.cassandra.serializers.TypeUtils;
import org.apache.gora.mapreduce.GoraRecordReader;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.query.Query;
-import org.apache.gora.util.ByteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * CassandraClient is where all of the primary datastore functionality is
+ * executed. Typically CassandraClient is invoked by calling
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
+ * CassandraClient deals with Cassandra data model definition, mutation,
+ * and general/specific mappings.
+ * @see org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)
+ *
+ * @param <K>
+ * @param <T>
+ */
public class CassandraClient<K, T extends PersistentBase> {
+
+ /** The logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
-
+
private Cluster cluster;
private Keyspace keyspace;
private Mutator<K> mutator;
private Class<K> keyClass;
private Class<T> persistentClass;
-
+
+ /** Object containing the XML mapping for Cassandra. */
private CassandraMapping cassandraMapping = null;
+ /** Hector client default column family consistency level. */
+ public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
+
+ /** Cassandra serializer to be used for serializing Gora's keys. */
private Serializer<K> keySerializer;
-
+
+ /**
+ * Given our key, persistentClass from
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
+ * we make best efforts to dictate our data model.
+ * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace()}
+ * to see if our keyspace has already been created; this simple check prevents us from
+ * recreating the keyspace if it already exists.
+ * We then simply specify (based on the input keyclass) an appropriate serializer
+ * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
+ * defining a mutator from and by which we can mutate this object.
+ * @param keyClass the Key by which we wish to assign a record object
+ * @param persistentClass the generated {@link org.apache.gora.persistency.Persistent} bean representing the data.
+ * @throws Exception
+ */
public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception {
this.keyClass = keyClass;
// get cassandra mapping with persistent class
this.persistentClass = persistentClass;
this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
- // LOG.info("persistentClass=" + persistentClass.getName() + " -> cassandraMapping=" + cassandraMapping);
- this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
-
+ this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(),
+ new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+
// add keyspace to cluster
checkKeyspace();
-
- // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
+
+ // Just create a Keyspace object on the client side, corresponding to an
+ // already existing keyspace with already created column families.
this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
-
+
this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
}
@@ -105,43 +136,53 @@ public class CassandraClient<K, T extend
KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
return (keyspaceDefinition != null);
}
-
+
/**
* Check if keyspace already exists. If not, create it.
- * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel}
- * logic. It is set by passing a ConfigurableConsistencyLevel object right
- * when the Keyspace is created. Currently consistency level is .ONE which
- * permits consistency to wait until one replica has responded.
+ * In this method, we also utilize Hector's
+ * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel}
+ * logic.
+ * It is set by passing a
+ * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right
+ * when the {@link me.prettyprint.hector.api.Keyspace} is created.
+ * If we cannot find a consistency level within <code>gora.properties</code>,
+ * then column family consistency level is set to QUORUM (by default) which permits
+ * consistency to wait for a quorum of replicas to respond regardless of data center.
+ * QUORUM is Hector Client's default setting and we respect that here as well.
+ *
+ * @see <a href="http://hector-client.github.io/hector/build/html/content/consistency_level.html">Hector consistency level documentation</a>
*/
public void checkKeyspace() {
// "describe keyspace <keyspaceName>;" query
KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
if (keyspaceDefinition == null) {
- List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
+ List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
// GORA-197
for (ColumnFamilyDefinition cfDef : columnFamilyDefinitions) {
cfDef.setComparatorType(ComparatorType.BYTESTYPE);
}
- keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
+ keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(),
+ "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
this.cluster.addKeyspace(keyspaceDefinition, true);
- // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
-
- // Create a customized Consistency Level
- ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
- Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
-
- // Define CL.ONE for ColumnFamily "ColumnFamily"
- clmap.put("ColumnFamily", HConsistencyLevel.ONE);
-
- // In this we use CL.ONE for read and writes. But you can use different CLs if needed.
- configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
- configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
- // Then let the keyspace know
- HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
+ // GORA-167 Create a customized Consistency Level
+ ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
+ Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
+ // Column family consistency levels (the same map is used for reads and writes)
+ ccl.setReadCfConsistencyLevels(clmap);
+ ccl.setWriteCfConsistencyLevels(clmap);
+ // Operation consistency levels: use the configured value only when it is non-null AND non-empty, else the default
+ String opConsisLvl = (readOpConsLvl != null && !readOpConsLvl.isEmpty()) ? readOpConsLvl : DEFAULT_HECTOR_CONSIS_LEVEL;
+ ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+ LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
+ opConsisLvl = (writeOpConsLvl != null && !writeOpConsLvl.isEmpty()) ? writeOpConsLvl : DEFAULT_HECTOR_CONSIS_LEVEL;
+ ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+ LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
+ // NOTE(review): the returned Keyspace is discarded and is named "Keyspace", not the mapped keyspace - confirm ccl takes effect
+ HFactory.createKeyspace("Keyspace", this.cluster, ccl);
keyspaceDefinition = null;
}
else {
@@ -155,7 +196,7 @@ public class CassandraClient<K, T extend
if (! comparatorType.equals(ComparatorType.BYTESTYPE)) {
// GORA-197
LOG.warn("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName()
- + ", not BytesType. It may cause a fatal error on column validation later.");
+ + ", not BytesType. It may cause a fatal error on column validation later.");
}
else {
// LOG.info("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName() + ".");
@@ -164,7 +205,23 @@ public class CassandraClient<K, T extend
}
}
}
-
+
+ /**
+ * Builds the consistency level map that is applied to every given column family.
+ * @param pColFams column family definitions whose names become the map keys
+ * @return map from column family name to the configured level (the QUORUM default when unset)
+ */
+ private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
+ // Use the configured level, or the Hector default when none was supplied.
+ String levelName = (colFamConsLvl == null || colFamConsLvl.isEmpty()) ? DEFAULT_HECTOR_CONSIS_LEVEL : colFamConsLvl;
+ LOG.debug("ColumnFamily consistency level configured to '" + levelName + "'.");
+ Map<String, HConsistencyLevel> levelsByFamily = new HashMap<String, HConsistencyLevel>();
+ for (ColumnFamilyDefinition familyDef : pColFams) {
+ levelsByFamily.put(familyDef.getName(), HConsistencyLevel.valueOf(levelName));
+ }
+ return levelsByFamily;
+ }
+
/**
* Drop keyspace.
*/
@@ -192,7 +249,7 @@ public class CassandraClient<K, T extend
LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
return;
}
-
+
synchronized(mutator) {
HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer);
}
@@ -205,17 +262,16 @@ public class CassandraClient<K, T extend
* @param columnName the column name (the member name, or the index of array)
* @param value the member value
*/
- @SuppressWarnings("unchecked")
public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) {
if (value == null) {
return;
}
ByteBuffer byteBuffer = toByteBuffer(value);
-
+
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
-
+
synchronized(mutator) {
HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer);
}
@@ -250,12 +306,11 @@ public class CassandraClient<K, T extend
* @param fieldName the field name
* @param columnName the column name (the member name, or the index of array)
*/
- @SuppressWarnings("unchecked")
public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) {
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
-
+
synchronized(mutator) {
HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName);
}
@@ -266,8 +321,7 @@ public class CassandraClient<K, T extend
}
- @SuppressWarnings("unchecked")
- public void addGenericArray(K key, String fieldName, GenericArray array) {
+ public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
int i= 0;
for (Object itemValue: array) {
@@ -291,7 +345,6 @@ public class CassandraClient<K, T extend
}
}
- @SuppressWarnings("unchecked")
public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
int i= 0;
@@ -350,7 +403,7 @@ public class CassandraClient<K, T extend
* @return a list of family rows
*/
public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
-
+
String[] columnNames = cassandraQuery.getColumns(family);
ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
@@ -363,59 +416,61 @@ public class CassandraClient<K, T extend
}
K startKey = query.getStartKey();
K endKey = query.getEndKey();
-
- RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery =
+ HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSlicesQuery.setColumnFamily(family);
rangeSlicesQuery.setKeys(startKey, endKey);
rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
rangeSlicesQuery.setRowCount(limit);
rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-
+
QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
-
-
+
+
return orderedRows.getList();
}
-
+
private String getMappingFamily(String pField){
String family = null;
// TODO checking if it was a UNION field the one we are retrieving
- family = this.cassandraMapping.getFamily(pField);
+ family = this.cassandraMapping.getFamily(pField);
return family;
}
-
+
private String getMappingColumn(String pField){
String column = null;
// TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
- column = this.cassandraMapping.getColumn(pField);
+ column = this.cassandraMapping.getColumn(pField);
return column;
}
/**
* Select the families that contain at least one column mapped to a query field.
* @param query indicates the columns to select
- * @return a map which keys are the family names and values the corresponding column names required to get all the query fields.
+ * @return a map which keys are the family names and values the corresponding column
+ * names required to get all the query fields.
*/
public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
Map<String, List<String>> map = new HashMap<String, List<String>>();
for (String field: query.getFields()) {
String family = this.getMappingFamily(field);
String column = this.getMappingColumn(field);
-
+
// check if the family value was already initialized
List<String> list = map.get(family);
if (list == null) {
list = new ArrayList<String>();
map.put(family, list);
}
-
+
if (column != null) {
list.add(column);
}
-
+
}
-
+
return map;
}
@@ -426,7 +481,7 @@ public class CassandraClient<K, T extend
public CassandraMapping getCassandraMapping(){
return this.cassandraMapping;
}
-
+
/**
* Select the field names according to the column names, which format if fully qualified: "family:column"
* @param query
@@ -437,13 +492,13 @@ public class CassandraClient<K, T extend
for (String field: query.getFields()) {
String family = this.getMappingFamily(field);
String column = this.getMappingColumn(field);
-
+
map.put(family + ":" + column, field);
}
-
+
return map;
}
-
+
public boolean isSuper(String family) {
return this.cassandraMapping.isSuper(family);
}
@@ -457,20 +512,20 @@ public class CassandraClient<K, T extend
}
K startKey = query.getStartKey();
K endKey = query.getEndKey();
-
- RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+ RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery =
+ HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(),
+ ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSuperSlicesQuery.setColumnFamily(family);
rangeSuperSlicesQuery.setKeys(startKey, endKey);
rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
rangeSuperSlicesQuery.setRowCount(limit);
rangeSuperSlicesQuery.setColumnNames(columnNames);
-
-
+
+
QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
return orderedRows.getList();
-
-
}
/**
@@ -478,6 +533,6 @@ public class CassandraClient<K, T extend
* @return Keyspace
*/
public String getKeyspaceName() {
- return this.cassandraMapping.getKeyspaceName();
+ return this.cassandraMapping.getKeyspaceName();
}
}
Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Mar 7 10:32:38 2014
@@ -55,6 +55,7 @@ import org.apache.gora.query.PartitionQu
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.DataStoreBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,13 +71,27 @@ public class CassandraStore<K, T extends
/** Logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
- private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
+ /** Consistency property level for Cassandra column families */
+ private static final String COL_FAM_CL = "cf.consistency.level";
+
+ /** Consistency property level for Cassandra read operations. */
+ private static final String READ_OP_CL = "read.consistency.level";
+
+ /** Consistency property level for Cassandra write operations. */
+ private static final String WRITE_OP_CL = "write.consistency.level";
+
+ /** Variables to hold different consistency levels defined by the properties. */
+ public static String colFamConsLvl;
+ public static String readOpConsLvl;
+ public static String writeOpConsLvl;
+
+ private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
/**
* Default schema index used when AVRO Union data types are stored
*/
public static int DEFAULT_UNION_SCHEMA = 0;
-
+
/**
* The values are Avro fields pending to be stored.
*
@@ -99,6 +114,14 @@ public class CassandraStore<K, T extends
public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
try {
super.initialize(keyClass, persistent, properties);
+ if (autoCreateSchema) {
+ // If this is not set, then each Cassandra client should set its default
+ // column family
+ colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
+ // operations
+ readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
+ writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
+ }
this.cassandraClient.initialize(keyClass, persistent);
} catch (Exception e) {
LOG.error(e.getMessage());
@@ -181,7 +204,7 @@ public class CassandraStore<K, T extends
* partitioned across nodes based on row Key.
*/
private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
- CassandraResultSet cassandraResultSet) {
+ CassandraResultSet<K> cassandraResultSet) {
// select family columns that are included in the query
List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
@@ -474,6 +497,10 @@ public class CassandraStore<K, T extends
return 0;
}
+ /**
+ * Checks to see if a Cassandra Keyspace actually exists.
+ * Returns true if it does.
+ */
@Override
public boolean schemaExists() {
LOG.info("schema exists");
Modified: gora/trunk/gora-cassandra/src/test/conf/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/test/conf/gora.properties?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/test/conf/gora.properties (original)
+++ gora/trunk/gora-cassandra/src/test/conf/gora.properties Fri Mar 7 10:32:38 2014
@@ -14,16 +14,10 @@
# limitations under the License.
gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.keyspace=
-gora.cassandrastore.name=
-gora.cassandrastore.class=
-gora.cassandrastore.qualifier=
-gora.cassandrastore.family=
-gora.cassandrastore.type=
-gora.cassandraStore.cluster=Test Cluster
-gora.cassandraStore.host=localhost
-
-
-
-
-
+gora.cassandrastore.cluster=Test Cluster
+gora.cassandrastore.host=localhost
+# Consistency levels; their handling is documented in CassandraClient#checkKeyspace().
+# Valid values are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL.
+gora.cassandrastore.cf.consistency.level=ONE
+gora.cassandrastore.read.consistency.level=QUORUM
+gora.cassandrastore.write.consistency.level=ONE
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java Fri Mar 7 10:32:38 2014
@@ -333,7 +333,7 @@ public class DataStoreFactory{
String val = findProperty(properties, store, baseKey, null);
if(val == null) {
throw new IOException("Property with base name \""+baseKey+"\" could not be found, make " +
- "sure to include this property in gora.properties file");
+ "sure to include this property in gora.properties file");
}
return val;
}
Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java?rev=1575224&r1=1575223&r2=1575224&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java Fri Mar 7 10:32:38 2014
@@ -36,6 +36,7 @@ import org.apache.gora.persistency.Persi
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.ws.impl.WSDataStoreBase;
import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
@@ -161,11 +162,11 @@ public class DynamoDBStore<K, T extends
LOG.debug("Initializing DynamoDB store");
getCredentials();
setWsProvider(wsProvider);
- preferredSchema = properties.getProperty(PREF_SCH_NAME);
- dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
- dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
+ preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
+ dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf());
+ dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null));
mapping = readMapping();
- consistency = properties.getProperty(CONSISTENCY_READS);
+ consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
persistentClass = pPersistentClass;
}
catch (Exception e) {