You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/14 01:37:53 UTC
[07/20] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra
serializers. - Fixes #956.
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
new file mode 100644
index 0000000..4edd759
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains serializer implementations used to store BLOBs in Cassandra.
+ */
+package org.apache.ignite.cache.store.cassandra.serializer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
new file mode 100644
index 0000000..e43db1d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Describes a batch operation (loadAll, deleteAll, writeAll) of an Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ *
+ * @param <R> Type of the result returned from the batch operation.
+ * @param <V> Type of the value used in the batch operation.
+ */
+public interface BatchExecutionAssistant<R, V> {
+    /**
+     * Tells whether the Cassandra table must exist for this batch operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    boolean tableExistenceRequired();
+
+    /**
+     * Returns the unbound CQL statement to be executed inside the batch operation.
+     *
+     * @return Unbound CQL statement.
+     */
+    String getStatement();
+
+    /**
+     * Binds the prepared statement using the specified object.
+     *
+     * @param statement Prepared statement.
+     * @param obj Parameters for statement binding.
+     * @return Bound statement.
+     */
+    BoundStatement bindStatement(PreparedStatement statement, V obj);
+
+    /**
+     * Returns Ignite cache key/value persistence settings.
+     *
+     * @return Persistence settings.
+     */
+    KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns the display name of the batch operation.
+     *
+     * @return Operation display name.
+     */
+    String operationName();
+
+    /**
+     * Processes a particular row inside the batch operation.
+     *
+     * @param row Row to process.
+     * @param seqNum Sequential number of the row.
+     */
+    void process(Row row, int seqNum);
+
+    /**
+     * Checks whether the row/object with the specified sequential number is already processed.
+     *
+     * @param seqNum Object sequential number.
+     * @return {@code true} if the object is already processed.
+     */
+    boolean alreadyProcessed(int seqNum);
+
+    /**
+     * @return Number of processed objects/rows.
+     */
+    int processedCount();
+
+    /**
+     * @return Batch operation result.
+     */
+    R processedData();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
new file mode 100644
index 0000000..387c98f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Describes the loadCache operation of {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ */
+public interface BatchLoaderAssistant {
+    /**
+     * Returns the name of the batch load operation.
+     *
+     * @return Operation name.
+     */
+    String operationName();
+
+    /**
+     * Returns the CQL statement to use in the batch load operation.
+     *
+     * @return CQL statement for the batch load operation.
+     */
+    Statement getStatement();
+
+    /**
+     * Processes each row returned by the batch load operation.
+     *
+     * @param row Row selected from the Cassandra table.
+     */
+    void process(Row row);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
new file mode 100644
index 0000000..506982f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import java.io.Closeable;
+
+/**
+ * Wrapper around the Cassandra driver session which automatically handles:
+ * <ul>
+ * <li>Keyspace and table absence exceptions</li>
+ * <li>Timeout exceptions</li>
+ * <li>Batch operations</li>
+ * </ul>
+ */
+public interface CassandraSession extends Closeable {
+    /**
+     * Executes a single synchronous operation against the Cassandra database.
+     *
+     * @param assistant Execution assistant which performs the main operation logic.
+     * @param <V> Type of the result returned from the operation.
+     *
+     * @return Result of the operation.
+     */
+    <V> V execute(ExecutionAssistant<V> assistant);
+
+    /**
+     * Executes a batch asynchronous operation against the Cassandra database.
+     *
+     * @param assistant Execution assistant which performs the main operation logic.
+     * @param data Data which should be processed in the batch operation.
+     * @param <R> Type of the result returned from the batch operation.
+     * @param <V> Type of the value used in the batch operation.
+     *
+     * @return Result of the operation.
+     */
+    <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data);
+
+    /**
+     * Executes a batch asynchronous operation which loads the records
+     * selected by a CQL statement from the Cassandra database.
+     *
+     * @param assistant Execution assistant which performs the main operation logic.
+     */
+    void execute(BatchLoaderAssistant assistant);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
new file mode 100644
index 0000000..95b8581
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.querybuilder.Batch;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+
+/**
+ * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
+ */
+public class CassandraSessionImpl implements CassandraSession {
+ /** Maximum number of attempts made by each retry loop of this class before it gives up. */
+ private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;
+
+ /**
+  * Min timeout between CQL query execution attempts.
+  * NOTE(review): unit is not stated here (presumably milliseconds) - confirm where the sleeper is created.
+  */
+ private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;
+
+ /** Max timeout between CQL query execution attempts (same unit as the min timeout). */
+ private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;
+
+ /** Timeout increment for CQL query execution attempts (same unit as the min timeout). */
+ private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;
+
+ /** Cassandra cluster builder, used to build a cluster and connect when no session is available. */
+ private volatile Cluster.Builder builder;
+
+ /** Cassandra driver session; {@code null} until first needed and after it was handed to the session pool. */
+ private volatile Session ses;
+
+ /** Number of references to Cassandra driver session (for multithreaded environment). */
+ private volatile int refCnt = 0;
+
+ /**
+  * Storage for the session prepared statements, keyed by CQL statement text.
+  * The map is static, hence shared by all instances of this class; the methods of this
+  * class access it while holding the map's own monitor.
+  */
+ private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
+
+ /** Number of records to immediately fetch in CQL statement execution ({@code null} - not applied). */
+ private Integer fetchSize;
+
+ /** Consistency level for Cassandra READ operations (select), {@code null} - not applied. */
+ private ConsistencyLevel readConsistency;
+
+ /** Consistency level for Cassandra WRITE operations (insert/update/delete), {@code null} - not applied. */
+ private ConsistencyLevel writeConsistency;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /**
+  * Table absence error handlers counter. Starts from -1, so the first thread
+  * incrementing it obtains 0 and becomes the one which actually handles the problem.
+  */
+ private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
+
+ /** Prepared statement cluster disconnection error handlers counter. */
+ private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);
+
+ /**
+  * Creates a wrapper around the Cassandra driver session.
+  *
+  * @param builder Builder for Cassandra cluster.
+  * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
+  * @param readConsistency Consistency level for Cassandra READ operations (select).
+  * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
+  * @param log Logger.
+  */
+ public CassandraSessionImpl(
+     Cluster.Builder builder,
+     Integer fetchSize,
+     ConsistencyLevel readConsistency,
+     ConsistencyLevel writeConsistency,
+     IgniteLogger log
+ ) {
+     this.log = log;
+     this.fetchSize = fetchSize;
+     this.readConsistency = readConsistency;
+     this.writeConsistency = writeConsistency;
+     this.builder = builder;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The statement is retried up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} times for the error
+  * kinds which have a dedicated handler; any other error terminates the operation.
+  */
+ @Override public <V> V execute(ExecutionAssistant<V> assistant) {
+     Throwable lastErr = null;
+     String failMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();
+
+     RandomSleeper sleeper = newSleeper();
+
+     incrementSessionRefs();
+
+     try {
+         for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+             lastErr = null;
+
+             if (attempt > 0) {
+                 log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
+                     assistant.getStatement());
+             }
+
+             try {
+                 PreparedStatement prepared = prepareStatement(assistant.getStatement(),
+                     assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+                 // No prepared statement means there is nothing to execute.
+                 if (prepared == null)
+                     return null;
+
+                 Statement bound = tuneStatementExecutionOptions(assistant.bindStatement(prepared));
+                 ResultSet rows = session().execute(bound);
+
+                 if (rows == null || !rows.iterator().hasNext())
+                     return null;
+
+                 Row first = rows.iterator().next();
+
+                 if (first == null)
+                     return null;
+
+                 return assistant.process(first);
+             }
+             catch (Throwable e) {
+                 lastErr = e;
+
+                 if (CassandraHelper.isTableAbsenceError(e)) {
+                     // Table absence is acceptable when the operation doesn't require the table.
+                     if (!assistant.tableExistenceRequired()) {
+                         log.warning(failMsg, e);
+                         return null;
+                     }
+
+                     handleTableAbsenceError(assistant.getPersistenceSettings());
+                 }
+                 else if (CassandraHelper.isHostsAvailabilityError(e))
+                     handleHostsAvailabilityError(e, attempt, failMsg);
+                 else if (CassandraHelper.isPreparedStatementClusterError(e))
+                     handlePreparedStatementClusterError(e);
+                 else {
+                     // Unknown kind of error: no further attempts.
+                     throw new IgniteException(failMsg, e);
+                 }
+             }
+
+             sleeper.sleep();
+         }
+     }
+     catch (Throwable e) {
+         lastErr = e;
+     }
+     finally {
+         decrementSessionRefs();
+     }
+
+     log.error(failMsg, lastErr);
+
+     throw new IgniteException(failMsg, lastErr);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Every attempt has two phases: first all not yet processed elements are submitted
+  * asynchronously, then the submitted futures are awaited and their rows are passed to the assistant.
+  * The attempt is successful only if every element was submitted and no future failed;
+  * otherwise the known error kinds are handled and the operation is retried
+  * (up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} attempts). Unknown errors terminate the operation.
+  *
+  * @return Data processed by the assistant, or {@code null} if the statement can't be prepared.
+  */
+ @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
+     if (data == null || !data.iterator().hasNext())
+         return assistant.processedData();
+
+     int attempt = 0;
+     String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+     Throwable error = new IgniteException(errorMsg);
+
+     RandomSleeper sleeper = newSleeper();
+
+     int dataSize = 0;
+
+     incrementSessionRefs();
+
+     try {
+         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+             if (attempt != 0) {
+                 log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
+                     assistant.operationName() + " operation to process rest " +
+                     (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
+             }
+
+             // Clean errors info before next communication with Cassandra.
+             Throwable unknownEx = null;
+             Throwable tblAbsenceEx = null;
+             Throwable hostsAvailEx = null;
+             Throwable prepStatEx = null;
+
+             // Becomes true when at least one element could not even be submitted during this attempt.
+             boolean submitFailed = false;
+
+             List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();
+
+             PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
+                 assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+             if (preparedSt == null)
+                 return null;
+
+             int seqNum = 0;
+
+             // Phase 1: submit all elements which are not processed yet.
+             for (V obj : data) {
+                 if (!assistant.alreadyProcessed(seqNum)) {
+                     try {
+                         Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
+                         ResultSetFuture fut = session().executeAsync(statement);
+                         futResults.add(new CacheEntryImpl<>(seqNum, fut));
+                     }
+                     catch (Throwable e) {
+                         submitFailed = true;
+
+                         if (CassandraHelper.isTableAbsenceError(e)) {
+                             // If there are table absence error and it is not required for the operation we can return.
+                             if (!assistant.tableExistenceRequired())
+                                 return assistant.processedData();
+
+                             tblAbsenceEx = e;
+                             handleTableAbsenceError(assistant.getPersistenceSettings());
+                         }
+                         else if (CassandraHelper.isHostsAvailabilityError(e)) {
+                             // Handle host availability only once per attempt. The check has to be done
+                             // BEFORE the error is remembered, otherwise the handler is never invoked.
+                             boolean firstHostsErr = hostsAvailEx == null;
+
+                             hostsAvailEx = e;
+
+                             if (firstHostsErr)
+                                 handleHostsAvailabilityError(e, attempt, errorMsg);
+                         }
+                         else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+                             prepStatEx = e;
+                             handlePreparedStatementClusterError(e);
+                         }
+                         else
+                             unknownEx = e;
+                     }
+                 }
+
+                 seqNum++;
+             }
+
+             dataSize = seqNum;
+
+             // For an error which we don't know how to handle, we will not try next attempts and terminate.
+             if (unknownEx != null)
+                 throw new IgniteException(errorMsg, unknownEx);
+
+             // Remembering any of last errors.
+             if (tblAbsenceEx != null)
+                 error = tblAbsenceEx;
+             else if (hostsAvailEx != null)
+                 error = hostsAvailEx;
+             else if (prepStatEx != null)
+                 error = prepStatEx;
+
+             // Submit phase errors were already handled above; from now on these variables
+             // track only the errors reported by the submitted futures.
+             tblAbsenceEx = null;
+             hostsAvailEx = null;
+             prepStatEx = null;
+
+             // Phase 2: wait for the submitted statements and process their results.
+             for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
+                 try {
+                     ResultSet resSet = futureResult.getValue().getUninterruptibly();
+                     Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;
+
+                     if (row != null)
+                         assistant.process(row, futureResult.getKey());
+                 }
+                 catch (Throwable e) {
+                     if (CassandraHelper.isTableAbsenceError(e))
+                         tblAbsenceEx = e;
+                     else if (CassandraHelper.isHostsAvailabilityError(e))
+                         hostsAvailEx = e;
+                     else if (CassandraHelper.isPreparedStatementClusterError(e))
+                         prepStatEx = e;
+                     else
+                         unknownEx = e;
+                 }
+             }
+
+             // For an error which we don't know how to handle, we will not try next attempts and terminate.
+             if (unknownEx != null)
+                 throw new IgniteException(errorMsg, unknownEx);
+
+             // The operation is complete only if every element was submitted and none of the futures failed.
+             // Elements which failed to be submitted are not tracked by any future, so without the
+             // submitFailed check they would be silently dropped and the operation reported as successful.
+             if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null && !submitFailed)
+                 return assistant.processedData();
+
+             if (tblAbsenceEx != null) {
+                 // If there are table absence error and it is not required for the operation we can return.
+                 if (!assistant.tableExistenceRequired())
+                     return assistant.processedData();
+
+                 error = tblAbsenceEx;
+                 handleTableAbsenceError(assistant.getPersistenceSettings());
+             }
+
+             if (hostsAvailEx != null) {
+                 error = hostsAvailEx;
+                 handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
+             }
+
+             if (prepStatEx != null) {
+                 error = prepStatEx;
+                 handlePreparedStatementClusterError(prepStatEx);
+             }
+
+             sleeper.sleep();
+
+             attempt++;
+         }
+     }
+     catch (Throwable e) {
+         error = e;
+     }
+     finally {
+         decrementSessionRefs();
+     }
+
+     errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
+         " of " + dataSize + " elements, during " + assistant.operationName() +
+         " operation with Cassandra";
+
+     log.error(errorMsg, error);
+
+     throw new IgniteException(errorMsg, error);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns silently when the result set is empty or the table is absent; retries up to
+  * {@code CQL_EXECUTION_ATTEMPTS_COUNT} times for the error kinds which have a dedicated handler.
+  */
+ @Override public void execute(BatchLoaderAssistant assistant) {
+     String failMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+     Throwable lastErr = new IgniteException(failMsg);
+
+     RandomSleeper sleeper = newSleeper();
+
+     incrementSessionRefs();
+
+     try {
+         for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+             if (attempt > 0)
+                 log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");
+
+             Statement tuned = tuneStatementExecutionOptions(assistant.getStatement());
+
+             try {
+                 ResultSet rows = session().executeAsync(tuned).getUninterruptibly();
+
+                 if (rows != null && rows.iterator().hasNext()) {
+                     for (Row row : rows)
+                         assistant.process(row);
+                 }
+
+                 return;
+             }
+             catch (Throwable e) {
+                 lastErr = e;
+
+                 // Absent table: nothing to load.
+                 if (CassandraHelper.isTableAbsenceError(e))
+                     return;
+
+                 if (CassandraHelper.isHostsAvailabilityError(e))
+                     handleHostsAvailabilityError(e, attempt, failMsg);
+                 else if (CassandraHelper.isPreparedStatementClusterError(e))
+                     handlePreparedStatementClusterError(e);
+                 else {
+                     // Unknown kind of error: no further attempts.
+                     throw new IgniteException(failMsg, e);
+                 }
+             }
+
+             sleeper.sleep();
+         }
+     }
+     catch (Throwable e) {
+         lastErr = e;
+     }
+     finally {
+         decrementSessionRefs();
+     }
+
+     log.error(failMsg, lastErr);
+
+     throw new IgniteException(failMsg, lastErr);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Decrements the references counter and, once it is zero, passes the driver session (if any)
+  * to {@link SessionPool}.
+  */
+ @Override public synchronized void close() throws IOException {
+     if (decrementSessionRefs() != 0 || ses == null)
+         return;
+
+     SessionPool.put(this, ses);
+     ses = null;
+ }
+
+ /**
+ * Recreates Cassandra driver session: the current session is closed, a new one is
+ * obtained through {@code session()} and the storage of prepared statements is cleared.
+ */
+ private synchronized void refresh() {
+ // Make sure that session is removed from the pool (the returned value is intentionally ignored).
+ SessionPool.get(this);
+
+ // Closing and reopening session.
+ CassandraHelper.closeSession(ses);
+ ses = null;
+ session();
+
+ // Drop the stored prepared statements after the session was recreated.
+ synchronized (sesStatements) {
+ sesStatements.clear();
+ }
+ }
+
+ /**
+  * Returns the driver session to work with. An already available session is reused;
+  * otherwise one is requested from {@link SessionPool} and, if the pool has none,
+  * the stored prepared statements are dropped and a new connection is established.
+  *
+  * @return Cassandra driver session.
+  */
+ private synchronized Session session() {
+     if (ses == null) {
+         ses = SessionPool.get(this);
+
+         if (ses == null) {
+             synchronized (sesStatements) {
+                 sesStatements.clear();
+             }
+
+             try {
+                 ses = builder.build().connect();
+             }
+             catch (Throwable e) {
+                 throw new IgniteException("Failed to establish session with Cassandra database", e);
+             }
+         }
+     }
+
+     return ses;
+ }
+
+ /**
+  * Registers one more user of the Cassandra driver session (required for multithreaded environment).
+  */
+ private synchronized void incrementSessionRefs() {
+     refCnt += 1;
+ }
+
+ /**
+  * Unregisters one user of the Cassandra driver session (required for multithreaded environment).
+  * The counter is left untouched when it is already zero.
+  *
+  * @return Number of references after the decrement.
+  */
+ private synchronized int decrementSessionRefs() {
+     if (refCnt == 0)
+         return refCnt;
+
+     refCnt -= 1;
+
+     return refCnt;
+ }
+
+ /**
+  * Returns a prepared statement for the specified CQL text, taking it from the storage of
+  * already prepared statements when available and preparing (and storing) it otherwise.
+  *
+  * @param statement CQL statement.
+  * @param settings Persistence settings.
+  * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
+  * @return Prepared statement, or {@code null} if the table is absent and its existence is not required.
+  */
+ private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
+     boolean tblExistenceRequired) {
+     Throwable lastErr = null;
+     String failMsg = "Failed to prepare Cassandra CQL statement: " + statement;
+
+     RandomSleeper sleeper = newSleeper();
+
+     incrementSessionRefs();
+
+     try {
+         synchronized (sesStatements) {
+             if (sesStatements.containsKey(statement))
+                 return sesStatements.get(statement);
+         }
+
+         for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+             try {
+                 PreparedStatement prepared = session().prepare(statement);
+
+                 synchronized (sesStatements) {
+                     sesStatements.put(statement, prepared);
+                 }
+
+                 return prepared;
+             }
+             catch (Throwable e) {
+                 if (CassandraHelper.isTableAbsenceError(e)) {
+                     if (!tblExistenceRequired)
+                         return null;
+
+                     handleTableAbsenceError(settings);
+                 }
+                 else if (CassandraHelper.isHostsAvailabilityError(e))
+                     handleHostsAvailabilityError(e, attempt, failMsg);
+                 else {
+                     // Unknown kind of error: no further attempts.
+                     throw new IgniteException(failMsg, e);
+                 }
+
+                 lastErr = e;
+             }
+
+             sleeper.sleep();
+         }
+     }
+     finally {
+         decrementSessionRefs();
+     }
+
+     throw new IgniteException(failMsg, lastErr);
+ }
+
+ /**
+  * Creates the Cassandra keyspace described by the persistence settings. An already existing
+  * keyspace is not an error; hosts availability errors are handled and the creation is retried.
+  *
+  * @param settings Persistence settings.
+  */
+ private void createKeyspace(KeyValuePersistenceSettings settings) {
+     Throwable lastErr = null;
+     String failMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";
+
+     for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+         try {
+             log.info("-----------------------------------------------------------------------");
+             log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
+             log.info("-----------------------------------------------------------------------\n\n" +
+                 settings.getKeyspaceDDLStatement() + "\n");
+             log.info("-----------------------------------------------------------------------");
+
+             session().execute(settings.getKeyspaceDDLStatement());
+
+             log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
+
+             return;
+         }
+         catch (AlreadyExistsException ignored) {
+             log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
+
+             return;
+         }
+         catch (Throwable e) {
+             if (CassandraHelper.isHostsAvailabilityError(e)) {
+                 handleHostsAvailabilityError(e, attempt, failMsg);
+
+                 lastErr = e;
+             }
+             else
+                 throw new IgniteException(failMsg, e);
+         }
+     }
+
+     throw new IgniteException(failMsg, lastErr);
+ }
+
+ /**
+  * Creates the Cassandra table described by the persistence settings. An already existing
+  * table is not an error; an absent keyspace is created first and hosts availability
+  * errors are handled, after which the creation is retried.
+  *
+  * @param settings Persistence settings.
+  */
+ private void createTable(KeyValuePersistenceSettings settings) {
+     Throwable lastErr = null;
+     String failMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";
+
+     for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+         try {
+             log.info("-----------------------------------------------------------------------");
+             log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
+             log.info("-----------------------------------------------------------------------\n\n" +
+                 settings.getTableDDLStatement() + "\n");
+             log.info("-----------------------------------------------------------------------");
+
+             session().execute(settings.getTableDDLStatement());
+
+             log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
+
+             return;
+         }
+         catch (AlreadyExistsException ignored) {
+             log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
+
+             return;
+         }
+         catch (Throwable e) {
+             if (CassandraHelper.isKeyspaceAbsenceError(e)) {
+                 log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
+                     "' cause appropriate keyspace doesn't exist", e);
+
+                 createKeyspace(settings);
+             }
+             else if (CassandraHelper.isHostsAvailabilityError(e))
+                 handleHostsAvailabilityError(e, attempt, failMsg);
+             else
+                 throw new IgniteException(failMsg, e);
+
+             lastErr = e;
+         }
+     }
+
+     throw new IgniteException(failMsg, lastErr);
+ }
+
+ /**
+  * Creates Cassandra table indexes using the index DDL statements of the persistence settings.
+  * Does nothing if there are no such statements. Already existing indexes are skipped;
+  * hosts availability errors are handled and an absent table is created, after which
+  * the whole set of statements is executed again.
+  *
+  * @param settings Persistence settings.
+  */
+ private void createTableIndexes(KeyValuePersistenceSettings settings) {
+     if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
+         return;
+
+     int attempt = 0;
+     Throwable error = null;
+     String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();
+
+     while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+         try {
+             log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");
+
+             for (String statement : settings.getIndexDDLStatements()) {
+                 try {
+                     session().execute(statement);
+                 }
+                 catch (AlreadyExistsException ignored) {
+                     // The index is already there - nothing to do.
+                 }
+                 catch (Throwable e) {
+                     // Constant-first comparison: an exception without a message must not end up
+                     // in NullPointerException which would hide the original cause.
+                     if (!(e instanceof InvalidQueryException) || !"Index already exists".equals(e.getMessage()))
+                         throw new IgniteException(errorMsg, e);
+                 }
+             }
+
+             log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");
+
+             return;
+         }
+         catch (Throwable e) {
+             if (CassandraHelper.isHostsAvailabilityError(e))
+                 handleHostsAvailabilityError(e, attempt, errorMsg);
+             else if (CassandraHelper.isTableAbsenceError(e))
+                 createTable(settings);
+             else
+                 throw new IgniteException(errorMsg, e);
+
+             error = e;
+         }
+
+         attempt++;
+     }
+
+     throw new IgniteException(errorMsg, error);
+ }
+
+ /**
+  * Tunes CQL statement execution options (consistency level, fetch option and etc.).
+  * <p>
+  * The kind of the statement (read or write) is detected by the leading keyword of its query string.
+  * The read consistency level is applied to {@code select} statements, the write consistency level
+  * to batches and {@code insert}/{@code delete}/{@code update} statements; the fetch size is applied
+  * to any statement. Options which are {@code null} are not applied.
+  *
+  * @param statement Statement.
+  * @return The same statement instance with the options applied.
+  */
+ private Statement tuneStatementExecutionOptions(Statement statement) {
+     String qry = "";
+
+     // Locale.ROOT makes the keyword detection independent of the JVM default locale
+     // (e.g. with Turkish locale "INSERT" is lower-cased to a string which doesn't start with "insert").
+     if (statement instanceof BoundStatement)
+         qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim()
+             .toLowerCase(java.util.Locale.ROOT);
+     else if (statement instanceof PreparedStatement)
+         qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase(java.util.Locale.ROOT);
+
+     boolean readStatement = qry.startsWith("select");
+     boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
+         qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");
+
+     if (readStatement && readConsistency != null)
+         statement.setConsistencyLevel(readConsistency);
+
+     if (writeStatement && writeConsistency != null)
+         statement.setConsistencyLevel(writeConsistency);
+
+     if (fetchSize != null)
+         statement.setFetchSize(fetchSize);
+
+     return statement;
+ }
+
+ /**
+ * Handles situation when Cassandra table doesn't exist.
+ * <p>
+ * Only the thread that observes handler number {@code 0} creates the keyspace, table and indexes;
+ * every other thread waits on the monitor and then returns without doing anything.
+ *
+ * @param settings Persistence settings.
+ * @throws IgniteException If the table could not be created.
+ */
+ private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
+ // Taken before entering the monitor, so concurrent callers get distinct handler numbers.
+ // NOTE(review): presumably the counter rests at -1 between incidents (see the reset below) - confirm its initial value.
+ int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
+
+ try {
+ synchronized (tblAbsenceHandlersCnt) {
+ // Oooops... I am not the first thread who tried to handle table absence problem.
+ if (hndNum != 0) {
+ log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+ "Another thread already fixed it.");
+ return;
+ }
+
+ log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+ "Trying to create table.");
+
+ // Starts non-null only to make the loop below run at least once.
+ IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());
+
+ int attempt = 0;
+
+ while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+ error = null;
+
+ try {
+ createKeyspace(settings);
+ createTable(settings);
+ createTableIndexes(settings);
+ }
+ catch (Throwable e) {
+ // Host availability problems are retried, anything else aborts immediately.
+ if (CassandraHelper.isHostsAvailabilityError(e))
+ handleHostsAvailabilityError(e, attempt, null);
+ else
+ throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);
+
+ // Keep the failure so it is rethrown if this was the last attempt.
+ error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
+ }
+
+ attempt++;
+ }
+
+ if (error != null)
+ throw error;
+ }
+ }
+ finally {
+ // Only the thread that did the work resets the counter.
+ if (hndNum == 0)
+ tblAbsenceHandlersCnt.set(-1);
+ }
+ }
+
+ /**
+ * Handles situation when prepared statement execution failed because session to the cluster was released.
+ * <p>
+ * Only the thread that observes handler number {@code 0} refreshes the session; every other thread
+ * waits on the monitor and then returns without doing anything.
+ *
+ * @param e Error which triggered the handling, used for logging only.
+ */
+ private void handlePreparedStatementClusterError(Throwable e) {
+ // Taken before entering the monitor, so concurrent callers get distinct handler numbers.
+ // NOTE(review): presumably the counter rests at -1 between incidents (see the reset below) - confirm its initial value.
+ int hndNum = prepStatementHandlersCnt.incrementAndGet();
+
+ try {
+ synchronized (prepStatementHandlersCnt) {
+ // Oooops... I am not the first thread who tried to handle prepared statement problem.
+ if (hndNum != 0) {
+ log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
+ return;
+ }
+
+ log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);
+
+ refresh();
+
+ log.warning("Cassandra session refreshed");
+ }
+ }
+ finally {
+ // Only the thread that did the refresh resets the counter.
+ if (hndNum == 0)
+ prepStatementHandlersCnt.set(-1);
+ }
+ }
+
+ /**
+  * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
+  * <p>
+  * At roughly a quarter, a half and three quarters of {@code CQL_EXECUTION_ATTEMPTS_COUNT}, and on the last
+  * attempt, the Cassandra session is refreshed. On any other attempt the method sleeps for
+  * {@code CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT} milliseconds so that the caller can retry afterwards.
+  *
+  * @param e Exception to handle.
+  * @param attempt Number of attempts.
+  * @param msg Error message for the exception thrown when attempts are exhausted, {@code null} to wrap
+  *      {@code e} without an extra message.
+  * @return {@code true} if the Cassandra session was refreshed, {@code false} if the method only slept.
+  * @throws IgniteException If {@code attempt} reached {@code CQL_EXECUTION_ATTEMPTS_COUNT}.
+  */
+ private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
+     if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
+         log.error("Host availability problem detected. " +
+             "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
+             ", exception will be thrown to upper execution layer.", e);
+         throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
+     }
+
+     if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
+         attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 ||
+         attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
+         attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
+         log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
+             "refreshing Cassandra session", e);
+
+         refresh();
+
+         log.warning("Cassandra session refreshed");
+
+         return true;
+     }
+
+     log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
+         "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);
+
+     try {
+         Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
+     }
+     catch (InterruptedException ignored) {
+         // Restore the interrupt flag so that the interruption request stays visible to the caller
+         // instead of being silently dropped.
+         Thread.currentThread().interrupt();
+     }
+
+     log.warning("Sleep completed");
+
+     return false;
+ }
+
+ /**
+  * Builds a sleeper from the CQL attempt min/max timeout constants, the timeout increment
+  * constant and this object's logger.
+  *
+  * @return New random sleeper.
+  */
+ private RandomSleeper newSleeper() {
+     RandomSleeper sleeper = new RandomSleeper(
+         CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT,
+         CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT,
+         CQL_ATTEMPTS_TIMEOUT_INCREMENT,
+         log);
+
+     return sleeper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
new file mode 100644
index 0000000..867f58d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information for single operations (load, delete, write) of Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ *
+ * @param <R> Type of the result returned from operation.
+ */
+public interface ExecutionAssistant<R> {
+ /**
+ * Indicates if Cassandra table existence is required for operation.
+ *
+ * @return {@code true} if table existence required.
+ */
+ public boolean tableExistenceRequired();
+
+ /**
+ * Returns CQL statement to be used for operation.
+ *
+ * @return CQL statement.
+ */
+ public String getStatement();
+
+ /**
+ * Binds prepared statement.
+ *
+ * @param statement Prepared statement.
+ *
+ * @return Bound statement.
+ */
+ public BoundStatement bindStatement(PreparedStatement statement);
+
+ /**
+ * Persistence settings to use for operation.
+ *
+ * @return Persistence settings.
+ */
+ public KeyValuePersistenceSettings getPersistenceSettings();
+
+ /**
+ * Returns operation name.
+ *
+ * @return Operation name.
+ */
+ public String operationName();
+
+ /**
+ * Processes Cassandra database table row returned by specified CQL statement.
+ *
+ * @param row Cassandra database table row.
+ *
+ * @return Result of the operation.
+ */
+ public R process(Row row);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
new file mode 100644
index 0000000..17494dd
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Base implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}
+ * which remembers the sequence numbers of the batch items it has already handled.
+ *
+ * @param <R> Type of the result returned from batch operation
+ * @param <V> Type of the value used in batch operation
+ */
+public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> {
+    /** Sequence numbers of the items handled so far. */
+    private final Set<Integer> processed = new HashSet<>();
+
+    /**
+     * Hook invoked for every row which was not handled before. Does nothing by default.
+     *
+     * @param row Row to process.
+     */
+    protected void process(Row row) {
+    }
+
+    /** {@inheritDoc} */
+    @Override public void process(Row row, int seqNum) {
+        // The sequence number is recorded only after the row hook completed.
+        if (!processed.contains(seqNum)) {
+            process(row);
+
+            processed.add(seqNum);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean alreadyProcessed(int seqNum) {
+        return processed.contains(seqNum);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int processedCount() {
+        return processed.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public R processedData() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tableExistenceRequired() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
new file mode 100644
index 0000000..d3ace7d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+import org.apache.ignite.lang.IgniteBiInClosure;
+
+/**
+ * Worker for load cache using custom user query.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
+ /** Cassandra session to execute CQL query */
+ private final CassandraSession ses;
+
+ /** User query. */
+ private final String qry;
+
+ /** Persistence controller */
+ private final PersistenceController ctrl;
+
+ /** Logger */
+ private final IgniteLogger log;
+
+ /** Closure for loaded values. */
+ private final IgniteBiInClosure<K, V> clo;
+
+ /**
+ * @param clo Closure for loaded values.
+ */
+ public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
+ IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+ this.ses = ses;
+ this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
+ this.ctrl = ctrl;
+ this.log = log;
+ this.clo = clo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ ses.execute(new BatchLoaderAssistant() {
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "loadCache";
+ }
+
+ /** {@inheritDoc} */
+ @Override public Statement getStatement() {
+ return new SimpleStatement(qry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void process(Row row) {
+ K key;
+ V val;
+
+ try {
+ key = (K)ctrl.buildKeyObject(row);
+ }
+ catch (Throwable e) {
+ log.error("Failed to build Ignite key object from provided Cassandra row", e);
+
+ throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e);
+ }
+
+ try {
+ val = (V)ctrl.buildValueObject(row);
+ }
+ catch (Throwable e) {
+ log.error("Failed to build Ignite value object from provided Cassandra row", e);
+
+ throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e);
+ }
+
+ clo.apply(key, val);
+ }
+ });
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
new file mode 100644
index 0000000..ecbbe78
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains classes responsible for handling sessions and communication with Cassandra
+ */
+package org.apache.ignite.cache.store.cassandra.session;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
new file mode 100644
index 0000000..fc4a907
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+
+/**
+ * Cassandra driver sessions pool.
+ * <p>
+ * Driver sessions are kept in a static map keyed by the owning {@link CassandraSessionImpl}; all access to
+ * the map is guarded by synchronizing on the map itself. A daemon monitor thread closes expired sessions.
+ */
+public class SessionPool {
+ /**
+ * Monitors session pool and closes unused session.
+ */
+ private static class SessionMonitor extends Thread {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ while (true) {
+ try {
+ Thread.sleep(SLEEP_TIMEOUT);
+ }
+ catch (InterruptedException ignored) {
+ // Interruption is the signal to stop monitoring (see release()).
+ return;
+ }
+
+ List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>();
+
+ int sessionsCnt;
+
+ synchronized (sessions) {
+ // Pool size before expired entries are removed, compared with the number of expired ones below.
+ sessionsCnt = sessions.size();
+
+ for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) {
+ if (entry.getValue().expired())
+ expiredSessions.add(entry);
+ }
+
+ // Removal is done in a second pass to avoid modifying the map while iterating over it.
+ for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
+ sessions.remove(entry.getKey());
+ }
+
+ // Expired sessions are closed outside of the synchronized block.
+ for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
+ entry.getValue().release();
+
+ // All sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool.
+ if (sessionsCnt == expiredSessions.size())
+ return;
+ }
+ }
+ finally {
+ // Whatever is still in the pool when the monitor exits gets released as well.
+ release();
+ }
+ }
+ }
+
+ /** Sessions monitor sleep timeout. */
+ private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
+
+ /** Sessions which were returned to pool. */
+ private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>();
+
+ /** Singleton instance. */
+ private static SessionMonitor monitorSingleton;
+
+ // Releases all pooled sessions on JVM shutdown.
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override public void run() {
+ release();
+ }
+ });
+ }
+
+ /**
+ * Returns Cassandra driver session to sessions pool.
+ * <p>
+ * Does nothing if either argument is {@code null}. A session previously pooled for the same
+ * {@code cassandraSes} is replaced and released. The monitor thread is started if it is not running.
+ *
+ * @param cassandraSes Session wrapper.
+ * @param driverSes Driver session.
+ */
+ public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
+ if (cassandraSes == null || driverSes == null)
+ return;
+
+ SessionWrapper old;
+
+ synchronized (sessions) {
+ old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
+
+ // Start the monitor lazily, or start a new one if the previous monitor thread already terminated.
+ if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
+ monitorSingleton = new SessionMonitor();
+ monitorSingleton.setDaemon(true);
+ monitorSingleton.setName("Cassandra-sessions-pool");
+ monitorSingleton.start();
+ }
+ }
+
+ // The replaced session is closed outside of the synchronized block.
+ if (old != null)
+ old.release();
+ }
+
+ /**
+ * Extracts Cassandra driver session from pool.
+ * <p>
+ * The session is removed from the pool, so a subsequent call with the same argument returns {@code null}
+ * until the session is put back.
+ *
+ * @param cassandraSes Session wrapper.
+ * @return Cassandra driver session or {@code null} if {@code cassandraSes} is {@code null} or
+ * no session is pooled for it.
+ */
+ public static Session get(CassandraSessionImpl cassandraSes) {
+ if (cassandraSes == null)
+ return null;
+
+ SessionWrapper wrapper;
+
+ synchronized (sessions) {
+ wrapper = sessions.remove(cassandraSes);
+ }
+
+ return wrapper == null ? null : wrapper.driverSession();
+ }
+
+ /**
+ * Releases all session from pool and closes all their connections to Cassandra database.
+ * <p>
+ * Unless called from the monitor thread itself, the monitor thread is interrupted as well,
+ * even when the pool is already empty.
+ */
+ public static void release() {
+ Collection<SessionWrapper> wrappers;
+
+ synchronized (sessions) {
+ try {
+ if (sessions.size() == 0)
+ return;
+
+ // Copy the wrappers so that they can be closed after the lock is released.
+ wrappers = new LinkedList<>();
+
+ for (SessionWrapper wrapper : sessions.values())
+ wrappers.add(wrapper);
+
+ sessions.clear();
+ }
+ finally {
+ // Runs on the early return above too; the monitor never interrupts itself.
+ if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) {
+ try {
+ monitorSingleton.interrupt();
+ }
+ catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ for (SessionWrapper wrapper : wrappers)
+ wrapper.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
new file mode 100644
index 0000000..7c5722b
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+
+/**
+ * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
+ */
+public class SessionWrapper {
+ /** Expiration timeout for Cassandra driver session. */
+ public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
+
+ /** Cassandra driver session. */
+ private Session ses;
+
+ /** Wrapper creation time. */
+ private long time;
+
+ /**
+ * Creates instance of Cassandra driver session wrapper.
+ *
+ * @param ses Cassandra driver session.
+ */
+ public SessionWrapper(Session ses) {
+ this.ses = ses;
+ this.time = System.currentTimeMillis();
+ }
+
+ /**
+ * Checks if Cassandra driver session expired.
+ *
+ * @return true if session expired.
+ */
+ public boolean expired() {
+ return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
+ }
+
+ /**
+ * Returns wrapped Cassandra driver session.
+ *
+ * @return Cassandra driver session.
+ */
+ public Session driverSession() {
+ return ses;
+ }
+
+ /**
+ * Closes wrapped Cassandra driver session
+ */
+ public void release() {
+ CassandraHelper.closeSession(ses);
+ ses = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
new file mode 100644
index 0000000..21c292f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains session pool implementation for Cassandra sessions
+ */
+package org.apache.ignite.cache.store.cassandra.session.pool;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
new file mode 100644
index 0000000..4f40478
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.utils;
+
+import java.io.File;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Generates Cassandra DDL statements from persistence descriptor xml file.
+ */
+public class DDLGenerator {
+ /**
+ * DDLGenerator entry point.
+ *
+ * @param args Arguments for DDLGenerator.
+ */
+ public static void main(String[] args) {
+ if (args == null || args.length == 0)
+ return;
+
+ for (String arg : args) {
+ File file = new File(arg);
+ if (!file.isFile()) {
+ System.out.println("-------------------------------------------------------------");
+ System.out.println("Incorrect file specified: " + arg);
+ System.out.println("-------------------------------------------------------------");
+ continue;
+ }
+
+ try {
+ KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(file);
+ System.out.println("-------------------------------------------------------------");
+ System.out.println("DDL for keyspace/table from file: " + arg);
+ System.out.println("-------------------------------------------------------------");
+ System.out.println();
+ System.out.println(settings.getKeyspaceDDLStatement());
+ System.out.println();
+ System.out.println(settings.getTableDDLStatement());
+ System.out.println();
+ }
+ catch (Throwable e) {
+ System.out.println("-------------------------------------------------------------");
+ System.out.println("Incorrect file specified: " + arg);
+ System.out.println("-------------------------------------------------------------");
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
new file mode 100644
index 0000000..2460dfe
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains utility classes
+ */
+package org.apache.ignite.cache.store.cassandra.utils;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/README.txt
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/bootstrap/aws/README.txt b/modules/cassandra/store/src/test/bootstrap/aws/README.txt
new file mode 100644
index 0000000..a61b235
--- /dev/null
+++ b/modules/cassandra/store/src/test/bootstrap/aws/README.txt
@@ -0,0 +1,13 @@
+Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS.
+
+1) cassandra - bootstrap scripts for Cassandra cluster nodes
+2) ganglia - bootstrap scripts for Ganglia master and agents
+3) ignite - bootstrap scripts for Ignite cluster nodes
+4) tests - bootstrap scripts for Load Tests cluster nodes
+5) common.sh - definitions for common functions
+6) env.sh - definitions for common variables
+7) log-collector.sh - log collector daemon script, to collect logs and upload them to S3
+
+For more details please look at the documentation:
+
+ https://apacheignite.readme.io/docs/aws-infrastructure-deployment
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
new file mode 100644
index 0000000..017b1b1
--- /dev/null
+++ b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
@@ -0,0 +1,336 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# -----------------------------------------------------------------------------------------------
+# Bootstrap script to spin up Cassandra cluster
+# -----------------------------------------------------------------------------------------------
+
+# URL to download AWS CLI tools
+AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip
+
+# URL to download JDK
+JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz
+
+# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place
+TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip
+
+# Reports the bootstrap result to S3 and terminates the script.
+#
+# Arguments:
+#   $1 - optional error message; when non-empty the bootstrap is treated as failed
+#
+# Reads S3_CASSANDRA_BOOTSTRAP_SUCCESS / S3_CASSANDRA_BOOTSTRAP_FAILURE as the base
+# S3 URLs for the result marker. When the URL for the relevant outcome is empty,
+# the script exits right away without reporting anything to S3.
+#
+# Exit status of the script: 1 when an error message was given, 0 otherwise.
+terminate()
+{
+    SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS
+    FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE
+
+    # Make sure non-empty base URLs end with '/'. POSIX 'case' is used rather than
+    # the bash-only '[[ ... ]]' test because this script is started with #!/bin/sh.
+    case "$SUCCESS_URL" in
+        ""|*/) ;;
+        *) SUCCESS_URL=${SUCCESS_URL}/ ;;
+    esac
+
+    case "$FAILURE_URL" in
+        ""|*/) ;;
+        *) FAILURE_URL=${FAILURE_URL}/ ;;
+    esac
+
+    host_name=$(hostname -f | tr '[:upper:]' '[:lower:]')
+
+    # On success the result file contains the host name, on failure the error message
+    msg=$host_name
+
+    if [ -n "$1" ]; then
+        echo "[ERROR] $1"
+        echo "[ERROR]-----------------------------------------------------"
+        echo "[ERROR] Cassandra node bootstrap failed"
+        echo "[ERROR]-----------------------------------------------------"
+        msg=$1
+
+        if [ -z "$FAILURE_URL" ]; then
+            exit 1
+        fi
+
+        reportFolder=${FAILURE_URL}${host_name}
+        reportFile=$reportFolder/__error__
+    else
+        echo "[INFO]-----------------------------------------------------"
+        echo "[INFO] Cassandra node bootstrap successfully completed"
+        echo "[INFO]-----------------------------------------------------"
+
+        if [ -z "$SUCCESS_URL" ]; then
+            exit 0
+        fi
+
+        reportFolder=${SUCCESS_URL}${host_name}
+        reportFile=$reportFolder/__success__
+    fi
+
+    # Quoted so the message is written verbatim (no word splitting or globbing)
+    echo "$msg" > /opt/bootstrap-result
+
+    # Remove whatever is currently stored under the report folder; a failure here
+    # is only logged and does not change the exit status
+    aws s3 rm --recursive "$reportFolder"
+    if [ $? -ne 0 ]; then
+        echo "[ERROR] Failed to drop report folder: $reportFolder"
+    fi
+
+    aws s3 cp --sse AES256 /opt/bootstrap-result "$reportFile"
+    if [ $? -ne 0 ]; then
+        echo "[ERROR] Failed to report bootstrap result to: $reportFile"
+    fi
+
+    rm -f /opt/bootstrap-result
+
+    if [ -n "$1" ]; then
+        exit 1
+    fi
+
+    exit 0
+}
+
+# Downloads the specified package, retrying on failure.
+#
+# Arguments:
+#   $1 - source URL; URLs starting with "s3" are fetched with 'aws s3 cp',
+#        anything else with curl
+#   $2 - destination file path
+#   $3 - human-readable package name used in log messages
+#
+# Returns 0 after the first successful attempt. When all 10 attempts fail it calls
+# terminate, which ends the script.
+downloadPackage()
+{
+    echo "[INFO] Downloading $3 package from $1 into $2"
+
+    # All ten attempts are enumerated explicitly so that the number of retries
+    # matches the failure message below
+    for i in 1 2 3 4 5 6 7 8 9 10;
+    do
+        # POSIX 'case' instead of the bash-only '[[ ... == s3* ]]' (script runs under #!/bin/sh)
+        case "$1" in
+            s3*)
+                aws s3 cp "$1" "$2"
+                code=$?
+                ;;
+            *)
+                curl "$1" -o "$2"
+                code=$?
+                ;;
+        esac
+
+        if [ $code -eq 0 ]; then
+            echo "[INFO] $3 package successfully downloaded from $1 into $2"
+            return 0
+        fi
+
+        echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec"
+        sleep 5s
+    done
+
+    terminate "All 10 attempts to download $3 package from $1 are failed"
+}
+
+# Fetches the JDK archive from JDK_DOWNLOAD_URL, unpacks it under /opt and makes sure
+# the unpacked directory ends up as /opt/java. Calls terminate when the download or
+# the extraction fails.
+setupJava()
+{
+    rm -Rf /opt/java /opt/jdk.tar.gz
+
+    echo "[INFO] Downloading 'jdk'"
+    if ! wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz; then
+        terminate "Failed to download 'jdk'"
+    fi
+
+    echo "[INFO] Untaring 'jdk'"
+    if ! tar -xvzf /opt/jdk.tar.gz -C /opt; then
+        terminate "Failed to untar 'jdk'"
+    fi
+
+    rm -Rf /opt/jdk.tar.gz
+
+    # Rename the unpacked JDK directory unless it is already called "java"
+    unzipDir=$(ls /opt | grep "jdk")
+    [ "$unzipDir" = "java" ] || mv /opt/$unzipDir /opt/java
+}
+
+# Installs the AWS CLI. pip is tried first; when that fails the zip bundle from
+# AWS_CLI_DOWNLOAD_URL is downloaded, unpacked and installed instead. Calls terminate
+# when the fallback installation fails as well.
+setupAWSCLI()
+{
+    echo "[INFO] Installing 'awscli'"
+
+    # Preferred way: nothing else to do when pip succeeds
+    pip install --upgrade awscli && return 0
+
+    echo "[ERROR] Failed to install 'awscli' using pip"
+    echo "[INFO] Trying to install awscli using zip archive"
+    echo "[INFO] Downloading awscli zip"
+
+    downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli"
+
+    echo "[INFO] Unzipping awscli zip"
+    if ! unzip /opt/awscli-bundle.zip -d /opt; then
+        terminate "Failed to unzip awscli zip"
+    fi
+
+    rm -Rf /opt/awscli-bundle.zip
+
+    echo "[INFO] Installing awscli"
+    if ! /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws; then
+        terminate "Failed to install awscli"
+    fi
+
+    echo "[INFO] Successfully installed awscli from zip archive"
+}
+
+# Installs everything the rest of the bootstrap relies on: the wget, net-tools, python
+# and unzip packages (through yum) and pip (through the get-pip.py script).
+# Calls terminate as soon as any of the installations fails.
+setupPreRequisites()
+{
+    # Packages are installed one by one, in this order, so that a failure is
+    # reported for the exact package which caused it
+    for pkg in wget net-tools python unzip;
+    do
+        echo "[INFO] Installing '$pkg' package"
+        if ! yum -y install $pkg; then
+            terminate "Failed to install '$pkg' package"
+        fi
+    done
+
+    downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py"
+
+    echo "[INFO] Installing 'pip'"
+    if ! python /opt/get-pip.py; then
+        terminate "Failed to install 'pip'"
+    fi
+}
+
+# Downloads the Ignite-Cassandra tests package from TESTS_PACKAGE_DONLOAD_URL, unpacks it
+# into /opt/ignite-cassandra-tests, sources the shared common.sh from it and then starts
+# the logs collector daemon and the remaining instance-level setup steps.
+# Calls terminate when the package cannot be unzipped.
+setupTestsPackage()
+{
+ downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests"
+
+ # Drop any previously unpacked copy before unzipping the fresh one
+ rm -Rf /opt/ignite-cassandra-tests
+
+ unzip /opt/ignite-cassandra-tests.zip -d /opt
+ if [ $? -ne 0 ]; then
+ terminate "Failed to unzip tests package"
+ fi
+
+ rm -f /opt/ignite-cassandra-tests.zip
+
+ # Rename the unpacked directory unless it is already called "ignite-cassandra-tests"
+ unzipDir=$(ls /opt | grep "ignite-cassandra")
+ if [ "$unzipDir" != "ignite-cassandra-tests" ]; then
+ mv /opt/$unzipDir /opt/ignite-cassandra-tests
+ fi
+
+ # Make every shell script of the package executable for its owner and group
+ find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \;
+
+ # Source the shared script from the package, passing "cassandra" as its argument.
+ # NOTE(review): setupNTP, printInstanceInfo, tagInstance and bootstrapGangliaAgent are not
+ # defined in this script - presumably they come from common.sh; confirm.
+ . /opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra"
+
+ setupNTP
+
+ echo "[INFO] Starting logs collector daemon"
+
+ # The collector runs in background; its stdout goes to /opt/logs-collector.log
+ HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]')
+ /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log &
+
+ # $! is the PID of the background collector process started above
+ echo "[INFO] Logs collector daemon started: $!"
+
+ echo "----------------------------------------------------------------------------------------"
+ printInstanceInfo
+ echo "----------------------------------------------------------------------------------------"
+ tagInstance
+ bootstrapGangliaAgent "cassandra" 8641
+}
+
+# Fetches the Cassandra archive from CASSANDRA_DOWNLOAD_URL, unpacks it under /opt and
+# makes sure the unpacked directory ends up as /opt/cassandra.
+# Calls terminate when the archive cannot be extracted.
+downloadCassandra()
+{
+    downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra"
+
+    rm -Rf /opt/cassandra
+
+    echo "[INFO] Untaring Cassandra package"
+    if ! tar -xvzf /opt/apache-cassandra.tar.gz -C /opt; then
+        terminate "Failed to untar Cassandra package"
+    fi
+
+    rm -f /opt/apache-cassandra.tar.gz
+
+    # Rename the unpacked directory unless it is already called "cassandra"
+    unzipDir=$(ls /opt | grep "cassandra" | grep "apache")
+    [ "$unzipDir" = "cassandra" ] || mv /opt/$unzipDir /opt/cassandra
+}
+
+# Prepares the unpacked Cassandra installation for running.
+#
+# Arguments:
+#   $1 - profile file to which the JAVA_HOME, CASSANDRA_HOME and PATH exports are appended
+#
+# Creates the 'cassandra' group and user when they are missing, installs the
+# cassandra-env.sh and cassandra-template.yaml shipped with the tests package, creates the
+# storage layout and substitutes the storage directory placeholders in the template.
+# Calls terminate when the group or the user cannot be created.
+setupCassandra()
+{
+    echo "[INFO] Creating 'cassandra' group"
+    exists=$(grep cassandra /etc/group)
+    if [ -z "$exists" ] && ! groupadd cassandra; then
+        terminate "Failed to create 'cassandra' group"
+    fi
+
+    echo "[INFO] Creating 'cassandra' user"
+    exists=$(grep cassandra /etc/passwd)
+    if [ -z "$exists" ] && ! useradd -g cassandra cassandra; then
+        terminate "Failed to create 'cassandra' user"
+    fi
+
+    # Replace the stock configuration files with the ones from the tests package
+    rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml
+
+    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf
+    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf
+
+    chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests
+
+    createCassandraStorageLayout
+
+    # Substitute the ${...} placeholders one at a time, each pass writing a new intermediate file
+    sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" /opt/cassandra/conf/cassandra-template.yaml > /opt/cassandra/conf/cassandra-template-1.yaml
+    sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" /opt/cassandra/conf/cassandra-template-1.yaml > /opt/cassandra/conf/cassandra-template-2.yaml
+    sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" /opt/cassandra/conf/cassandra-template-2.yaml > /opt/cassandra/conf/cassandra-template-3.yaml
+
+    # Keep only the fully substituted file, under the original template name
+    rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml
+    mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml
+
+    # Append the environment setup to the given profile file
+    {
+        echo "export JAVA_HOME=/opt/java"
+        echo "export CASSANDRA_HOME=/opt/cassandra"
+        echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH"
+    } >> $1
+}
+
+###################################################################################################################
+
+echo "[INFO]-----------------------------------------------------------------"
+echo "[INFO] Bootstrapping Cassandra node"
+echo "[INFO]-----------------------------------------------------------------"
+
+setupPreRequisites
+setupJava
+setupAWSCLI
+setupTestsPackage
+downloadCassandra
+setupCassandra "/root/.bash_profile"
+
+cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh"
+
+#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log"
+
+$cmd | tee /opt/cassandra/cassandra-start.log
\ No newline at end of file