You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/30 07:51:59 UTC
[2/3] ignite git commit: IGNITE-3609 Utilize Cassandra logged batches
for transactions. - Fixes #1111.
IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.
Signed-off-by: Alexey Kuznetsov <ak...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b8aca64
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b8aca64
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b8aca64
Branch: refs/heads/master
Commit: 3b8aca64b8ebe6ba21f5d02f50cf69ad46dbbc95
Parents: 00576d8
Author: Igor <ir...@gmail.com>
Authored: Fri Sep 30 14:39:30 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Sep 30 14:39:30 2016 +0700
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 112 ++++--
.../store/cassandra/common/CassandraHelper.java | 29 +-
.../store/cassandra/persistence/PojoField.java | 9 +-
.../cassandra/persistence/PojoValueField.java | 2 -
.../cassandra/session/CassandraSession.java | 10 +
.../cassandra/session/CassandraSessionImpl.java | 113 +++++-
.../session/transaction/BaseMutation.java | 68 ++++
.../session/transaction/DeleteMutation.java | 57 +++
.../cassandra/session/transaction/Mutation.java | 64 +++
.../session/transaction/WriteMutation.java | 60 +++
.../session/transaction/package-info.java | 21 +
.../tests/CassandraDirectPersistenceTest.java | 396 ++++++++++++++++---
.../ignite/tests/CassandraLocalServer.java | 58 +++
.../apache/ignite/tests/DDLGeneratorTest.java | 35 +-
.../ignite/tests/IgnitePersistentStoreTest.java | 265 +++++++++++++
.../org/apache/ignite/tests/pojos/Product.java | 123 ++++++
.../apache/ignite/tests/pojos/ProductOrder.java | 148 +++++++
.../ignite/tests/utils/CacheStoreHelper.java | 19 +-
.../ignite/tests/utils/TestCacheSession.java | 12 +-
.../ignite/tests/utils/TestTransaction.java | 133 +++++++
.../apache/ignite/tests/utils/TestsHelper.java | 299 +++++++++++++-
.../tests/persistence/pojo/ignite-config.xml | 41 +-
.../ignite/tests/persistence/pojo/order.xml | 21 +
.../ignite/tests/persistence/pojo/product.xml | 21 +
.../store/src/test/resources/tests.properties | 15 +
25 files changed, 2005 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 6aef0c4..aead39a 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -22,8 +22,10 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -41,6 +43,9 @@ import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
+import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.logger.NullLogger;
@@ -54,14 +59,16 @@ import org.apache.ignite.resources.LoggerResource;
* @param <V> Ignite cache value type.
*/
public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
- /** Connection attribute property name. */
- private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION";
+ /** Buffer to store mutations performed within transaction. */
+ private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
/** Auto-injected store session. */
+ @SuppressWarnings("unused")
@CacheStoreSessionResource
private CacheStoreSession storeSes;
/** Auto-injected logger instance. */
+ @SuppressWarnings("unused")
@LoggerResource
private IgniteLogger log;
@@ -127,12 +134,22 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
- if (storeSes == null || storeSes.transaction() == null)
+ if (!storeSes.isWithinTransaction())
return;
- CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP);
+ List<Mutation> mutations = mutations();
+ if (mutations == null || mutations.isEmpty())
+ return;
- U.closeQuiet(cassandraSes);
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(mutations);
+ }
+ finally {
+ mutations.clear();
+ U.closeQuiet(ses);
+ }
}
/** {@inheritDoc} */
@@ -182,7 +199,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
});
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -235,7 +252,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
}, keys);
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -244,6 +261,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
if (entry == null || entry.getKey() == null)
return;
+ if (storeSes.isWithinTransaction()) {
+ accumulate(new WriteMutation(entry, cassandraTable(), controller));
+ return;
+ }
+
CassandraSession ses = getCassandraSession();
try {
@@ -285,7 +307,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
});
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -294,6 +316,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
if (entries == null || entries.isEmpty())
return;
+ if (storeSes.isWithinTransaction()) {
+ for (Cache.Entry<?, ?> entry : entries)
+ accumulate(new WriteMutation(entry, cassandraTable(), controller));
+
+ return;
+ }
+
CassandraSession ses = getCassandraSession();
try {
@@ -331,7 +360,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
}, entries);
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -340,6 +369,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
if (key == null)
return;
+ if (storeSes.isWithinTransaction()) {
+ accumulate(new DeleteMutation(key, cassandraTable(), controller));
+ return;
+ }
+
CassandraSession ses = getCassandraSession();
try {
@@ -382,7 +416,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
});
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -391,6 +425,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
if (keys == null || keys.isEmpty())
return;
+ if (storeSes.isWithinTransaction()) {
+ for (Object key : keys)
+ accumulate(new DeleteMutation(key, cassandraTable(), controller));
+
+ return;
+ }
+
CassandraSession ses = getCassandraSession();
try {
@@ -422,7 +463,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
}, keys);
}
finally {
- closeCassandraSession(ses);
+ U.closeQuiet(ses);
}
}
@@ -433,36 +474,43 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
* @return Cassandra session wrapper.
*/
private CassandraSession getCassandraSession() {
- if (storeSes == null || storeSes.transaction() == null)
- return dataSrc.session(log != null ? log : new NullLogger());
-
- CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP);
-
- if (ses == null) {
- ses = dataSrc.session(log != null ? log : new NullLogger());
- storeSes.properties().put(ATTR_CONN_PROP, ses);
- }
+ return dataSrc.session(log != null ? log : new NullLogger());
+ }
- return ses;
+ /**
+ * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+ *
+ * @return Table name.
+ */
+ private String cassandraTable() {
+ return controller.getPersistenceSettings().getTable() != null ?
+ controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase();
}
/**
- * Releases Cassandra related resources.
+ * Accumulates mutation in the transaction buffer.
*
- * @param ses Cassandra session wrapper.
+ * @param mutation Mutation operation.
*/
- private void closeCassandraSession(CassandraSession ses) {
- if (ses != null && (storeSes == null || storeSes.transaction() == null))
- U.closeQuiet(ses);
+ private void accumulate(Mutation mutation) {
+ //noinspection unchecked
+ List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
+
+ if (mutations == null) {
+ mutations = new LinkedList<>();
+ storeSes.properties().put(TRANSACTION_BUFFER, mutations);
+ }
+
+ mutations.add(mutation);
}
/**
- * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+ * Returns all the mutations performed within transaction.
*
- * @return Table name.
+ * @return Mutations
*/
- private String cassandraTable() {
- return controller.getPersistenceSettings().getTable() != null ?
- controller.getPersistenceSettings().getTable() : storeSes.cacheName().toLowerCase();
+ private List<Mutation> mutations() {
+ //noinspection unchecked
+ return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
index 9066112..badd5df 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
@@ -20,9 +20,13 @@ package org.apache.ignite.cache.store.cassandra.common;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.regex.Pattern;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -36,8 +40,15 @@ public class CassandraHelper {
/** Cassandra error message if trying to create table inside nonexistent keyspace. */
private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*");
+ /** Cassandra error message if trying to prepare query for nonexistent keyspace. */
+ private static final Pattern KEYSPACE_EXIST_ERROR3 = Pattern.compile("Error preparing query, got ERROR INVALID: " +
+ "Keyspace [0-9a-zA-Z_]+ does not exist");
+
+ /** Cassandra error message if specified table doesn't exist. */
+ private static final Pattern TABLE_EXIST_ERROR1 = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+
/** Cassandra error message if specified table doesn't exist. */
- private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+ private static final String TABLE_EXIST_ERROR2 = "Error preparing query, got ERROR INVALID: unconfigured table";
/** Cassandra error message if trying to use prepared statement created from another session. */
private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " +
@@ -85,11 +96,25 @@ public class CassandraHelper {
public static boolean isTableAbsenceError(Throwable e) {
while (e != null) {
if (e instanceof InvalidQueryException &&
- (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() ||
+ (TABLE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
return true;
+ if (e instanceof NoHostAvailableException && ((NoHostAvailableException) e).getErrors() != null) {
+ NoHostAvailableException ex = (NoHostAvailableException)e;
+
+ for (Map.Entry<InetSocketAddress, Throwable> entry : ex.getErrors().entrySet()) {
+ //noinspection ThrowableResultOfMethodCallIgnored
+ Throwable error = entry.getValue();
+
+ if (error instanceof DriverException &&
+ (error.getMessage().contains(TABLE_EXIST_ERROR2) ||
+ KEYSPACE_EXIST_ERROR3.matcher(error.getMessage()).matches()))
+ return true;
+ }
+ }
+
e = e.getCause();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
index d708a34..78e75a9 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -85,11 +85,10 @@ public abstract class PojoField implements Serializable {
public PojoField(PropertyDescriptor desc) {
this.name = desc.getName();
- QuerySqlField sqlField = desc.getReadMethod() != null ?
- desc.getReadMethod().getAnnotation(QuerySqlField.class) :
- desc.getWriteMethod() == null ?
- null :
- desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+ QuerySqlField sqlField = desc.getReadMethod() != null &&
+ desc.getReadMethod().getAnnotation(QuerySqlField.class) != null ?
+ desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+ desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class);
col = sqlField != null && sqlField.name() != null &&
!sqlField.name().trim().isEmpty() ? sqlField.name() : name.toLowerCase();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
index c3512c3..3e636c0 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -146,7 +146,5 @@ public class PojoValueField extends PojoField {
* @param sqlField {@link QuerySqlField} annotation.
*/
protected void init(QuerySqlField sqlField) {
- if (sqlField.index())
- isIndexed = true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
index 506982f..b0e50ec 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -17,7 +17,10 @@
package org.apache.ignite.cache.store.cassandra.session;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+
import java.io.Closeable;
+import java.util.List;
/**
* Wrapper around Cassandra driver session, to automatically handle:
@@ -57,4 +60,11 @@ public interface CassandraSession extends Closeable {
* @param assistant execution assistance to perform the main operation logic.
*/
public void execute(BatchLoaderAssistant assistant);
+
+ /**
+ * Executes all the mutations performed within Ignite transaction against Cassandra database.
+ *
+ * @param mutations Mutations.
+ */
+ public void execute(List<Mutation> mutations);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index d2c9e97..4857fa4 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
/**
@@ -162,7 +163,8 @@ public class CassandraSessionImpl implements CassandraSession {
throw new IgniteException(errorMsg, e);
}
- sleeper.sleep();
+ if (!CassandraHelper.isTableAbsenceError(error))
+ sleeper.sleep();
attempt++;
}
@@ -320,7 +322,8 @@ public class CassandraSessionImpl implements CassandraSession {
handlePreparedStatementClusterError(prepStatEx);
}
- sleeper.sleep();
+ if (!CassandraHelper.isTableAbsenceError(error))
+ sleeper.sleep();
attempt++;
}
@@ -402,6 +405,103 @@ public class CassandraSessionImpl implements CassandraSession {
}
/** {@inheritDoc} */
+ @Override public void execute(List<Mutation> mutations) {
+ if (mutations == null || mutations.isEmpty())
+ return;
+
+ Throwable error = null;
+ String errorMsg = "Failed to apply " + mutations.size() + " mutations performed withing Ignite " +
+ "transaction into Cassandra";
+
+ int attempt = 0;
+ boolean tableExistenceRequired = false;
+ Map<String, PreparedStatement> statements = new HashMap<>();
+ Map<String, KeyValuePersistenceSettings> tableSettings = new HashMap<>();
+ RandomSleeper sleeper = newSleeper();
+
+ incrementSessionRefs();
+
+ try {
+ while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+ error = null;
+
+ if (attempt != 0) {
+ log.warning("Trying " + (attempt + 1) + " attempt to apply " + mutations.size() + " mutations " +
+ "performed withing Ignite transaction into Cassandra");
+ }
+
+ try {
+ BatchStatement batch = new BatchStatement();
+
+ // Accumulate all mutations into one Cassandra logged batch; statements are prepared once per table + mutation class.
+ for (Mutation mutation : mutations) {
+ String key = mutation.getTable() + mutation.getClass().getName();
+ PreparedStatement st = statements.get(key);
+
+ if (st == null) {
+ st = prepareStatement(mutation.getTable(), mutation.getStatement(),
+ mutation.getPersistenceSettings(), mutation.tableExistenceRequired());
+
+ if (st != null)
+ statements.put(key, st);
+ }
+
+ if (st != null)
+ batch.add(mutation.bindStatement(st));
+
+ if (attempt == 0) {
+ if (mutation.tableExistenceRequired()) {
+ tableExistenceRequired = true;
+
+ if (!tableSettings.containsKey(mutation.getTable()))
+ tableSettings.put(mutation.getTable(), mutation.getPersistenceSettings());
+ }
+ }
+ }
+
+ // Commit the logged batch into Cassandra (skipped when no statement was bound).
+ if (batch.size() > 0)
+ session().execute(tuneStatementExecutionOptions(batch));
+
+ return;
+ } catch (Throwable e) {
+ error = e;
+
+ if (CassandraHelper.isTableAbsenceError(e)) {
+ if (tableExistenceRequired) {
+ for (Map.Entry<String, KeyValuePersistenceSettings> entry : tableSettings.entrySet())
+ handleTableAbsenceError(entry.getKey(), entry.getValue());
+ }
+ else
+ return; // Table is absent and no mutation requires it to exist - nothing to apply.
+ } else if (CassandraHelper.isHostsAvailabilityError(e)) {
+ if (handleHostsAvailabilityError(e, attempt, errorMsg))
+ statements.clear();
+ } else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+ handlePreparedStatementClusterError(e);
+ statements.clear();
+ } else {
+ // Unknown error: stop retrying. The outer catch records it and it is rethrown after logging below.
+ throw new IgniteException(errorMsg, e);
+ }
+ }
+
+ if (!CassandraHelper.isTableAbsenceError(error))
+ sleeper.sleep();
+
+ attempt++;
+ }
+ } catch (Throwable e) {
+ error = e;
+ } finally {
+ decrementSessionRefs();
+ }
+
+ log.error(errorMsg, error);
+ throw new IgniteException(errorMsg, error);
+ }
+
+ /** {@inheritDoc} */
@Override public synchronized void close() throws IOException {
if (decrementSessionRefs() == 0 && ses != null) {
SessionPool.put(this, ses);
@@ -517,7 +617,8 @@ public class CassandraSessionImpl implements CassandraSession {
error = e;
}
- sleeper.sleep();
+ if (!CassandraHelper.isTableAbsenceError(error))
+ sleeper.sleep();
attempt++;
}
@@ -585,7 +686,7 @@ public class CassandraSessionImpl implements CassandraSession {
log.info("-----------------------------------------------------------------------");
log.info("Creating Cassandra table '" + tableFullName + "'");
log.info("-----------------------------------------------------------------------\n\n" +
- tableFullName + "\n");
+ settings.getTableDDLStatement(table) + "\n");
log.info("-----------------------------------------------------------------------");
session().execute(settings.getTableDDLStatement(table));
log.info("Cassandra table '" + tableFullName + "' was successfully created");
@@ -634,10 +735,14 @@ public class CassandraSessionImpl implements CassandraSession {
while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
try {
+ log.info("-----------------------------------------------------------------------");
log.info("Creating indexes for Cassandra table '" + tableFullName + "'");
+ log.info("-----------------------------------------------------------------------");
for (String statement : indexDDLStatements) {
try {
+ log.info(statement);
+ log.info("-----------------------------------------------------------------------");
session().execute(statement);
}
catch (AlreadyExistsException ignored) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
new file mode 100644
index 0000000..2625e87
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Base class for specific mutation operations; holds the target table and persistence controller.
+ */
+public abstract class BaseMutation implements Mutation {
+ /** Cassandra table to use. */
+ private final String table;
+
+ /** Persistence controller to be utilized for mutation. */
+ private final PersistenceController ctrl;
+
+ /**
+ * Creates instance of mutation operation, rejecting invalid arguments with {@link IllegalArgumentException}.
+ *
+ * @param table Cassandra table which should be used for the mutation; must not be null or blank.
+ * @param ctrl Persistence controller to use; must not be null.
+ */
+ public BaseMutation(String table, PersistenceController ctrl) {
+ if (table == null || table.trim().isEmpty())
+ throw new IllegalArgumentException("Table name should be specified");
+
+ if (ctrl == null)
+ throw new IllegalArgumentException("Persistence controller should be specified");
+
+ this.table = table;
+ this.ctrl = ctrl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return table;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return ctrl.getPersistenceSettings();
+ }
+
+ /**
+ * Service method giving subclasses access to the persistence controller.
+ *
+ * @return Persistence controller to use for the mutation.
+ */
+ protected PersistenceController controller() {
+ return ctrl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
new file mode 100644
index 0000000..79c0bfe
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which deletes object from Cassandra by its Ignite cache key.
+ */
+public class DeleteMutation extends BaseMutation {
+ /** Ignite cache key of the object which should be deleted. */
+ private final Object key;
+
+ /**
+ * Creates instance of delete mutation operation.
+ *
+ * @param key Ignite cache key of the object which should be deleted.
+ * @param table Cassandra table which should be used for the mutation.
+ * @param ctrl Persistence controller to use.
+ */
+ public DeleteMutation(Object key, String table, PersistenceController ctrl) {
+ super(table, ctrl);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} Always {@code false}: a delete does not require the table to exist. */
+ @Override public boolean tableExistenceRequired() {
+ return false;
+ }
+
+ /** {@inheritDoc} Delegates to the controller's delete statement for this mutation's table. */
+ @Override public String getStatement() {
+ return controller().getDeleteStatement(getTable());
+ }
+
+ /** {@inheritDoc} Binds the cache key of the object to delete. */
+ @Override public BoundStatement bindStatement(PreparedStatement statement) {
+ return controller().bindKey(statement, key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
new file mode 100644
index 0000000..cb014f8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information about particular mutation operation performed within transaction.
+ */
+public interface Mutation {
+ /**
+ * Cassandra table to use for an operation.
+ *
+ * @return Table name.
+ */
+ public String getTable();
+
+ /**
+ * Indicates if Cassandra table existence is required for this operation.
+ *
+ * @return {@code true} if table existence is required.
+ */
+ public boolean tableExistenceRequired();
+
+ /**
+ * Returns Ignite cache key/value persistence settings.
+ *
+ * @return Persistence settings.
+ */
+ public KeyValuePersistenceSettings getPersistenceSettings();
+
+ /**
+ * Returns unbound CQL statement to be executed.
+ *
+ * @return Unbound CQL statement.
+ */
+ public String getStatement();
+
+ /**
+ * Binds this mutation's data to the given prepared statement.
+ *
+ * @param statement Prepared statement to bind; expected to be prepared from the CQL
+ * returned by {@link #getStatement()}.
+ * @return Bound statement.
+ */
+ public BoundStatement bindStatement(PreparedStatement statement);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
new file mode 100644
index 0000000..3c74378
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import javax.cache.Cache;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which writes (inserts) object into Cassandra.
+ */
+public class WriteMutation extends BaseMutation {
+ /** Ignite cache entry to be inserted into Cassandra. */
+ private final Cache.Entry entry;
+
+ /**
+ * Creates instance of write mutation operation.
+ *
+ * @param entry Ignite cache entry to be inserted into Cassandra.
+ * @param table Cassandra table which should be used for the mutation.
+ * @param ctrl Persistence controller to use.
+ */
+ public WriteMutation(Cache.Entry entry, String table, PersistenceController ctrl) {
+ super(table, ctrl);
+ this.entry = entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tableExistenceRequired() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller().getWriteStatement(getTable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement) {
+ return controller().bindKeyValue(statement, entry.getKey(), entry.getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
new file mode 100644
index 0000000..7141845
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains mutation implementations used to store changes made inside an Ignite transaction.
+ */
+package org.apache.ignite.cache.store.cassandra.session.transaction;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
index 9974898..f9e9649 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
@@ -18,14 +18,21 @@
package org.apache.ignite.tests;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.tests.pojos.Person;
import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.pojos.Product;
+import org.apache.ignite.tests.pojos.ProductOrder;
import org.apache.ignite.tests.utils.CacheStoreHelper;
import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.ignite.tests.utils.TestCacheSession;
+import org.apache.ignite.tests.utils.TestTransaction;
import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.ignite.transactions.Transaction;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -113,31 +120,31 @@ public class CassandraDirectPersistenceTest {
LOGGER.info("Running PRIMITIVE strategy write tests");
- LOGGER.info("Running single operation write tests");
+ LOGGER.info("Running single write operation tests");
store1.write(longEntries.iterator().next());
store2.write(strEntries.iterator().next());
- LOGGER.info("Single operation write tests passed");
+ LOGGER.info("Single write operation tests passed");
- LOGGER.info("Running bulk operation write tests");
+ LOGGER.info("Running bulk write operation tests");
store1.writeAll(longEntries);
store2.writeAll(strEntries);
- LOGGER.info("Bulk operation write tests passed");
+ LOGGER.info("Bulk write operation tests passed");
LOGGER.info("PRIMITIVE strategy write tests passed");
LOGGER.info("Running PRIMITIVE strategy read tests");
- LOGGER.info("Running single operation read tests");
+ LOGGER.info("Running single read operation tests");
LOGGER.info("Running real keys read tests");
Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
if (!longEntries.iterator().next().getValue().equals(longVal))
- throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
String strVal = (String)store2.load(strEntries.iterator().next().getKey());
if (!strEntries.iterator().next().getValue().equals(strVal))
- throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
LOGGER.info("Running fake keys read tests");
@@ -149,31 +156,31 @@ public class CassandraDirectPersistenceTest {
if (strVal != null)
throw new RuntimeException("String value with fake key '-1' was found in Cassandra");
- LOGGER.info("Single operation read tests passed");
+ LOGGER.info("Single read operation tests passed");
- LOGGER.info("Running bulk operation read tests");
+ LOGGER.info("Running bulk read operation tests");
LOGGER.info("Running real keys read tests");
Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
- throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
Map strValues = store2.loadAll(TestsHelper.getKeys(strEntries));
if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
- throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
LOGGER.info("Running fake keys read tests");
longValues = store1.loadAll(fakeLongKeys);
if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
- throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
strValues = store2.loadAll(fakeStrKeys);
if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
- throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
- LOGGER.info("Bulk operation read tests passed");
+ LOGGER.info("Bulk read operation tests passed");
LOGGER.info("PRIMITIVE strategy read tests passed");
@@ -219,53 +226,53 @@ public class CassandraDirectPersistenceTest {
LOGGER.info("Running BLOB strategy write tests");
- LOGGER.info("Running single operation write tests");
+ LOGGER.info("Running single write operation tests");
store1.write(longEntries.iterator().next());
store2.write(personEntries.iterator().next());
store3.write(personEntries.iterator().next());
- LOGGER.info("Single operation write tests passed");
+ LOGGER.info("Single write operation tests passed");
- LOGGER.info("Running bulk operation write tests");
+ LOGGER.info("Running bulk write operation tests");
store1.writeAll(longEntries);
store2.writeAll(personEntries);
store3.writeAll(personEntries);
- LOGGER.info("Bulk operation write tests passed");
+ LOGGER.info("Bulk write operation tests passed");
LOGGER.info("BLOB strategy write tests passed");
LOGGER.info("Running BLOB strategy read tests");
- LOGGER.info("Running single operation read tests");
+ LOGGER.info("Running single read operation tests");
Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
if (!longEntries.iterator().next().getValue().equals(longVal))
- throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
Person personVal = (Person)store2.load(personEntries.iterator().next().getKey());
if (!personEntries.iterator().next().getValue().equals(personVal))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
personVal = (Person)store3.load(personEntries.iterator().next().getKey());
if (!personEntries.iterator().next().getValue().equals(personVal))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
- LOGGER.info("Single operation read tests passed");
+ LOGGER.info("Single read operation tests passed");
- LOGGER.info("Running bulk operation read tests");
+ LOGGER.info("Running bulk read operation tests");
Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
- throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
Map personValues = store2.loadAll(TestsHelper.getKeys(personEntries));
if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
personValues = store3.loadAll(TestsHelper.getKeys(personEntries));
if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
- LOGGER.info("Bulk operation read tests passed");
+ LOGGER.info("Bulk read operation tests passed");
LOGGER.info("BLOB strategy read tests passed");
@@ -303,69 +310,99 @@ public class CassandraDirectPersistenceTest {
new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml"),
CassandraHelper.getAdminDataSrc());
+ CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+ new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+ CassandraHelper.getAdminDataSrc());
+
+ CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+ new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+ CassandraHelper.getAdminDataSrc());
+
Collection<CacheEntryImpl<Long, Person>> entries1 = TestsHelper.generateLongsPersonsEntries();
Collection<CacheEntryImpl<PersonId, Person>> entries2 = TestsHelper.generatePersonIdsPersonsEntries();
Collection<CacheEntryImpl<PersonId, Person>> entries3 = TestsHelper.generatePersonIdsPersonsEntries();
+ Collection<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+ Collection<CacheEntryImpl<Long, ProductOrder>> orderEntries = TestsHelper.generateOrderEntries();
LOGGER.info("Running POJO strategy write tests");
- LOGGER.info("Running single operation write tests");
+ LOGGER.info("Running single write operation tests");
store1.write(entries1.iterator().next());
store2.write(entries2.iterator().next());
store3.write(entries3.iterator().next());
store4.write(entries3.iterator().next());
- LOGGER.info("Single operation write tests passed");
+ productStore.write(productEntries.iterator().next());
+ orderStore.write(orderEntries.iterator().next());
+ LOGGER.info("Single write operation tests passed");
- LOGGER.info("Running bulk operation write tests");
+ LOGGER.info("Running bulk write operation tests");
store1.writeAll(entries1);
store2.writeAll(entries2);
store3.writeAll(entries3);
store4.writeAll(entries3);
- LOGGER.info("Bulk operation write tests passed");
+ productStore.writeAll(productEntries);
+ orderStore.writeAll(orderEntries);
+ LOGGER.info("Bulk write operation tests passed");
LOGGER.info("POJO strategy write tests passed");
LOGGER.info("Running POJO strategy read tests");
- LOGGER.info("Running single operation read tests");
+ LOGGER.info("Running single read operation tests");
Person person = (Person)store1.load(entries1.iterator().next().getKey());
if (!entries1.iterator().next().getValue().equalsPrimitiveFields(person))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
person = (Person)store2.load(entries2.iterator().next().getKey());
if (!entries2.iterator().next().getValue().equalsPrimitiveFields(person))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
person = (Person)store3.load(entries3.iterator().next().getKey());
if (!entries3.iterator().next().getValue().equals(person))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
person = (Person)store4.load(entries3.iterator().next().getKey());
if (!entries3.iterator().next().getValue().equals(person))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
- LOGGER.info("Single operation read tests passed");
+ Product product = (Product)productStore.load(productEntries.iterator().next().getKey());
+ if (!productEntries.iterator().next().getValue().equals(product))
+ throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
- LOGGER.info("Running bulk operation read tests");
+ ProductOrder order = (ProductOrder)orderStore.load(orderEntries.iterator().next().getKey());
+ if (!orderEntries.iterator().next().getValue().equals(order))
+ throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
+
+ LOGGER.info("Single read operation tests passed");
+
+ LOGGER.info("Running bulk read operation tests");
Map persons = store1.loadAll(TestsHelper.getKeys(entries1));
if (!TestsHelper.checkPersonCollectionsEqual(persons, entries1, true))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
persons = store2.loadAll(TestsHelper.getKeys(entries2));
if (!TestsHelper.checkPersonCollectionsEqual(persons, entries2, true))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
persons = store3.loadAll(TestsHelper.getKeys(entries3));
if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
persons = store4.loadAll(TestsHelper.getKeys(entries3));
if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
- throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+ throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
+
+ Map products = productStore.loadAll(TestsHelper.getKeys(productEntries));
+ if (!TestsHelper.checkProductCollectionsEqual(products, productEntries))
+ throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
+
+ Map orders = orderStore.loadAll(TestsHelper.getKeys(orderEntries));
+ if (!TestsHelper.checkOrderCollectionsEqual(orders, orderEntries))
+ throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
- LOGGER.info("Bulk operation read tests passed");
+ LOGGER.info("Bulk read operation tests passed");
LOGGER.info("POJO strategy read tests passed");
@@ -383,6 +420,277 @@ public class CassandraDirectPersistenceTest {
store4.delete(entries3.iterator().next().getKey());
store4.deleteAll(TestsHelper.getKeys(entries3));
+ productStore.delete(productEntries.iterator().next().getKey());
+ productStore.deleteAll(TestsHelper.getKeys(productEntries));
+
+ orderStore.delete(orderEntries.iterator().next().getKey());
+ orderStore.deleteAll(TestsHelper.getKeys(orderEntries));
+
LOGGER.info("POJO strategy delete tests passed");
}
+
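+ /** Checks that POJO strategy writes and deletes made within a transaction reach Cassandra only after the session ends with commit. */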
+ @Test
+ @SuppressWarnings("unchecked")
+ public void pojoStrategyTransactionTest() {
+ Map<Object, Object> sessionProps = U.newHashMap(1);
+ Transaction sessionTx = new TestTransaction();
+
+ CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+ new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+ CassandraHelper.getAdminDataSrc(), new TestCacheSession("product", sessionTx, sessionProps));
+
+ CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+ new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+ CassandraHelper.getAdminDataSrc(), new TestCacheSession("order", sessionTx, sessionProps));
+
+ List<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+ Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> ordersPerProduct =
+ TestsHelper.generateOrdersPerProductEntries(productEntries, 2);
+
+ Collection<Long> productIds = TestsHelper.getProductIds(productEntries);
+ Collection<Long> orderIds = TestsHelper.getOrderIds(ordersPerProduct);
+
+ LOGGER.info("Running POJO strategy transaction write tests");
+
+ LOGGER.info("Running single write operation tests");
+
+ CassandraHelper.dropTestKeyspaces();
+
+ Product product = productEntries.iterator().next().getValue();
+ ProductOrder order = ordersPerProduct.get(product.getId()).iterator().next().getValue();
+
+ productStore.write(productEntries.iterator().next());
+ orderStore.write(ordersPerProduct.get(product.getId()).iterator().next());
+
+ if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
+ throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already persisted into Cassandra");
+ }
+
+ Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productIds);
+ Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+ throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already persisted into Cassandra");
+ }
+
+ //noinspection deprecation
+ orderStore.sessionEnd(true);
+ //noinspection deprecation
+ productStore.sessionEnd(true);
+
+ Product product1 = (Product)productStore.load(product.getId());
+ ProductOrder order1 = (ProductOrder)orderStore.load(order.getId());
+
+ if (product1 == null || order1 == null) {
+ throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+ "no objects were persisted into Cassandra");
+ }
+
+ if (!product.equals(product1) || !order.equals(order1)) {
+ throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+ "objects were incorrectly persisted/loaded to/from Cassandra");
+ }
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+ throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+ "no objects were persisted into Cassandra");
+ }
+
+ if (products.size() > 1 || orders.size() > 1) {
+ throw new RuntimeException("Single write operation test failed. There were committed more objects " +
+ "into Cassandra than expected");
+ }
+
+ product1 = products.entrySet().iterator().next().getValue();
+ order1 = orders.entrySet().iterator().next().getValue();
+
+ if (!product.equals(product1) || !order.equals(order1)) {
+ throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+ "objects were incorrectly persisted/loaded to/from Cassandra");
+ }
+
+ LOGGER.info("Single write operation tests passed");
+
+ LOGGER.info("Running bulk write operation tests");
+
+ CassandraHelper.dropTestKeyspaces();
+ sessionProps.clear();
+
+ productStore.writeAll(productEntries);
+
+ for (Long productId : ordersPerProduct.keySet())
+ orderStore.writeAll(ordersPerProduct.get(productId));
+
+ for (Long productId : productIds) {
+ if (productStore.load(productId) != null) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already persisted into Cassandra");
+ }
+ }
+
+ for (Long orderId : orderIds) {
+ if (orderStore.load(orderId) != null) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already persisted into Cassandra");
+ }
+ }
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already persisted into Cassandra");
+ }
+
+ //noinspection deprecation
+ productStore.sessionEnd(true);
+ //noinspection deprecation
+ orderStore.sessionEnd(true);
+
+ for (CacheEntryImpl<Long, Product> entry : productEntries) {
+ product = (Product)productStore.load(entry.getKey());
+
+ if (!entry.getValue().equals(product)) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+ "not all objects were persisted into Cassandra");
+ }
+ }
+
+ for (Long productId : ordersPerProduct.keySet()) {
+ for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+ order = (ProductOrder)orderStore.load(entry.getKey());
+
+ if (!entry.getValue().equals(order)) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+ "not all objects were persisted into Cassandra");
+ }
+ }
+ }
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+ "no objects were persisted into Cassandra");
+ }
+
+ if (products.size() < productIds.size() || orders.size() < orderIds.size()) {
+ throw new RuntimeException("Bulk write operation test failed. There were committed less objects " +
+ "into Cassandra than expected");
+ }
+
+ if (products.size() > productIds.size() || orders.size() > orderIds.size()) {
+ throw new RuntimeException("Bulk write operation test failed. There were committed more objects " +
+ "into Cassandra than expected");
+ }
+
+ for (CacheEntryImpl<Long, Product> entry : productEntries) {
+ product = products.get(entry.getKey());
+
+ if (!entry.getValue().equals(product)) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+ "some objects were incorrectly persisted/loaded to/from Cassandra");
+ }
+ }
+
+ for (Long productId : ordersPerProduct.keySet()) {
+ for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+ order = orders.get(entry.getKey());
+
+ if (!entry.getValue().equals(order)) {
+ throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+ "some objects were incorrectly persisted/loaded to/from Cassandra");
+ }
+ }
+ }
+
+ LOGGER.info("Bulk write operation tests passed");
+
+ LOGGER.info("POJO strategy transaction write tests passed");
+
+ LOGGER.info("Running POJO strategy transaction delete tests");
+
+ LOGGER.info("Running single delete tests");
+
+ sessionProps.clear();
+
+ Product deletedProduct = productEntries.remove(0).getValue();
+ ProductOrder deletedOrder = ordersPerProduct.get(deletedProduct.getId()).remove(0).getValue();
+
+ productStore.delete(deletedProduct.getId());
+ orderStore.delete(deletedOrder.getId());
+
+ if (productStore.load(deletedProduct.getId()) == null || orderStore.load(deletedOrder.getId()) == null) {
+ throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already deleted from Cassandra");
+ }
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if (products.size() != productIds.size() || orders.size() != orderIds.size()) {
+ throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already deleted from Cassandra");
+ }
+
+ //noinspection deprecation
+ productStore.sessionEnd(true);
+ //noinspection deprecation
+ orderStore.sessionEnd(true);
+
+ if (productStore.load(deletedProduct.getId()) != null || orderStore.load(deletedOrder.getId()) != null) {
+ throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+ "objects were not deleted from Cassandra");
+ }
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if (products.get(deletedProduct.getId()) != null || orders.get(deletedOrder.getId()) != null) {
+ throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+ "objects were not deleted from Cassandra");
+ }
+
+ LOGGER.info("Single delete tests passed");
+
+ LOGGER.info("Running bulk delete tests");
+
+ sessionProps.clear();
+
+ productStore.deleteAll(productIds);
+ orderStore.deleteAll(orderIds);
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+ throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " +
+ "objects were already deleted from Cassandra");
+ }
+
+ //noinspection deprecation
+ orderStore.sessionEnd(true);
+ //noinspection deprecation
+ productStore.sessionEnd(true);
+
+ products = (Map<Long, Product>)productStore.loadAll(productIds);
+ orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+ if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+ throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " +
+ "objects were not deleted from Cassandra");
+ }
+
+ LOGGER.info("Bulk delete tests passed");
+
+ LOGGER.info("POJO strategy transaction delete tests passed");
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
new file mode 100644
index 0000000..fc54e5b
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Simple helper class to run Cassandra on localhost.
+ */
+public class CassandraLocalServer {
+ /** Logger. */
+ private static final Logger LOGGER = Logger.getLogger(CassandraLocalServer.class.getName());
+
+ /** Starts embedded Cassandra, tests connections, drops test keyspaces, then loops printing a status line every 10 seconds. */
+ public static void main(String[] args) {
+ try {
+ CassandraHelper.startEmbeddedCassandra(LOGGER);
+ }
+ catch (Throwable e) {
+ throw new RuntimeException("Failed to start embedded Cassandra instance", e);
+ }
+
+ LOGGER.info("Testing admin connection to Cassandra");
+ CassandraHelper.testAdminConnection();
+
+ LOGGER.info("Testing regular connection to Cassandra");
+ CassandraHelper.testRegularConnection();
+
+ LOGGER.info("Dropping all artifacts from previous tests execution session");
+ CassandraHelper.dropTestKeyspaces();
+
+ while (true) {
+ try {
+ System.out.println("Cassandra server running");
+ Thread.sleep(10000);
+ }
+ catch (Throwable e) {
+ throw new RuntimeException("Cassandra server terminated", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
index 43b6d3c..6465580 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
@@ -25,33 +25,30 @@ import org.junit.Test;
* DDLGenerator test.
*/
public class DDLGeneratorTest {
- private static final String URL1 = "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml";
- private static final String URL2 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml";
- private static final String URL3 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml";
+ private static final String[] RESOURCES = new String[] {
+ "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml",
+ "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml",
+ "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml",
+ "org/apache/ignite/tests/persistence/pojo/product.xml",
+ "org/apache/ignite/tests/persistence/pojo/order.xml"
+ };
@Test
@SuppressWarnings("unchecked")
/** */
public void generatorTest() {
- ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
-
- URL url1 = clsLdr.getResource(URL1);
- if (url1 == null)
- throw new IllegalStateException("Failed to find resource: " + URL1);
+ String[] files = new String[RESOURCES.length];
- URL url2 = clsLdr.getResource(URL2);
- if (url2 == null)
- throw new IllegalStateException("Failed to find resource: " + URL2);
+ ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
- URL url3 = clsLdr.getResource(URL3);
- if (url3 == null)
- throw new IllegalStateException("Failed to find resource: " + URL3);
+ for (int i = 0; i < RESOURCES.length; i++) {
+ URL url = clsLdr.getResource(RESOURCES[i]);
+ if (url == null)
+ throw new IllegalStateException("Failed to find resource: " + RESOURCES[i]);
- String file1 = url1.getFile();
- String file2 = url2.getFile();
- String file3 = url3.getFile();
+ files[i] = url.getFile();
+ }
- DDLGenerator.main(new String[]{file1, file2, file3});
+ DDLGenerator.main(files);
}
-
}