You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2018/02/27 12:07:55 UTC
[3/5] gora git commit: GORA-530 : Reinstated exception throwing in
DataStore and Query
GORA-530 : Reinstated exception throwing in DataStore and Query
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/b06da5f3
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/b06da5f3
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/b06da5f3
Branch: refs/heads/master
Commit: b06da5f32ec572c88f7ec5245a4b573c73ae8c22
Parents: f028a54
Author: Alfonso Nishikawa Muñumer <al...@gmail.com>
Authored: Thu Jan 18 16:37:09 2018 -0100
Committer: Alfonso Nishikawa Muñumer <al...@gmail.com>
Committed: Thu Jan 18 16:37:09 2018 -0100
----------------------------------------------------------------------
.../gora/accumulo/store/AccumuloStore.java | 75 ++---
.../gora/aerospike/store/AerospikeStore.java | 290 ++++++++++--------
.../cassandra/serializers/AvroSerializer.java | 306 ++++++++++---------
.../serializers/CassandraSerializer.java | 148 +++++----
.../cassandra/serializers/NativeSerializer.java | 121 +++++---
.../gora/cassandra/store/CassandraStore.java | 78 +++--
.../store/TestAvroSerializationWithUDT.java | 4 +-
.../cassandra/store/TestCassandraStore.java | 3 +-
.../TestCassandraStoreWithCassandraKey.java | 11 +-
...stCassandraStoreWithNativeSerialization.java | 18 +-
.../store/TestNativeSerializationWithUDT.java | 4 +-
.../org/apache/gora/avro/store/AvroStore.java | 28 +-
.../gora/avro/store/DataFileAvroStore.java | 33 +-
.../org/apache/gora/memory/store/MemStore.java | 24 +-
.../gora/persistency/impl/BeanFactoryImpl.java | 8 +-
.../main/java/org/apache/gora/query/Query.java | 3 +-
.../org/apache/gora/query/impl/QueryBase.java | 3 +-
.../apache/gora/query/ws/impl/QueryWSBase.java | 3 +-
.../java/org/apache/gora/store/DataStore.java | 29 +-
.../apache/gora/store/impl/DataStoreBase.java | 42 ++-
.../store/impl/FileBackedDataStoreBase.java | 50 ++-
.../store/ws/impl/WSBackedDataStoreBase.java | 16 +-
.../gora/store/ws/impl/WSDataStoreBase.java | 5 +-
.../apache/gora/memory/store/MemStoreTest.java | 22 +-
.../apache/gora/mock/store/MockDataStore.java | 30 +-
.../apache/gora/store/DataStoreTestUtil.java | 3 +-
.../apache/gora/couchdb/store/CouchDBStore.java | 198 +++++++-----
.../gora/couchdb/store/TestCouchDBStore.java | 3 +-
.../gora/dynamodb/store/DynamoDBAvroStore.java | 11 +-
.../dynamodb/store/DynamoDBNativeStore.java | 151 +++++----
.../gora/dynamodb/store/DynamoDBStore.java | 100 +++---
.../org/apache/gora/hbase/store/HBaseStore.java | 109 ++++---
.../gora/infinispan/store/InfinispanClient.java | 29 +-
.../gora/infinispan/store/InfinispanStore.java | 131 +++++---
.../java/org/apache/gora/infinispan/Utils.java | 3 +-
.../gora/jcache/store/JCacheCacheLoader.java | 34 ++-
.../gora/jcache/store/JCacheCacheWriter.java | 30 +-
.../apache/gora/jcache/store/JCacheStore.java | 157 +++++++---
.../apache/gora/mongodb/store/MongoStore.java | 270 ++++++++++------
.../gora/mongodb/store/TestMongoStore.java | 3 +-
.../gora/orientdb/store/OrientDBStore.java | 140 ++++-----
.../org/apache/gora/solr/query/SolrResult.java | 12 +-
.../org/apache/gora/solr/store/SolrStore.java | 62 ++--
.../tutorial/log/DistributedLogManager.java | 23 +-
44 files changed, 1649 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
index bac354b..6737dbb 100644
--- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
+++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
@@ -359,7 +359,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
* @param properties
*/
@Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
try{
super.initialize(keyClass, persistentClass, properties);
@@ -376,7 +376,8 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
try {
encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -394,10 +395,12 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
if (autoCreateSchema && !schemaExists())
createSchema();
} catch (AccumuloException | AccumuloSecurityException e) {
- throw new IOException(e);
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
} catch(IOException e){
LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -470,7 +473,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
@Override
- public void createSchema() {
+ public void createSchema() throws GoraException {
try {
conn.tableOperations().create(mapping.tableName);
Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
@@ -480,13 +483,15 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
} catch (AccumuloException | AccumuloSecurityException e) {
LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
} catch (TableExistsException e) {
LOG.debug(e.getMessage(), e);
+ // Assume this is not an error
}
}
@Override
- public void deleteSchema() {
+ public void deleteSchema() throws GoraException {
try {
if (batchWriter != null)
batchWriter.close();
@@ -494,12 +499,18 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
conn.tableOperations().delete(mapping.tableName);
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@Override
- public boolean schemaExists() {
- return conn.tableOperations().exists(mapping.tableName);
+ public boolean schemaExists() throws GoraException {
+ try {
+ return conn.tableOperations().exists(mapping.tableName);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException {
@@ -639,7 +650,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
@Override
- public T get(K key, String[] fields) {
+ public T get(K key, String[] fields) throws GoraException {
try {
// TODO make isolated scanner optional?
Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
@@ -653,17 +664,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
if (row == null)
return null;
return persistent;
- } catch (TableNotFoundException e) {
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
- return null;
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- return null;
+ throw new GoraException(e);
}
}
@Override
- public void put(K key, T val) {
+ public void put(K key, T val) throws GoraException {
try{
Mutation m = new Mutation(new Text(toBytes(key)));
@@ -725,8 +733,11 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
} catch (MutationsRejectedException e) {
LOG.error(e.getMessage(), e);
}
- } catch (IOException e) {
+ } catch (GoraException e) {
+ throw e;
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -758,7 +769,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
return count;
}
- private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) {
+ private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) throws GoraException {
// First of all we delete array field on accumulo store
Text rowKey = new Text(m.getRow());
@@ -782,14 +793,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
@Override
- public boolean delete(K key) {
+ public boolean delete(K key) throws GoraException {
Query<K,T> q = newQuery();
q.setKey(key);
return deleteByQuery(q) > 0;
}
@Override
- public long deleteByQuery(Query<K,T> query) {
+ public long deleteByQuery(Query<K,T> query) throws GoraException {
try {
Scanner scanner = createScanner(query);
// add iterator that drops values on the server side
@@ -814,16 +825,9 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
return count;
- } catch (TableNotFoundException e) {
- // TODO return 0?
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
- return 0;
- } catch (MutationsRejectedException e) {
- LOG.error(e.getMessage(), e);
- return 0;
- } catch (IOException e){
- LOG.error(e.getMessage(), e);
- return 0;
+ throw new GoraException(e);
}
}
@@ -865,14 +869,13 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
* Execute the query and return the result.
*/
@Override
- public Result<K,T> execute(Query<K,T> query) {
+ public Result<K,T> execute(Query<K,T> query) throws GoraException {
try {
Scanner scanner = createScanner(query);
return new AccumuloResult<>(this, query, scanner);
} catch (TableNotFoundException e) {
- // TODO return empty result?
LOG.error(e.getMessage(), e);
- return null;
+ throw new GoraException(e) ;
}
}
@@ -893,7 +896,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
@Override
- public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
+ public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws GoraException {
try {
TabletLocator tl;
if (conn instanceof MockConnector)
@@ -962,8 +965,9 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
return ret;
- } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
- throw new IOException(e);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -1020,13 +1024,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T
}
@Override
- public void flush() {
+ public void flush() throws GoraException {
try {
if (batchWriter != null) {
batchWriter.flush();
}
- } catch (MutationsRejectedException e) {
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index b456cf7..cf41392 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -18,20 +18,12 @@ package org.apache.gora.aerospike.store;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
-import com.aerospike.client.Key;
-import com.aerospike.client.Value;
-import com.aerospike.client.Bin;
-import com.aerospike.client.Record;
-import com.aerospike.client.AerospikeClient;
-import com.aerospike.client.policy.ClientPolicy;
-import com.aerospike.client.query.RecordSet;
-import com.aerospike.client.query.Statement;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.util.Utf8;
@@ -48,9 +40,20 @@ import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Key;
+import com.aerospike.client.Record;
+import com.aerospike.client.Value;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.query.RecordSet;
+import com.aerospike.client.query.Statement;
+
/**
* Implementation of a Aerospike data store to be used by gora.
*
@@ -79,36 +82,41 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @param properties properties
*/
@Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
super.initialize(keyClass, persistentClass, properties);
- AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder();
- aerospikeMappingBuilder
- .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass,
- persistentClass);
- aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(),
- properties, getConf());
- ClientPolicy policy = new ClientPolicy();
- policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy();
- policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy();
-
- // 'SendKey' property is enabled by default as the key is needed in query execution
- policy.readPolicyDefault.sendKey = true;
- policy.writePolicyDefault.sendKey = true;
-
- // Set the credentials for servers with restricted access
- if (aerospikeParameters.getUsername() != null) {
- policy.user = aerospikeParameters.getUsername();
- }
- if (aerospikeParameters.getPassword() != null) {
- policy.password = aerospikeParameters.getPassword();
+ try {
+ AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder();
+ aerospikeMappingBuilder
+ .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass,
+ persistentClass);
+ aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(),
+ properties, getConf());
+ ClientPolicy policy = new ClientPolicy();
+ policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy();
+ policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy();
+
+ // 'SendKey' property is enabled by default as the key is needed in query execution
+ policy.readPolicyDefault.sendKey = true;
+ policy.writePolicyDefault.sendKey = true;
+
+ // Set the credentials for servers with restricted access
+ if (aerospikeParameters.getUsername() != null) {
+ policy.user = aerospikeParameters.getUsername();
+ }
+ if (aerospikeParameters.getPassword() != null) {
+ policy.password = aerospikeParameters.getPassword();
+ }
+
+ aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(),
+ aerospikeParameters.getPort());
+ aerospikeParameters.setServerSpecificParameters(aerospikeClient);
+ aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields());
+ LOG.info("Aerospike Gora datastore initialized successfully.");
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
-
- aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(),
- aerospikeParameters.getPort());
- aerospikeParameters.setServerSpecificParameters(aerospikeClient);
- aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields());
- LOG.info("Aerospike Gora datastore initialized successfully.");
}
/**
@@ -129,7 +137,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* the fly. Thus, schema creation functionality is unavailable in gora-aerospike module.
*/
@Override
- public void createSchema() {
+ public void createSchema() throws GoraException {
}
/**
@@ -138,7 +146,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* the fly. Thus, schema deletion functionality is unavailable in gora-aerospike module.
*/
@Override
- public void deleteSchema() {
+ public void deleteSchema() throws GoraException {
}
/**
@@ -147,7 +155,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* the fly. Thus, schema exists functionality is unavailable in gora-aerospike module.
*/
@Override
- public boolean schemaExists() {
+ public boolean schemaExists() throws GoraException {
return true;
}
@@ -159,17 +167,25 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @return the Object corresponding to the key or null if it cannot be found
*/
@Override
- public T get(K key, String[] fields) {
-
- Key recordKey = getAerospikeKey(key);
- fields = getFieldsToQuery(fields);
+ public T get(K key, String[] fields) throws GoraException {
- Record record = aerospikeClient
- .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields);
- if (record == null) {
- return null;
+ try {
+ Key recordKey = getAerospikeKey(key);
+ fields = getFieldsToQuery(fields);
+
+ Record record = aerospikeClient
+ .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields);
+
+ if (record == null) {
+ return null;
+ }
+ return createPersistentInstance(record, fields);
+ } catch (GoraException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return createPersistentInstance(record, fields);
}
/**
@@ -181,36 +197,41 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @param persistent object to be persisted
*/
@Override
- public void put(K key, T persistent) {
-
- Key recordKey = getAerospikeKey(key);
+ public void put(K key, T persistent) throws GoraException {
- List<Field> fields = persistent.getSchema().getFields();
-
- for (int i = 0; i < fields.size(); i++) {
- if (!persistent.isDirty(i)) {
- continue;
- }
- Object persistentValue = persistent.get(i);
-
- String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping()
- .get(fields.get(i).name());
- if (mappingBinName == null) {
- LOG.error("Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?",
- persistent.getClass().getName(), fields.get(i).name());
- throw new RuntimeException(
- "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields
- .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?");
- }
- Bin bin;
- if (persistentValue != null) {
- bin = new Bin(mappingBinName,
- getSerializableValue(persistentValue, fields.get(i).schema()));
- } else {
- bin = Bin.asNull(mappingBinName);
+ try {
+ Key recordKey = getAerospikeKey(key);
+
+ List<Field> fields = persistent.getSchema().getFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ if (!persistent.isDirty(i)) {
+ continue;
+ }
+ Object persistentValue = persistent.get(i);
+
+ String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping()
+ .get(fields.get(i).name());
+ if (mappingBinName == null) {
+ LOG.error("Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?",
+ persistent.getClass().getName(), fields.get(i).name());
+ throw new RuntimeException(
+ "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields
+ .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?");
+ }
+ Bin bin;
+ if (persistentValue != null) {
+ bin = new Bin(mappingBinName,
+ getSerializableValue(persistentValue, fields.get(i).schema()));
+ } else {
+ bin = Bin.asNull(mappingBinName);
+ }
+ aerospikeClient
+ .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin);
}
- aerospikeClient
- .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -221,10 +242,15 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @return whether the object was successfully deleted
*/
@Override
- public boolean delete(K key) {
- Key recordKey = getAerospikeKey(key);
- return aerospikeClient
- .delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey);
+ public boolean delete(K key) throws GoraException {
+ try {
+ Key recordKey = getAerospikeKey(key);
+ return aerospikeClient
+ .delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
/**
@@ -234,10 +260,10 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @return the number of deleted records
*/
@Override
- public long deleteByQuery(Query<K, T> query) {
- Result<K, T> result = query.execute();
+ public long deleteByQuery(Query<K, T> query) throws GoraException {
int deleteCount = 0;
try {
+ Result<K, T> result = query.execute();
while (result.next()) {
if (aerospikeClient.delete(null, getAerospikeKey(result.getKey()))) {
deleteCount++;
@@ -246,7 +272,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
return deleteCount;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- return -1;
+ throw new GoraException(e);
}
}
@@ -257,59 +283,64 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @return the query result
*/
@Override
- public Result<K, T> execute(Query<K, T> query) {
+ public Result<K, T> execute(Query<K, T> query) throws GoraException {
List<AerospikeResultRecord> resultRecords = new ArrayList<>();
String namespace = aerospikeParameters.getAerospikeMapping().getNamespace();
String set = aerospikeParameters.getAerospikeMapping().getSet();
- // Query execution without any keys
- if (query.getStartKey() == null && query.getEndKey() == null) {
-
- try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) {
- while (recordSet.next()) {
- AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(recordSet.getKey(),
- recordSet.getRecord());
- resultRecords.add(aerospikeRecord);
+ try {
+ // Query execution without any keys
+ if (query.getStartKey() == null && query.getEndKey() == null) {
+
+ try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) {
+ while (recordSet.next()) {
+ AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(recordSet.getKey(),
+ recordSet.getRecord());
+ resultRecords.add(aerospikeRecord);
+ }
}
}
- }
-
- // Query execution for single key
- else if (query.getKey() != null) {
- Key key = getAerospikeKey(query.getKey());
- Record record = aerospikeClient.get(null, key);
- if (record != null) {
- resultRecords.add(new AerospikeResultRecord(key, record));
+
+ // Query execution for single key
+ else if (query.getKey() != null) {
+ Key key = getAerospikeKey(query.getKey());
+ Record record = aerospikeClient.get(null, key);
+ if (record != null) {
+ resultRecords.add(new AerospikeResultRecord(key, record));
+ }
}
+
+ // Query execution for key ranges
+ // ToDo: Implement Query execution for key ranges
+ // else if (query.getStartKey() != null && query.getEndKey() != null) {
+ //
+ // // the key range filtering at the gora side, which is not a better solution
+ // String lowerBound = query.getStartKey().toString();
+ // String upperBound = query.getEndKey().toString();
+ //
+ // try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) {
+ // while (recordSet.next()) {
+ // Key key = recordSet.getKey();
+ // Record record = recordSet.getRecord();
+ //
+ // String input = key.userKey.toString();
+ // boolean isSpecifiedRange = input.compareToIgnoreCase(lowerBound) >= 0 && input
+ // .compareToIgnoreCase(upperBound) <= 0;
+ //
+ // if (isSpecifiedRange) {
+ // AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(key, record);
+ // resultRecords.add(aerospikeRecord);
+ // }
+ //
+ // }
+ // }
+ // }
+ return new AerospikeQueryResult<>(this, query, resultRecords, getFieldsToQuery(null));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e) ;
}
-
- // Query execution for key ranges
- // ToDo: Implement Query execution for key ranges
- // else if (query.getStartKey() != null && query.getEndKey() != null) {
- //
- // // the key range filtering at the gora side, which is not a better solution
- // String lowerBound = query.getStartKey().toString();
- // String upperBound = query.getEndKey().toString();
- //
- // try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) {
- // while (recordSet.next()) {
- // Key key = recordSet.getKey();
- // Record record = recordSet.getRecord();
- //
- // String input = key.userKey.toString();
- // boolean isSpecifiedRange = input.compareToIgnoreCase(lowerBound) >= 0 && input
- // .compareToIgnoreCase(upperBound) <= 0;
- //
- // if (isSpecifiedRange) {
- // AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(key, record);
- // resultRecords.add(aerospikeRecord);
- // }
- //
- // }
- // }
- // }
- return new AerospikeQueryResult<>(this, query, resultRecords, getFieldsToQuery(null));
}
/**
@@ -341,7 +372,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
}
@Override
- public void flush() {
+ public void flush() throws GoraException {
}
/**
@@ -440,8 +471,9 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
* @param record record retrieved from database
* @param fields fields
* @return persistent object created
+ * @throws GoraException
*/
- public T createPersistentInstance(Record record, String[] fields) {
+ public T createPersistentInstance(Record record, String[] fields) throws GoraException {
T persistent = newPersistent();
for (String field : fields) {
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 0ab21c5..9660485 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -38,6 +38,7 @@ import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,18 +60,18 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
private Schema persistentSchema;
- AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
+ AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) throws GoraException {
super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
if (PersistentBase.class.isAssignableFrom(dataStore.getPersistentClass())) {
persistentSchema = ((PersistentBase) dataStore.getBeanFactory().getCachedPersistent()).getSchema();
} else {
- throw new RuntimeException("Unsupported persistent class, couldn't able to find the Avro schema.");
+ throw new GoraException("Unsupported persistent class, couldn't able to find the Avro schema.");
}
this.cassandraDataStore = dataStore;
try {
analyzePersistent();
} catch (Exception e) {
- throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
+ throw new GoraException("Error occurred while analyzing the persistent class, :" + e.getMessage());
}
}
@@ -136,28 +137,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
* @return
*/
@Override
- public Persistent get(Object key, String[] fields) {
- if (fields == null) {
- fields = getFields();
- }
- ArrayList<String> cassandraKeys = new ArrayList<>();
- ArrayList<Object> cassandraValues = new ArrayList<>();
- AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
- String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys);
- SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
- if (readConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
- }
- ResultSet resultSet = this.client.getSession().execute(statement);
- Iterator<Row> iterator = resultSet.iterator();
- ColumnDefinitions definitions = resultSet.getColumnDefinitions();
- T obj = null;
- if (iterator.hasNext()) {
- obj = cassandraDataStore.newPersistent();
- AbstractGettableData row = (AbstractGettableData) iterator.next();
- populateValuesToPersistent(row, definitions, obj, fields);
+ public Persistent get(Object key, String[] fields) throws GoraException {
+ try {
+ if (fields == null) {
+ fields = getFields();
+ }
+ ArrayList<String> cassandraKeys = new ArrayList<>();
+ ArrayList<Object> cassandraValues = new ArrayList<>();
+ AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+ String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys);
+ SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
+ if (readConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+ }
+ ResultSet resultSet = this.client.getSession().execute(statement);
+ Iterator<Row> iterator = resultSet.iterator();
+ ColumnDefinitions definitions = resultSet.getColumnDefinitions();
+ T obj = null;
+ if (iterator.hasNext()) {
+ obj = cassandraDataStore.newPersistent();
+ AbstractGettableData row = (AbstractGettableData) iterator.next();
+ populateValuesToPersistent(row, definitions, obj, fields);
+ }
+ return obj;
+ } catch (GoraException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return obj;
}
/**
@@ -167,67 +175,72 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
* @param persistent
*/
@Override
- public void put(Object key, Persistent persistent) {
- if (persistent instanceof PersistentBase) {
- if (persistent.isDirty()) {
- PersistentBase persistentBase = (PersistentBase) persistent;
- ArrayList<String> fields = new ArrayList<>();
- ArrayList<Object> values = new ArrayList<>();
- AvroCassandraUtils.processKeys(mapping, key, fields, values);
- for (Schema.Field f : persistentBase.getSchema().getFields()) {
- String fieldName = f.name();
- Field field = mapping.getFieldFromFieldName(fieldName);
- if (field == null) {
- LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass});
- continue;
- }
- if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) {
- Object value = persistentBase.get(f.pos());
- String fieldType = field.getType();
- if (fieldType.contains("frozen")) {
- fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
- UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType);
- UDTValue udtValue = userType.newValue();
- Schema udtSchema = f.schema();
- if (udtSchema.getType().equals(Schema.Type.UNION)) {
- for (Schema schema : udtSchema.getTypes()) {
- if (schema.getType().equals(Schema.Type.RECORD)) {
- udtSchema = schema;
- break;
+ public void put(Object key, Persistent persistent) throws GoraException {
+ try {
+ if (persistent instanceof PersistentBase) {
+ if (persistent.isDirty()) {
+ PersistentBase persistentBase = (PersistentBase) persistent;
+ ArrayList<String> fields = new ArrayList<>();
+ ArrayList<Object> values = new ArrayList<>();
+ AvroCassandraUtils.processKeys(mapping, key, fields, values);
+ for (Schema.Field f : persistentBase.getSchema().getFields()) {
+ String fieldName = f.name();
+ Field field = mapping.getFieldFromFieldName(fieldName);
+ if (field == null) {
+ LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass});
+ continue;
+ }
+ if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) {
+ Object value = persistentBase.get(f.pos());
+ String fieldType = field.getType();
+ if (fieldType.contains("frozen")) {
+ fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">"));
+ UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType);
+ UDTValue udtValue = userType.newValue();
+ Schema udtSchema = f.schema();
+ if (udtSchema.getType().equals(Schema.Type.UNION)) {
+ for (Schema schema : udtSchema.getTypes()) {
+ if (schema.getType().equals(Schema.Type.RECORD)) {
+ udtSchema = schema;
+ break;
+ }
}
}
- }
- PersistentBase udtObjectBase = (PersistentBase) value;
- for (Schema.Field udtField : udtSchema.getFields()) {
- Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field);
- if (udtField.schema().getType().equals(Schema.Type.MAP)) {
- udtValue.setMap(udtField.name(), (Map) udtFieldValue);
- } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) {
- udtValue.setList(udtField.name(), (List) udtFieldValue);
- } else {
- udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass());
+ PersistentBase udtObjectBase = (PersistentBase) value;
+ for (Schema.Field udtField : udtSchema.getFields()) {
+ Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field);
+ if (udtField.schema().getType().equals(Schema.Type.MAP)) {
+ udtValue.setMap(udtField.name(), (Map) udtFieldValue);
+ } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) {
+ udtValue.setList(udtField.name(), (List) udtFieldValue);
+ } else {
+ udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass());
+ }
}
+ value = udtValue;
+ } else {
+ value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
}
- value = udtValue;
- } else {
- value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
+ values.add(value);
+ fields.add(fieldName);
}
- values.add(value);
- fields.add(fieldName);
}
+ String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields);
+ SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray());
+ if (writeConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+ }
+ client.getSession().execute(statement);
+ } else {
+ LOG.info("Ignored putting persistent bean {} in the store as it is neither "
+ + "new, neither dirty.", new Object[]{persistent});
}
- String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields);
- SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray());
- if (writeConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
- }
- client.getSession().execute(statement);
} else {
- LOG.info("Ignored putting persistent bean {} in the store as it is neither "
- + "new, neither dirty.", new Object[]{persistent});
+ LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class});
}
- } else {
- LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class});
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -238,25 +251,30 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
* @return
*/
@Override
- public Persistent get(Object key) {
- ArrayList<String> cassandraKeys = new ArrayList<>();
- ArrayList<Object> cassandraValues = new ArrayList<>();
- AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
- String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys);
- SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
- if (readConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
- }
- ResultSet resultSet = client.getSession().execute(statement);
- Iterator<Row> iterator = resultSet.iterator();
- ColumnDefinitions definitions = resultSet.getColumnDefinitions();
- T obj = null;
- if (iterator.hasNext()) {
- obj = cassandraDataStore.newPersistent();
- AbstractGettableData row = (AbstractGettableData) iterator.next();
- populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames());
+ public Persistent get(Object key) throws GoraException {
+ try {
+ ArrayList<String> cassandraKeys = new ArrayList<>();
+ ArrayList<Object> cassandraValues = new ArrayList<>();
+ AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+ String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys);
+ SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
+ if (readConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+ }
+ ResultSet resultSet = client.getSession().execute(statement);
+ Iterator<Row> iterator = resultSet.iterator();
+ ColumnDefinitions definitions = resultSet.getColumnDefinitions();
+ T obj = null;
+ if (iterator.hasNext()) {
+ obj = cassandraDataStore.newPersistent();
+ AbstractGettableData row = (AbstractGettableData) iterator.next();
+ populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames());
+ }
+ return obj;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return obj;
}
/**
@@ -380,17 +398,22 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
* @return
*/
@Override
- public boolean delete(Object key) {
+ public boolean delete(Object key) throws GoraException {
ArrayList<String> cassandraKeys = new ArrayList<>();
ArrayList<Object> cassandraValues = new ArrayList<>();
- AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
- String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys);
- SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
- if (writeConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+ try {
+ AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues);
+ String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys);
+ SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray());
+ if (writeConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+ }
+ ResultSet resultSet = client.getSession().execute(statement);
+ return resultSet.wasApplied();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- ResultSet resultSet = client.getSession().execute(statement);
- return resultSet.wasApplied();
}
/**
@@ -401,46 +424,51 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
* @return
*/
@Override
- public Result execute(DataStore dataStore, Query query) {
- List<Object> objectArrayList = new ArrayList<>();
- String[] fields = query.getFields();
- if (fields != null) {
- fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
- } else {
- fields = mapping.getAllFieldsIncludingKeys();
- }
- CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
- String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
- ResultSet results;
- SimpleStatement statement;
- if (objectArrayList.size() == 0) {
- statement = new SimpleStatement(cqlQuery);
- } else {
- statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
- }
- if (readConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
- }
- results = client.getSession().execute(statement);
- Iterator<Row> iterator = results.iterator();
- ColumnDefinitions definitions = results.getColumnDefinitions();
- T obj;
- K keyObject;
- CassandraKey cassandraKey = mapping.getCassandraKey();
- while (iterator.hasNext()) {
- AbstractGettableData row = (AbstractGettableData) iterator.next();
- obj = cassandraDataStore.newPersistent();
- keyObject = cassandraDataStore.newKey();
- populateValuesToPersistent(row, definitions, obj, fields);
- if (cassandraKey != null) {
- populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames());
+ public Result execute(DataStore dataStore, Query query) throws GoraException {
+ try {
+ List<Object> objectArrayList = new ArrayList<>();
+ String[] fields = query.getFields();
+ if (fields != null) {
+ fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+ } else {
+ fields = mapping.getAllFieldsIncludingKeys();
+ }
+ CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
+ String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
+ ResultSet results;
+ SimpleStatement statement;
+ if (objectArrayList.size() == 0) {
+ statement = new SimpleStatement(cqlQuery);
} else {
- Field key = mapping.getInlinedDefinedPartitionKey();
- keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null);
+ statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
+ }
+ if (readConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+ }
+ results = client.getSession().execute(statement);
+ Iterator<Row> iterator = results.iterator();
+ ColumnDefinitions definitions = results.getColumnDefinitions();
+ T obj;
+ K keyObject;
+ CassandraKey cassandraKey = mapping.getCassandraKey();
+ while (iterator.hasNext()) {
+ AbstractGettableData row = (AbstractGettableData) iterator.next();
+ obj = cassandraDataStore.newPersistent();
+ keyObject = cassandraDataStore.newKey();
+ populateValuesToPersistent(row, definitions, obj, fields);
+ if (cassandraKey != null) {
+ populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames());
+ } else {
+ Field key = mapping.getInlinedDefinedPartitionKey();
+ keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null);
+ }
+ cassandraResult.addResultElement(keyObject, obj);
}
- cassandraResult.addResultElement(keyObject, obj);
+ return cassandraResult;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return cassandraResult;
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index d93ff9c..cd806d0 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -16,11 +16,11 @@
*/
package org.apache.gora.cassandra.serializers;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.TableMetadata;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.store.CassandraClient;
import org.apache.gora.cassandra.store.CassandraMapping;
@@ -29,13 +29,15 @@ import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.TableMetadata;
/**
* This is the abstract Cassandra Serializer class.
@@ -77,8 +79,9 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param <K> key class
* @param <T> persistent class
* @return Serializer
+ * @throws GoraException
*/
- public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
+ public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) throws GoraException {
CassandraStore.SerializerType serType = type == null || type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
CassandraSerializer serializer;
switch (serType) {
@@ -100,40 +103,60 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
protected abstract void analyzePersistent() throws Exception;
- public void createSchema() {
- LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
- this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
- for (Map.Entry udtType : userDefineTypeMaps.entrySet()) {
- LOG.debug("creating Cassandra User Define Type {}", udtType.getKey());
- this.client.getSession().execute((String) udtType.getValue());
+ public void createSchema() throws GoraException {
+ try {
+ LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
+ this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
+ for (Map.Entry udtType : userDefineTypeMaps.entrySet()) {
+ LOG.debug("creating Cassandra User Define Type {}", udtType.getKey());
+ this.client.getSession().execute((String) udtType.getValue());
+ }
+ LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
+ this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
- this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
}
- public void deleteSchema() {
- LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
- this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping));
- LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName());
- this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
+ public void deleteSchema() throws GoraException {
+ try {
+ LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
+ this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping));
+ LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName());
+ this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
public void close() {
this.client.close();
}
- public void truncateSchema() {
- LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
- this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+ public void truncateSchema() throws GoraException {
+ try {
+ LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
+ this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
- public boolean schemaExists() {
- KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName());
- if (keyspace != null) {
- TableMetadata table = keyspace.getTable(mapping.getCoreName());
- return table != null;
- } else {
- return false;
+ public boolean schemaExists() throws GoraException {
+ try {
+ KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName());
+ if (keyspace != null) {
+ TableMetadata table = keyspace.getTable(mapping.getCoreName());
+ return table != null;
+ } else {
+ return false;
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
}
@@ -151,7 +174,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param key key value
* @param value persistent value
*/
- public abstract void put(K key, T value);
+ public abstract void put(K key, T value) throws GoraException;
/**
* Retrieves the persistent value according to the key
@@ -159,7 +182,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param key key value
* @return persistent value
*/
- public abstract T get(K key);
+ public abstract T get(K key) throws GoraException;
/**
* Deletes persistent value according to the key
@@ -167,7 +190,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param key key value
* @return isDeleted
*/
- public abstract boolean delete(K key);
+ public abstract boolean delete(K key) throws GoraException;
/**
* Retrieves the persistent value according to the key and fields
@@ -176,7 +199,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param fields fields
* @return persistent value
*/
- public abstract T get(K key, String[] fields);
+ public abstract T get(K key, String[] fields) throws GoraException;
/**
* Executes the given query and returns the results.
@@ -185,7 +208,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
* @param query Cassandra Query
* @return Cassandra Result
*/
- public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query);
+ public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query) throws GoraException ;
/**
* Update the persistent objects
@@ -195,31 +218,36 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
*/
public abstract boolean updateByQuery(Query query);
- public long deleteByQuery(Query query) {
- List<Object> objectArrayList = new ArrayList<>();
- if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) {
- if (query.getFields() == null) {
- client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+ public long deleteByQuery(Query query) throws GoraException {
+ try {
+ List<Object> objectArrayList = new ArrayList<>();
+ if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) {
+ if (query.getFields() == null) {
+ client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+ } else {
+ LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields.");
+ }
} else {
- LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields.");
- }
- } else {
- String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
- ResultSet results;
- SimpleStatement statement;
- if (objectArrayList.size() == 0) {
- statement = new SimpleStatement(cqlQuery);
- } else {
- statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
- }
- if (writeConsistencyLevel != null) {
- statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+ String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
+ ResultSet results;
+ SimpleStatement statement;
+ if (objectArrayList.size() == 0) {
+ statement = new SimpleStatement(cqlQuery);
+ } else {
+ statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
+ }
+ if (writeConsistencyLevel != null) {
+ statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+ }
+ results = client.getSession().execute(statement);
+ LOG.debug("Delete by Query was applied : " + results.wasApplied());
}
- results = client.getSession().execute(statement);
- LOG.debug("Delete by Query was applied : " + results.wasApplied());
+ LOG.info("Delete By Query method doesn't return the deleted element count.");
+ return 0;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- LOG.info("Delete By Query method doesn't return the deleted element count.");
- return 0;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index bf28ee0..3fb8e10 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -16,12 +16,12 @@
*/
package org.apache.gora.cassandra.serializers;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.datastax.driver.mapping.Result;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.commons.lang.ArrayUtils;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -30,14 +30,16 @@ import org.apache.gora.cassandra.store.CassandraMapping;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
/**
* This Class contains the operation relates to Native Serialization.
@@ -48,12 +50,13 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
private Mapper<T> mapper;
- NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+ NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) throws GoraException {
super(cassandraClient, keyClass, persistentClass, mapping);
try {
analyzePersistent();
} catch (Exception e) {
- throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage());
+ LOG.error(e.getMessage(), e);
+ throw new GoraException("Error occurred while analyzing the persistent class, :" + e.getMessage(), e);
}
this.createSchema();
MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
@@ -74,9 +77,14 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
* @param value
*/
@Override
- public void put(Object key, Persistent value) {
- LOG.debug("Object is saved with key : {} and value : {}", key, value);
- mapper.save((T) value);
+ public void put(Object key, Persistent value) throws GoraException {
+ try {
+ LOG.debug("Object is saved with key : {} and value : {}", key, value);
+ mapper.save((T) value);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
/**
@@ -86,14 +94,19 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
* @return
*/
@Override
- public T get(Object key) {
- T object = mapper.get(key);
- if (object != null) {
- LOG.debug("Object is found for key : {}", key);
- } else {
- LOG.debug("Object is not found for key : {}", key);
+ public T get(Object key) throws GoraException {
+ try {
+ T object = mapper.get(key);
+ if (object != null) {
+ LOG.debug("Object is found for key : {}", key);
+ } else {
+ LOG.debug("Object is not found for key : {}", key);
+ }
+ return object;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return object;
}
/**
@@ -103,10 +116,15 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
* @return
*/
@Override
- public boolean delete(Object key) {
+ public boolean delete(Object key) throws GoraException {
LOG.debug("Object is deleted for key : {}", key);
- mapper.delete(key);
- return true;
+ try {
+ mapper.delete(key);
+ return true;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
/**
@@ -187,30 +205,35 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
* @return
*/
@Override
- public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
- List<Object> objectArrayList = new ArrayList<>();
- String[] fields = query.getFields();
- if (fields != null) {
- fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
- } else {
- fields = mapping.getAllFieldsIncludingKeys();
- }
- CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
- String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
- ResultSet results;
- if (objectArrayList.size() == 0) {
- results = client.getSession().execute(cqlQuery);
- } else {
- results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
- }
- Result<T> objects = mapper.map(results);
- Iterator iterator = objects.iterator();
- while (iterator.hasNext()) {
- T result = (T) iterator.next();
- K key = getKey(result);
- cassandraResult.addResultElement(key, result);
+ public org.apache.gora.query.Result execute(DataStore dataStore, Query query) throws GoraException {
+ try {
+ List<Object> objectArrayList = new ArrayList<>();
+ String[] fields = query.getFields();
+ if (fields != null) {
+ fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+ } else {
+ fields = mapping.getAllFieldsIncludingKeys();
+ }
+ CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
+ String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
+ ResultSet results;
+ if (objectArrayList.size() == 0) {
+ results = client.getSession().execute(cqlQuery);
+ } else {
+ results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+ }
+ Result<T> objects = mapper.map(results);
+ Iterator iterator = objects.iterator();
+ while (iterator.hasNext()) {
+ T result = (T) iterator.next();
+ K key = getKey(result);
+ cassandraResult.addResultElement(key, result);
+ }
+ return cassandraResult;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
}
- return cassandraResult;
}
private K getKey(T object) {
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 7d7d0d8..865d8cf 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -16,6 +16,11 @@
*/
package org.apache.gora.cassandra.store;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.serializers.CassandraSerializer;
import org.apache.gora.persistency.BeanFactory;
@@ -27,14 +32,10 @@ import org.apache.gora.query.Result;
import org.apache.gora.query.ws.impl.PartitionWSQueryImpl;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
/**
* Implementation of Cassandra Store.
*
@@ -70,7 +71,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* @param properties properties
*/
@Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
LOG.debug("Initializing Cassandra store");
String serializationType;
try {
@@ -86,8 +87,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
CassandraClient cassandraClient = new CassandraClient();
cassandraClient.initialize(properties, mapping);
cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, serializationType, this, mapping);
+ } catch (GoraException e) {
+ throw e;
} catch (Exception e) {
- throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e);
+ LOG.error(e.getMessage(), e);
+ throw new GoraException("Error while initializing Cassandra store: " + e.getMessage(), e);
}
}
@@ -125,7 +129,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public void createSchema() {
+ public void createSchema() throws GoraException {
cassandraSerializer.createSchema();
}
@@ -133,7 +137,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public void deleteSchema() {
+ public void deleteSchema() throws GoraException {
cassandraSerializer.deleteSchema();
}
@@ -158,15 +162,16 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public K newKey() {
+ public K newKey() throws GoraException {
try {
if (beanFactory != null) {
return beanFactory.newKey();
} else {
return keyClass.newInstance();
}
- } catch (Exception ex) {
- throw new RuntimeException("Error while instantiating a key: " + ex.getMessage(), ex);
+ } catch (Exception e) {
+ LOG.error("Error while instantiating a key: " + e.getMessage(), e);
+ throw new GoraException("Error while instantiating a key: " + e.getMessage(), e);
}
}
@@ -175,15 +180,16 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
*/
@SuppressWarnings("all")
@Override
- public T newPersistent() {
+ public T newPersistent() throws GoraException {
try {
if (beanFactory != null) {
return this.beanFactory.newPersistent();
} else {
return persistentClass.newInstance();
}
- } catch (Exception ex) {
- throw new RuntimeException("Error while instantiating a persistent: " + ex.getMessage(), ex);
+ } catch (Exception e) {
+ LOG.error("Error while instantiating a persistent: " + e.getMessage(), e);
+ throw new GoraException("Error while instantiating a key: " + e.getMessage(), e);
}
}
@@ -215,7 +221,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public T get(K key) {
+ public T get(K key) throws GoraException {
return (T) cassandraSerializer.get(key);
}
@@ -223,7 +229,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public T get(K key, String[] fields) {
+ public T get(K key, String[] fields) throws GoraException {
return (T) cassandraSerializer.get(key, fields);
}
@@ -231,7 +237,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public void put(K key, T obj) {
+ public void put(K key, T obj) throws GoraException {
cassandraSerializer.put(key, obj);
}
@@ -239,7 +245,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public boolean delete(K key) {
+ public boolean delete(K key) throws GoraException {
return cassandraSerializer.delete(key);
}
@@ -247,7 +253,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public long deleteByQuery(Query<K, T> query) {
+ public long deleteByQuery(Query<K, T> query) throws GoraException {
return cassandraSerializer.deleteByQuery(query);
}
@@ -255,8 +261,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public Result<K, T> execute(Query<K, T> query) {
- return (Result<K, T>) cassandraSerializer.execute(this, query);
+ public Result<K, T> execute(Query<K, T> query) throws GoraException {
+ try {
+ return (Result<K, T>) cassandraSerializer.execute(this, query);
+ } catch (Exception e) {
+ this.LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
/**
@@ -282,19 +293,24 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
- List<PartitionQuery<K, T>> partitions = new ArrayList<>();
- PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query);
- pqi.setDataStore(this);
- partitions.add(pqi);
- return partitions;
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws GoraException {
+ try {
+ List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+ PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query);
+ pqi.setDataStore(this);
+ partitions.add(pqi);
+ return partitions;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new GoraException(e);
+ }
}
/**
* {@inheritDoc}
*/
@Override
- public void flush() {
+ public void flush() throws GoraException {
// ignore since caching has been disabled
}
@@ -310,7 +326,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public void truncateSchema() {
+ public void truncateSchema() throws GoraException {
cassandraSerializer.truncateSchema();
}
@@ -318,7 +334,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
* {@inheritDoc}
*/
@Override
- public boolean schemaExists() {
+ public boolean schemaExists() throws GoraException{
return cassandraSerializer.schemaExists();
}
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
index c016893..736de0f 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java
@@ -22,6 +22,7 @@ import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.GoraCassandraTestDriver;
import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.util.GoraException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,9 +76,10 @@ public class TestAvroSerializationWithUDT {
/**
* This is for testGetNested() with UDT dataType with avro serialization
+ * @throws GoraException
*/
@Test
- public void testSimplePutAndGEt() {
+ public void testSimplePutAndGEt() throws GoraException {
webPageCassandraStore.createSchema();
WebPage webpage = WebPage.newBuilder().build();
webpage.setUrl(new Utf8("url.."));
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
index ce9e2df..a588ab1 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
@@ -25,6 +25,7 @@ import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreTestBase;
import org.apache.gora.store.DataStoreTestUtil;
+import org.apache.gora.util.GoraException;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -83,7 +84,7 @@ public class TestCassandraStore extends DataStoreTestBase {
public void testGetPartitions() throws IOException {
}
- private void preConfiguration() {
+ private void preConfiguration() throws GoraException {
if (webPageStore.schemaExists()) {
webPageStore.truncateSchema();
} else {
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
index 3ae3152..a8039ed 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java
@@ -24,6 +24,7 @@ import org.apache.gora.cassandra.example.generated.AvroSerialization.CassandraRe
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
+import org.apache.gora.util.GoraException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,9 +76,10 @@ public class TestCassandraStoreWithCassandraKey {
/**
* In this test case, schema exists method behavior of the data store is testing.
+ * @throws GoraException
*/
@Test
- public void testSchemaRelatedBehaviour() {
+ public void testSchemaRelatedBehaviour() throws GoraException {
cassandraRecordDataStore.createSchema();
Assert.assertTrue(cassandraRecordDataStore.schemaExists());
cassandraRecordDataStore.deleteSchema();
@@ -88,9 +90,10 @@ public class TestCassandraStoreWithCassandraKey {
/**
* In this test case, get, put and delete methods behaviour of the data store is testing.
+ * @throws GoraException
*/
@Test
- public void testSimplePutGet() {
+ public void testSimplePutGet() throws GoraException {
cassandraRecordDataStore.createSchema();
CassandraRecord record = new CassandraRecord();
record.setDataLong(719411002L);
@@ -221,7 +224,7 @@ public class TestCassandraStoreWithCassandraKey {
}
@Test
- public void testUpdateByQuery() {
+ public void testUpdateByQuery() throws GoraException {
cassandraRecordDataStore.truncateSchema();
//insert data
CassandraRecord record1 = new CassandraRecord();
@@ -266,7 +269,7 @@ public class TestCassandraStoreWithCassandraKey {
@Test
- public void testDataTypes() {
+ public void testDataTypes() throws GoraException {
cassandraRecordDataStore.truncateSchema();
CassandraRecord record = new CassandraRecord();
record.setDataLong(719411002L);
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
index 489732c..3216149 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
@@ -79,9 +79,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, put and get behavior of the data store are testing.
+ * @throws GoraException
*/
@Test
- public void testSimplePutAndGet() {
+ public void testSimplePutAndGet() throws GoraException {
UUID id = UUID.randomUUID();
User user1 = new User(id, "madhawa", Date.from(Instant.now()));
// storing data;
@@ -94,9 +95,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, put and delete behavior of the data store are testing.
+ * @throws GoraException
*/
@Test
- public void testSimplePutDeleteAndGet() {
+ public void testSimplePutDeleteAndGet() throws GoraException {
UUID id = UUID.randomUUID();
User user1 = new User(id, "kasun", Date.from(Instant.now()));
// storing data;
@@ -114,9 +116,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, schema exists method behavior of the data store is testing.
+ * @throws GoraException
*/
@Test()
- public void testSchemaExists() {
+ public void testSchemaExists() throws GoraException {
userDataStore.deleteSchema();
Assert.assertFalse(userDataStore.schemaExists());
userDataStore.createSchema();
@@ -125,9 +128,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, schema exists method behavior of the data store is testing.
+ * @throws GoraException
*/
@Test
- public void testTruncateSchema() {
+ public void testTruncateSchema() throws GoraException {
if (!userDataStore.schemaExists()) {
userDataStore.createSchema();
}
@@ -144,9 +148,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, get with fields method behavior of the data store is testing.
+ * @throws GoraException
*/
@Test
- public void testGetWithFields() {
+ public void testGetWithFields() throws GoraException {
UUID id = UUID.randomUUID();
User user1 = new User(id, "Madhawa Kasun Gunasekara", Date.from(Instant.now()));
userDataStore.put(id, user1);
@@ -249,9 +254,10 @@ public class TestCassandraStoreWithNativeSerialization {
/**
* In this test case, update by quert method behavior of the data store is testing.
+ * @throws GoraException
*/
@Test
- public void testUpdateByQuery() {
+ public void testUpdateByQuery() throws GoraException {
userDataStore.truncateSchema();
UUID id1 = UUID.randomUUID();
User user1 = new User(id1, "user1", Date.from(Instant.now()));
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
index f9b5df4..6c35ae6 100644
--- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
+++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java
@@ -21,6 +21,7 @@ package org.apache.gora.cassandra.store;
import org.apache.gora.cassandra.GoraCassandraTestDriver;
import org.apache.gora.cassandra.example.generated.nativeSerialization.Customer;
import org.apache.gora.cassandra.example.generated.nativeSerialization.Document;
+import org.apache.gora.util.GoraException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -68,9 +69,10 @@ public class TestNativeSerializationWithUDT {
/**
* This is for testGetNested() with UDT dataType with native serialization.
+ * @throws GoraException
*/
@Test
- public void testSimplePutAndGEt() {
+ public void testSimplePutAndGEt() throws GoraException {
documentCassandraStore.createSchema();
Document document = new Document();
document.setDefaultId("yawamu.com");