You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/08/15 18:51:01 UTC
[1/2] git commit: forward port of Make Cassandra keyspace consistency
configurable within gora.properties
Repository: gora
Updated Branches:
refs/heads/master dc76da920 -> 3cacf2a25
forward port of Make Cassandra keyspace consistency configurable within gora.properties
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/68302e21
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/68302e21
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/68302e21
Branch: refs/heads/master
Commit: 68302e213747700ca38cef546f05d0f15a3e99b4
Parents: dc76da9
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Fri Aug 15 09:49:14 2014 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Fri Aug 15 09:49:14 2014 -0700
----------------------------------------------------------------------
.../gora/cassandra/store/CassandraClient.java | 131 ++++++++++++++-----
.../gora/cassandra/store/CassandraStore.java | 25 +++-
gora-cassandra/src/test/conf/gora.properties | 15 +--
.../gora/dynamodb/store/DynamoDBStore.java | 8 +-
4 files changed, 133 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index 1d56e32..9e8cd7b 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -18,11 +18,16 @@
package org.apache.gora.cassandra.store;
+import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -57,7 +62,20 @@ import org.apache.gora.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * CassandraClient is where all of the primary datastore functionality is
+ * executed. Typically CassandraClient is invoked by calling
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
+ * CassandraClient deals with Cassandra data model definition, mutation,
+ * and general/specific mappings.
+ * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
+ *
+ * @param <K>
+ * @param <T>
+ */
public class CassandraClient<K, T extends PersistentBase> {
+
+ /** The logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
private Cluster cluster;
@@ -66,10 +84,29 @@ public class CassandraClient<K, T extends PersistentBase> {
private Class<K> keyClass;
private Class<T> persistentClass;
+ /** Object which holds the XML mapping for Cassandra. */
private CassandraMapping cassandraMapping = null;
+ /** Hector client default column family consistency level. */
+ public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
+
+ /** Cassandra serializer to be used for serializing Gora's keys. */
private Serializer<K> keySerializer;
+ /**
+ * Given our key, persistentClass from
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
+ * we make best efforts to dictate our data model.
+ * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String)
+ * to see if our keyspace has already been invented, this simple check prevents us from
+ * recreating the keyspace if it already exists.
+ * We then simple specify (based on the input keyclass) an appropriate serializer
+ * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
+ * defining a mutator from and by which we can mutate this object.
+ * @param keyClass the Key by which we wish o assign a record object
+ * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data.
+ * @throws Exception
+ */
public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception {
this.keyClass = keyClass;
@@ -77,12 +114,14 @@ public class CassandraClient<K, T extends PersistentBase> {
this.persistentClass = persistentClass;
this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
- this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+ this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(),
+ new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
// add keyspace to cluster
checkKeyspace();
- // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
+ // Just create a Keyspace object on the client side, corresponding to an already
+ // existing keyspace with already created column families.
this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
@@ -99,10 +138,17 @@ public class CassandraClient<K, T extends PersistentBase> {
/**
* Check if keyspace already exists. If not, create it.
- * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel}
- * logic. It is set by passing a ConfigurableConsistencyLevel object right
- * when the Keyspace is created. Currently consistency level is .ONE which
- * permits consistency to wait until one replica has responded.
+ * In this method, we also utilize Hector's
+ * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} logic.
+ * It is set by passing a
+ * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right
+ * when the {@link me.prettyprint.hector.api.Keyspace} is created.
+ * If we cannot find a consistency level within <code>gora.properites</code>,
+ * then column family consistency level is set to QUORUM (by default) which permits
+ * consistency to wait for a quorum of replicas to respond regardless of data center.
+ * QUORUM is Hector Client's default setting and we respect that here as well.
+ *
+ * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html
*/
public void checkKeyspace() {
// "describe keyspace <keyspaceName>;" query
@@ -116,29 +162,29 @@ public class CassandraClient<K, T extends PersistentBase> {
}
keyspaceDefinition = HFactory.createKeyspaceDefinition(
- this.cassandraMapping.getKeyspaceName(),
+ this.cassandraMapping.getKeyspaceName(),
this.cassandraMapping.getKeyspaceReplicationStrategy(),
this.cassandraMapping.getKeyspaceReplicationFactor(),
columnFamilyDefinitions
);
this.cluster.addKeyspace(keyspaceDefinition, true);
- // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
- // Create a customized Consistency Level
- ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
- Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
-
- // Define CL.ONE for ColumnFamily "ColumnFamily"
- clmap.put("ColumnFamily", HConsistencyLevel.ONE);
-
- // In this we use CL.ONE for read and writes. But you can use different CLs if needed.
- configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
- configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
-
- // Then let the keyspace know
- HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
-
+ // GORA-167 Create a customized Consistency Level
+ ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
+ Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
+ // Column family consistency levels
+ ccl.setReadCfConsistencyLevels(clmap);
+ ccl.setWriteCfConsistencyLevels(clmap);
+ // Operations consistency levels
+ String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+ ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+ LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
+ opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+ ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+ LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
+
+ HFactory.createKeyspace("Keyspace", this.cluster, ccl);
keyspaceDefinition = null;
}
else {
@@ -164,6 +210,22 @@ public class CassandraClient<K, T extends PersistentBase> {
}
/**
+ * Method in charge of setting the consistency level for defined column families.
+ * @param pColFams Column families
+ * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level.
+ */
+ private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
+ Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>();
+ // Get columnFamily consistency level.
+ String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+ LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'.");
+ // Define consistency for ColumnFamily "ColumnFamily"
+ for (ColumnFamilyDefinition colFamDef : pColFams)
+ clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl));
+ return clMap;
+ }
+
+ /**
* Drop keyspace.
*/
public void dropKeyspace() {
@@ -259,10 +321,10 @@ public class CassandraClient<K, T extends PersistentBase> {
/**
* Adds an subColumn inside the cassandraMapping file when a String is serialized
- * @param key
- * @param fieldName
- * @param columnName
- * @param value
+ * @param key the row key
+ * @param fieldName the field name
+ * @param columnName the column name (the member name, or the index of array)
+ * @param value the member value
*/
public void addSubColumn(K key, String fieldName, String columnName, Object value) {
addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
@@ -270,10 +332,10 @@ public class CassandraClient<K, T extends PersistentBase> {
/**
* Adds an subColumn inside the cassandraMapping file when an Integer is serialized
- * @param key
- * @param fieldName
- * @param columnName
- * @param value
+ * @param key the row key
+ * @param fieldName the field name
+ * @param columnName the column name (the member name, or the index of array)
+ * @param value the member value
*/
public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
@@ -323,6 +385,7 @@ public class CassandraClient<K, T extends PersistentBase> {
//TODO Verify this. Everything that goes inside a genericArray will go inside a column so let's just delete that.
deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
}
+
public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
int i= 0;
@@ -426,7 +489,8 @@ public class CassandraClient<K, T extends PersistentBase> {
K startKey = query.getStartKey();
K endKey = query.getEndKey();
- RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery
+ (this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSlicesQuery.setColumnFamily(family);
rangeSlicesQuery.setKeys(startKey, endKey);
rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -536,7 +600,8 @@ public class CassandraClient<K, T extends PersistentBase> {
K startKey = query.getStartKey();
K endKey = query.getEndKey();
- RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
+ RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery
+ (this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSuperSlicesQuery.setColumnFamily(family);
rangeSuperSlicesQuery.setKeys(startKey, endKey);
rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -556,6 +621,6 @@ public class CassandraClient<K, T extends PersistentBase> {
* @return Keyspace
*/
public String getKeyspaceName() {
- return this.cassandraMapping.getKeyspaceName();
+ return this.cassandraMapping.getKeyspaceName();
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index ffb4af0..496f1f0 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -60,6 +60,7 @@ import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
import org.slf4j.Logger;
@@ -76,7 +77,21 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
/** Logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
- private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
+ /** Consistency property level for Cassandra column families */
+ private static final String COL_FAM_CL = "cf.consistency.level";
+
+ /** Consistency property level for Cassandra read operations. */
+ private static final String READ_OP_CL = "read.consistency.level";
+
+ /** Consistency property level for Cassandra write operations. */
+ private static final String WRITE_OP_CL = "write.consistency.level";
+
+ /** Variables to hold different consistency levels defined by the properties. */
+ public static String colFamConsLvl;
+ public static String readOpConsLvl;
+ public static String writeOpConsLvl;
+
+ private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
/**
* Fixed string with value "UnionIndex" used to generate an extra column based on
@@ -126,6 +141,14 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K
public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
try {
super.initialize(keyClass, persistent, properties);
+ if (autoCreateSchema) {
+ // If this is not set, then each Cassandra client should set its default
+ // column family
+ colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
+ // operations
+ readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
+ writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
+ }
this.cassandraClient.initialize(keyClass, persistent);
} catch (Exception e) {
LOG.error(e.getMessage());
http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/test/conf/gora.properties
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/gora.properties b/gora-cassandra/src/test/conf/gora.properties
index 80427b4..6c8b06a 100644
--- a/gora-cassandra/src/test/conf/gora.properties
+++ b/gora-cassandra/src/test/conf/gora.properties
@@ -14,14 +14,13 @@
# limitations under the License.
gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.keyspace=
-gora.cassandrastore.name=
-gora.cassandrastore.class=
-gora.cassandrastore.qualifier=
-gora.cassandrastore.family=
-gora.cassandrastore.type=
-gora.cassandraStore.cluster=Test Cluster
-gora.cassandraStore.host=localhost
+gora.cassandrastore.cluster=Test Cluster
+gora.cassandrastore.host=localhost
+# property is annotated in CassandraClient#checkKeyspace()
+# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL.
+gora.cassandrastore.cf.consistency.level=ONE
+gora.cassandrastore.read.consistency.level=QUORUM
+gora.cassandrastore.write.consistency.level=ONE
http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
----------------------------------------------------------------------
diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
index 7192c8a..ee48542 100644
--- a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
+++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
@@ -161,11 +161,11 @@ public class DynamoDBStore<K, T extends Persistent> extends WSDataStoreBase<K, T
LOG.debug("Initializing DynamoDB store");
getCredentials();
setWsProvider(wsProvider);
- preferredSchema = properties.getProperty(PREF_SCH_NAME);
- dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
- dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
+ preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
+ dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf());
+ dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null));
mapping = readMapping();
- consistency = properties.getProperty(CONSISTENCY_READS);
+ consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
persistentClass = pPersistentClass;
}
catch (Exception e) {
[2/2] git commit: GORA-167 forward port of Make Cassandra keyspace
consistency configurable within gora.properties
Posted by le...@apache.org.
GORA-167 forward port of Make Cassandra keyspace consistency configurable within gora.properties
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/3cacf2a2
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/3cacf2a2
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/3cacf2a2
Branch: refs/heads/master
Commit: 3cacf2a25f2f8937284d50805eeb353bf5a1eabc
Parents: 68302e2
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Fri Aug 15 09:50:50 2014 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Fri Aug 15 09:50:50 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/3cacf2a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 28d376f..c1fca0c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@
Current Development 0.5-SNAPSHOT
+* GORA-167 forward port of Make Cassandra keyspace consistency configurable within gora.properties (rmarroquin via lewismc)
+
* GORA-364 MemStore.get fails with NPE when key is not set (Gerhard Gossen via lewismc)
* GORA-361 AvroUtils.deepClonePersistent needs to flush BinaryEncoder (Gerhard Gossen via hsaputra)