You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/15 06:40:26 UTC
[28/40] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra
serializers. - Fixes #956.
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
deleted file mode 100644
index 95b8581..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.querybuilder.Batch;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
-import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
-import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
-
-/**
- * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
- */
-public class CassandraSessionImpl implements CassandraSession {
- /** Number of CQL query execution attempts. */
- private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;
-
- /** Min timeout between CQL query execution attempts. */
- private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;
-
- /** Max timeout between CQL query execution attempts. */
- private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;
-
- /** Timeout increment for CQL query execution attempts. */
- private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;
-
- /** Cassandra cluster builder. */
- private volatile Cluster.Builder builder;
-
- /** Cassandra driver session. */
- private volatile Session ses;
-
- /** Number of references to Cassandra driver session (for multithreaded environment). */
- private volatile int refCnt = 0;
-
- /** Storage for the session prepared statements */
- private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
-
- /** Number of records to immediately fetch in CQL statement execution. */
- private Integer fetchSize;
-
- /** Consistency level for Cassandra READ operations (select). */
- private ConsistencyLevel readConsistency;
-
- /** Consistency level for Cassandra WRITE operations (insert/update/delete). */
- private ConsistencyLevel writeConsistency;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Table absence error handlers counter. */
- private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
-
- /** Prepared statement cluster disconnection error handlers counter. */
- private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);
-
- /**
- * Creates instance of Cassandra driver session wrapper.
- *
- * @param builder Builder for Cassandra cluster.
- * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
- * @param readConsistency Consistency level for Cassandra READ operations (select).
- * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
- * @param log Logger.
- */
- public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
- ConsistencyLevel writeConsistency, IgniteLogger log) {
- this.builder = builder;
- this.fetchSize = fetchSize;
- this.readConsistency = readConsistency;
- this.writeConsistency = writeConsistency;
- this.log = log;
- }
-
- /** {@inheritDoc} */
- @Override public <V> V execute(ExecutionAssistant<V> assistant) {
- int attempt = 0;
- Throwable error = null;
- String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();
-
- RandomSleeper sleeper = newSleeper();
-
- incrementSessionRefs();
-
- try {
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- error = null;
-
- if (attempt != 0) {
- log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
- assistant.getStatement());
- }
-
- try {
- PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
- assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
-
- if (preparedSt == null)
- return null;
-
- Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt));
- ResultSet res = session().execute(statement);
-
- Row row = res == null || !res.iterator().hasNext() ? null : res.iterator().next();
-
- return row == null ? null : assistant.process(row);
- }
- catch (Throwable e) {
- error = e;
-
- if (CassandraHelper.isTableAbsenceError(e)) {
- if (!assistant.tableExistenceRequired()) {
- log.warning(errorMsg, e);
- return null;
- }
-
- handleTableAbsenceError(assistant.getPersistenceSettings());
- }
- else if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, errorMsg);
- else if (CassandraHelper.isPreparedStatementClusterError(e))
- handlePreparedStatementClusterError(e);
- else
- // For an error which we don't know how to handle, we will not try next attempts and terminate.
- throw new IgniteException(errorMsg, e);
- }
-
- sleeper.sleep();
-
- attempt++;
- }
- }
- catch (Throwable e) {
- error = e;
- }
- finally {
- decrementSessionRefs();
- }
-
- log.error(errorMsg, error);
-
- throw new IgniteException(errorMsg, error);
- }
-
- /** {@inheritDoc} */
- @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
- if (data == null || !data.iterator().hasNext())
- return assistant.processedData();
-
- int attempt = 0;
- String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
- Throwable error = new IgniteException(errorMsg);
-
- RandomSleeper sleeper = newSleeper();
-
- int dataSize = 0;
-
- incrementSessionRefs();
-
- try {
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- if (attempt != 0) {
- log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
- assistant.operationName() + " operation to process rest " +
- (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
- }
-
- //clean errors info before next communication with Cassandra
- Throwable unknownEx = null;
- Throwable tblAbsenceEx = null;
- Throwable hostsAvailEx = null;
- Throwable prepStatEx = null;
-
- List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();
-
- PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
- assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
-
- if (preparedSt == null)
- return null;
-
- int seqNum = 0;
-
- for (V obj : data) {
- if (!assistant.alreadyProcessed(seqNum)) {
- try {
- Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
- ResultSetFuture fut = session().executeAsync(statement);
- futResults.add(new CacheEntryImpl<>(seqNum, fut));
- }
- catch (Throwable e) {
- if (CassandraHelper.isTableAbsenceError(e)) {
- // If there are table absence error and it is not required for the operation we can return.
- if (!assistant.tableExistenceRequired())
- return assistant.processedData();
-
- tblAbsenceEx = e;
- handleTableAbsenceError(assistant.getPersistenceSettings());
- }
- else if (CassandraHelper.isHostsAvailabilityError(e)) {
- hostsAvailEx = e;
-
- // Handle host availability only once.
- if (hostsAvailEx == null)
- handleHostsAvailabilityError(e, attempt, errorMsg);
- }
- else if (CassandraHelper.isPreparedStatementClusterError(e)) {
- prepStatEx = e;
- handlePreparedStatementClusterError(e);
- }
- else
- unknownEx = e;
- }
- }
-
- seqNum++;
- }
-
- dataSize = seqNum;
-
- // For an error which we don't know how to handle, we will not try next attempts and terminate.
- if (unknownEx != null)
- throw new IgniteException(errorMsg, unknownEx);
-
- // Remembering any of last errors.
- if (tblAbsenceEx != null)
- error = tblAbsenceEx;
- else if (hostsAvailEx != null)
- error = hostsAvailEx;
- else if (prepStatEx != null)
- error = prepStatEx;
-
- // Clean errors info before next communication with Cassandra.
- unknownEx = null;
- tblAbsenceEx = null;
- hostsAvailEx = null;
- prepStatEx = null;
-
- for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
- try {
- ResultSet resSet = futureResult.getValue().getUninterruptibly();
- Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;
-
- if (row != null)
- assistant.process(row, futureResult.getKey());
- }
- catch (Throwable e) {
- if (CassandraHelper.isTableAbsenceError(e))
- tblAbsenceEx = e;
- else if (CassandraHelper.isHostsAvailabilityError(e))
- hostsAvailEx = e;
- else if (CassandraHelper.isPreparedStatementClusterError(e))
- prepStatEx = e;
- else
- unknownEx = e;
- }
- }
-
- // For an error which we don't know how to handle, we will not try next attempts and terminate.
- if (unknownEx != null)
- throw new IgniteException(errorMsg, unknownEx);
-
- // If there are no errors occurred it means that operation successfully completed and we can return.
- if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null)
- return assistant.processedData();
-
- if (tblAbsenceEx != null) {
- // If there are table absence error and it is not required for the operation we can return.
- if (!assistant.tableExistenceRequired())
- return assistant.processedData();
-
- error = tblAbsenceEx;
- handleTableAbsenceError(assistant.getPersistenceSettings());
- }
-
- if (hostsAvailEx != null) {
- error = hostsAvailEx;
- handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
- }
-
- if (prepStatEx != null) {
- error = prepStatEx;
- handlePreparedStatementClusterError(prepStatEx);
- }
-
- sleeper.sleep();
-
- attempt++;
- }
- }
- catch (Throwable e) {
- error = e;
- }
- finally {
- decrementSessionRefs();
- }
-
- errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
- " of " + dataSize + " elements, during " + assistant.operationName() +
- " operation with Cassandra";
-
- log.error(errorMsg, error);
-
- throw new IgniteException(errorMsg, error);
- }
-
- /** {@inheritDoc} */
- @Override public void execute(BatchLoaderAssistant assistant) {
- int attempt = 0;
- String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
- Throwable error = new IgniteException(errorMsg);
-
- RandomSleeper sleeper = newSleeper();
-
- incrementSessionRefs();
-
- try {
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- if (attempt != 0)
- log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");
-
- Statement statement = tuneStatementExecutionOptions(assistant.getStatement());
-
- try {
- ResultSetFuture fut = session().executeAsync(statement);
- ResultSet resSet = fut.getUninterruptibly();
-
- if (resSet == null || !resSet.iterator().hasNext())
- return;
-
- for (Row row : resSet)
- assistant.process(row);
-
- return;
- }
- catch (Throwable e) {
- error = e;
-
- if (CassandraHelper.isTableAbsenceError(e))
- return;
- else if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, errorMsg);
- else if (CassandraHelper.isPreparedStatementClusterError(e))
- handlePreparedStatementClusterError(e);
- else
- // For an error which we don't know how to handle, we will not try next attempts and terminate.
- throw new IgniteException(errorMsg, e);
- }
-
- sleeper.sleep();
-
- attempt++;
- }
- }
- catch (Throwable e) {
- error = e;
- }
- finally {
- decrementSessionRefs();
- }
-
- log.error(errorMsg, error);
-
- throw new IgniteException(errorMsg, error);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void close() throws IOException {
- if (decrementSessionRefs() == 0 && ses != null) {
- SessionPool.put(this, ses);
- ses = null;
- }
- }
-
- /**
- * Recreates Cassandra driver session.
- */
- private synchronized void refresh() {
- //make sure that session removed from the pool
- SessionPool.get(this);
-
- //closing and reopening session
- CassandraHelper.closeSession(ses);
- ses = null;
- session();
-
- synchronized (sesStatements) {
- sesStatements.clear();
- }
- }
-
- /**
- * @return Cassandra driver session.
- */
- private synchronized Session session() {
- if (ses != null)
- return ses;
-
- ses = SessionPool.get(this);
-
- if (ses != null)
- return ses;
-
- synchronized (sesStatements) {
- sesStatements.clear();
- }
-
- try {
- return ses = builder.build().connect();
- }
- catch (Throwable e) {
- throw new IgniteException("Failed to establish session with Cassandra database", e);
- }
- }
-
- /**
- * Increments number of references to Cassandra driver session (required for multithreaded environment).
- */
- private synchronized void incrementSessionRefs() {
- refCnt++;
- }
-
- /**
- * Decrements number of references to Cassandra driver session (required for multithreaded environment).
- */
- private synchronized int decrementSessionRefs() {
- if (refCnt != 0)
- refCnt--;
-
- return refCnt;
- }
-
- /**
- * Prepares CQL statement using current Cassandra driver session.
- *
- * @param statement CQL statement.
- * @param settings Persistence settings.
- * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
- * @return Prepared statement.
- */
- private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
- boolean tblExistenceRequired) {
-
- int attempt = 0;
- Throwable error = null;
- String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement;
-
- RandomSleeper sleeper = newSleeper();
-
- incrementSessionRefs();
-
- try {
- synchronized (sesStatements) {
- if (sesStatements.containsKey(statement))
- return sesStatements.get(statement);
- }
-
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- try {
- PreparedStatement prepStatement = session().prepare(statement);
-
- synchronized (sesStatements) {
- sesStatements.put(statement, prepStatement);
- }
-
- return prepStatement;
- }
- catch (Throwable e) {
- if (CassandraHelper.isTableAbsenceError(e)) {
- if (!tblExistenceRequired)
- return null;
-
- handleTableAbsenceError(settings);
- }
- else if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, errorMsg);
- else
- throw new IgniteException(errorMsg, e);
-
- error = e;
- }
-
- sleeper.sleep();
-
- attempt++;
- }
- }
- finally {
- decrementSessionRefs();
- }
-
- throw new IgniteException(errorMsg, error);
- }
-
- /**
- * Creates Cassandra keyspace.
- *
- * @param settings Persistence settings.
- */
- private void createKeyspace(KeyValuePersistenceSettings settings) {
- int attempt = 0;
- Throwable error = null;
- String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";
-
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- try {
- log.info("-----------------------------------------------------------------------");
- log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
- log.info("-----------------------------------------------------------------------\n\n" +
- settings.getKeyspaceDDLStatement() + "\n");
- log.info("-----------------------------------------------------------------------");
- session().execute(settings.getKeyspaceDDLStatement());
- log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
- return;
- }
- catch (AlreadyExistsException ignored) {
- log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
- return;
- }
- catch (Throwable e) {
- if (!CassandraHelper.isHostsAvailabilityError(e))
- throw new IgniteException(errorMsg, e);
-
- handleHostsAvailabilityError(e, attempt, errorMsg);
-
- error = e;
- }
-
- attempt++;
- }
-
- throw new IgniteException(errorMsg, error);
- }
-
- /**
- * Creates Cassandra table.
- *
- * @param settings Persistence settings.
- */
- private void createTable(KeyValuePersistenceSettings settings) {
- int attempt = 0;
- Throwable error = null;
- String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";
-
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- try {
- log.info("-----------------------------------------------------------------------");
- log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
- log.info("-----------------------------------------------------------------------\n\n" +
- settings.getTableDDLStatement() + "\n");
- log.info("-----------------------------------------------------------------------");
- session().execute(settings.getTableDDLStatement());
- log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
- return;
- }
- catch (AlreadyExistsException ignored) {
- log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
- return;
- }
- catch (Throwable e) {
- if (!CassandraHelper.isHostsAvailabilityError(e) && !CassandraHelper.isKeyspaceAbsenceError(e))
- throw new IgniteException(errorMsg, e);
-
- if (CassandraHelper.isKeyspaceAbsenceError(e)) {
- log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
- "' cause appropriate keyspace doesn't exist", e);
- createKeyspace(settings);
- }
- else if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, errorMsg);
-
- error = e;
- }
-
- attempt++;
- }
-
- throw new IgniteException(errorMsg, error);
- }
-
- /**
- * Creates Cassandra table indexes.
- *
- * @param settings Persistence settings.
- */
- private void createTableIndexes(KeyValuePersistenceSettings settings) {
- if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
- return;
-
- int attempt = 0;
- Throwable error = null;
- String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();
-
- while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- try {
- log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");
-
- for (String statement : settings.getIndexDDLStatements()) {
- try {
- session().execute(statement);
- }
- catch (AlreadyExistsException ignored) {
- }
- catch (Throwable e) {
- if (!(e instanceof InvalidQueryException) || !e.getMessage().equals("Index already exists"))
- throw new IgniteException(errorMsg, e);
- }
- }
-
- log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");
-
- return;
- }
- catch (Throwable e) {
- if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, errorMsg);
- else if (CassandraHelper.isTableAbsenceError(e))
- createTable(settings);
- else
- throw new IgniteException(errorMsg, e);
-
- error = e;
- }
-
- attempt++;
- }
-
- throw new IgniteException(errorMsg, error);
- }
-
- /**
- * Tunes CQL statement execution options (consistency level, fetch option and etc.).
- *
- * @param statement Statement.
- * @return Modified statement.
- */
- private Statement tuneStatementExecutionOptions(Statement statement) {
- String qry = "";
-
- if (statement instanceof BoundStatement)
- qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase();
- else if (statement instanceof PreparedStatement)
- qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase();
-
- boolean readStatement = qry.startsWith("select");
- boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
- qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");
-
- if (readStatement && readConsistency != null)
- statement.setConsistencyLevel(readConsistency);
-
- if (writeStatement && writeConsistency != null)
- statement.setConsistencyLevel(writeConsistency);
-
- if (fetchSize != null)
- statement.setFetchSize(fetchSize);
-
- return statement;
- }
-
- /**
- * Handles situation when Cassandra table doesn't exist.
- *
- * @param settings Persistence settings.
- */
- private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
- int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
-
- try {
- synchronized (tblAbsenceHandlersCnt) {
- // Oooops... I am not the first thread who tried to handle table absence problem.
- if (hndNum != 0) {
- log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
- "Another thread already fixed it.");
- return;
- }
-
- log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
- "Trying to create table.");
-
- IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());
-
- int attempt = 0;
-
- while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
- error = null;
-
- try {
- createKeyspace(settings);
- createTable(settings);
- createTableIndexes(settings);
- }
- catch (Throwable e) {
- if (CassandraHelper.isHostsAvailabilityError(e))
- handleHostsAvailabilityError(e, attempt, null);
- else
- throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);
-
- error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
- }
-
- attempt++;
- }
-
- if (error != null)
- throw error;
- }
- }
- finally {
- if (hndNum == 0)
- tblAbsenceHandlersCnt.set(-1);
- }
- }
-
- /**
- * Handles situation when prepared statement execution failed cause session to the cluster was released.
- *
- */
- private void handlePreparedStatementClusterError(Throwable e) {
- int hndNum = prepStatementHandlersCnt.incrementAndGet();
-
- try {
- synchronized (prepStatementHandlersCnt) {
- // Oooops... I am not the first thread who tried to handle prepared statement problem.
- if (hndNum != 0) {
- log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
- return;
- }
-
- log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);
-
- refresh();
-
- log.warning("Cassandra session refreshed");
- }
- }
- finally {
- if (hndNum == 0)
- prepStatementHandlersCnt.set(-1);
- }
- }
-
- /**
- * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
- *
- * @param e Exception to handle.
- * @param attempt Number of attempts.
- * @param msg Error message.
- * @return {@code true} if host unavailability was successfully handled.
- */
- private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
- if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
- log.error("Host availability problem detected. " +
- "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
- ", exception will be thrown to upper execution layer.", e);
- throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
- }
-
- if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
- attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 ||
- attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
- attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
- log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
- "refreshing Cassandra session", e);
-
- refresh();
-
- log.warning("Cassandra session refreshed");
-
- return true;
- }
-
- log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
- "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);
-
- try {
- Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
- }
- catch (InterruptedException ignored) {
- }
-
- log.warning("Sleep completed");
-
- return false;
- }
-
- /**
- * @return New random sleeper.
- */
- private RandomSleeper newSleeper() {
- return new RandomSleeper(CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT,
- CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT,
- CQL_ATTEMPTS_TIMEOUT_INCREMENT, log);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
deleted file mode 100644
index 867f58d..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Row;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-
-/**
- * Provides information for single operations (load, delete, write) of Ignite cache
- * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
- *
- * @param <R> type of the result returned from operation.
- */
-public interface ExecutionAssistant<R> {
- /**
- * Indicates if Cassandra table existence is required for operation.
- *
- * @return true if table existence required.
- */
- public boolean tableExistenceRequired();
-
- /**
- * Returns CQL statement to be used for operation.
- *
- * @return CQL statement.
- */
- public String getStatement();
-
- /**
- * Binds prepared statement.
- *
- * @param statement prepared statement.
- *
- * @return bound statement.
- */
- public BoundStatement bindStatement(PreparedStatement statement);
-
- /**
- * Persistence settings to use for operation.
- *
- * @return persistence settings.
- */
- public KeyValuePersistenceSettings getPersistenceSettings();
-
- /**
- * Returns operation name.
- *
- * @return operation name.
- */
- public String operationName();
-
- /**
- * Processes Cassandra database table row returned by specified CQL statement.
- *
- * @param row Cassandra database table row.
- *
- * @return result of the operation.
- */
- public R process(Row row);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
deleted file mode 100644
index 17494dd..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.Row;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}.
- *
- * @param <R> Type of the result returned from batch operation
- * @param <V> Type of the value used in batch operation
- */
-public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> {
- /** Identifiers of already processed objects. */
- private Set<Integer> processed = new HashSet<>();
-
- /** {@inheritDoc} */
- @Override public void process(Row row, int seqNum) {
- if (processed.contains(seqNum))
- return;
-
- process(row);
-
- processed.add(seqNum);
- }
-
- /** {@inheritDoc} */
- @Override public boolean alreadyProcessed(int seqNum) {
- return processed.contains(seqNum);
- }
-
- /** {@inheritDoc} */
- @Override public int processedCount() {
- return processed.size();
- }
-
- /** {@inheritDoc} */
- @Override public R processedData() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean tableExistenceRequired() {
- return false;
- }
-
- /**
- * Processes particular row inside batch operation.
- *
- * @param row Row to process.
- */
- protected void process(Row row) {
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
deleted file mode 100644
index d3ace7d..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
-import org.apache.ignite.lang.IgniteBiInClosure;
-
-/**
- * Worker for load cache using custom user query.
- *
- * @param <K> Key type.
- * @param <V> Value type.
- */
-public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
-    /** Session used to run the CQL query. */
-    private final CassandraSession ses;
-
-    /** User query, with ';' appended when the trimmed text did not already end with it. */
-    private final String qry;
-
-    /** Persistence controller used to build key and value objects from rows. */
-    private final PersistenceController ctrl;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Closure receiving every loaded key/value pair. */
-    private final IgniteBiInClosure<K, V> clo;
-
-    /**
-     * @param ses Cassandra session to execute the query with.
-     * @param qry User query.
-     * @param ctrl Persistence controller.
-     * @param log Logger.
-     * @param clo Closure for loaded values.
-     */
-    public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
-        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
-        this.ses = ses;
-        this.qry = !qry.trim().endsWith(";") ? qry + ";" : qry;
-        this.ctrl = ctrl;
-        this.log = log;
-        this.clo = clo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void call() throws Exception {
-        BatchLoaderAssistant assistant = new BatchLoaderAssistant() {
-            /** {@inheritDoc} */
-            @Override public String operationName() {
-                return "loadCache";
-            }
-
-            /** {@inheritDoc} */
-            @Override public Statement getStatement() {
-                return new SimpleStatement(qry);
-            }
-
-            /** {@inheritDoc} */
-            @Override public void process(Row row) {
-                // Arguments are evaluated left to right: the key is built before the value.
-                clo.apply(buildKey(row), buildValue(row));
-            }
-        };
-
-        ses.execute(assistant);
-
-        return null;
-    }
-
-    /**
-     * Builds the Ignite key object for the row, logging and wrapping any failure.
-     *
-     * @param row Cassandra row.
-     * @return Key object.
-     */
-    private K buildKey(Row row) {
-        try {
-            return (K)ctrl.buildKeyObject(row);
-        }
-        catch (Throwable e) {
-            log.error("Failed to build Ignite key object from provided Cassandra row", e);
-
-            throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e);
-        }
-    }
-
-    /**
-     * Builds the Ignite value object for the row, logging and wrapping any failure.
-     *
-     * @param row Cassandra row.
-     * @return Value object.
-     */
-    private V buildValue(Row row) {
-        try {
-            return (V)ctrl.buildValueObject(row);
-        }
-        catch (Throwable e) {
-            log.error("Failed to build Ignite value object from provided Cassandra row", e);
-
-            throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
deleted file mode 100644
index ecbbe78..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains classes responsible for handling sessions and communication with Cassandra
- */
-package org.apache.ignite.cache.store.cassandra.session;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
deleted file mode 100644
index fc4a907..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session.pool;
-
-import com.datastax.driver.core.Session;
-import java.lang.Thread.State;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
-
-/**
- * Cassandra driver sessions pool.
- */
-public class SessionPool {
- /**
- * Background thread that periodically scans the pool and closes expired sessions.
- * <p>
- * The thread stops when it is interrupted or when a scan finds that the number of expired sessions
- * equals the number of pooled sessions (which is also the case for an empty pool). On exit it
- * always calls {@link SessionPool#release()}.
- */
- private static class SessionMonitor extends Thread {
- /** {@inheritDoc} */
- @Override public void run() {
- try {
- while (true) {
- try {
- Thread.sleep(SLEEP_TIMEOUT);
- }
- catch (InterruptedException ignored) {
- // Interruption (sent by release()) is the signal to stop monitoring.
- return;
- }
-
- List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>();
-
- int sessionsCnt;
-
- // Expired entries are collected and removed from the pool while holding the pool lock.
- synchronized (sessions) {
- sessionsCnt = sessions.size();
-
- for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) {
- if (entry.getValue().expired())
- expiredSessions.add(entry);
- }
-
- for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
- sessions.remove(entry.getKey());
- }
-
- // The driver sessions themselves are closed outside of the lock.
- for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
- entry.getValue().release();
-
- // All sessions seen in the pool during this scan expired, so no additional thread is needed
- // to manage the pool; put() starts a new monitor once this one has terminated.
- if (sessionsCnt == expiredSessions.size())
- return;
- }
- }
- finally {
- // Whatever the exit reason, close the sessions that are still pooled.
- release();
- }
- }
- }
-
- /** Sessions monitor sleep timeout. */
- private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
-
- /** Sessions which were returned to pool. Within this class it is accessed only inside {@code synchronized (sessions)}. */
- private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>();
-
- /** Current monitor thread, or {@code null} if none was started yet. Assigned under the {@code sessions} lock. */
- private static SessionMonitor monitorSingleton;
-
- // Releases all pooled sessions when the JVM shuts down.
- static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override public void run() {
- release();
- }
- });
- }
-
- /**
- * Returns Cassandra driver session to sessions pool.
- * <p>
- * Does nothing if either argument is {@code null}. If the pool already held a driver session for
- * {@code cassandraSes}, that previous session is closed. Starts the monitor thread if there is
- * none or the previous one has terminated.
- *
- * @param cassandraSes Ignite Cassandra session used as the pool key.
- * @param driverSes Driver session.
- */
- public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
- if (cassandraSes == null || driverSes == null)
- return;
-
- SessionWrapper old;
-
- synchronized (sessions) {
- old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
-
- if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
- monitorSingleton = new SessionMonitor();
- monitorSingleton.setDaemon(true);
- monitorSingleton.setName("Cassandra-sessions-pool");
- monitorSingleton.start();
- }
- }
-
- // The replaced wrapper is closed outside of the lock.
- if (old != null)
- old.release();
- }
-
- /**
- * Extracts Cassandra driver session from pool. The session is removed from the pool.
- *
- * @param cassandraSes Ignite Cassandra session used as the pool key.
- * @return Cassandra driver session, or {@code null} if {@code cassandraSes} is {@code null} or the
- * pool holds no session for it.
- */
- public static Session get(CassandraSessionImpl cassandraSes) {
- if (cassandraSes == null)
- return null;
-
- SessionWrapper wrapper;
-
- synchronized (sessions) {
- wrapper = sessions.remove(cassandraSes);
- }
-
- return wrapper == null ? null : wrapper.driverSession();
- }
-
- /**
- * Releases all session from pool and closes all their connections to Cassandra database.
- * <p>
- * Unless the calling thread is the monitor itself, the monitor thread (if any) is interrupted,
- * including when the pool is empty.
- */
- public static void release() {
- Collection<SessionWrapper> wrappers;
-
- synchronized (sessions) {
- try {
- // Nothing to close; the finally block below still interrupts the monitor.
- if (sessions.size() == 0)
- return;
-
- wrappers = new LinkedList<>();
-
- for (SessionWrapper wrapper : sessions.values())
- wrappers.add(wrapper);
-
- sessions.clear();
- }
- finally {
- // The monitor calls release() from its own finally block, so it must not interrupt itself.
- if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) {
- try {
- monitorSingleton.interrupt();
- }
- catch (Throwable ignored) {
- }
- }
- }
- }
-
- // Driver sessions are closed outside of the lock.
- for (SessionWrapper wrapper : wrappers)
- wrapper.release();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
deleted file mode 100644
index 7c5722b..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session.pool;
-
-import com.datastax.driver.core.Session;
-import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
-
-/**
- * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
- */
-public class SessionWrapper {
- /** Expiration timeout for Cassandra driver session. */
- public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
-
- /** Cassandra driver session. */
- private Session ses;
-
- /** Wrapper creation time. */
- private long time;
-
- /**
- * Creates instance of Cassandra driver session wrapper.
- *
- * @param ses Cassandra driver session.
- */
- public SessionWrapper(Session ses) {
- this.ses = ses;
- this.time = System.currentTimeMillis();
- }
-
- /**
- * Checks if Cassandra driver session expired.
- *
- * @return true if session expired.
- */
- public boolean expired() {
- return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
- }
-
- /**
- * Returns wrapped Cassandra driver session.
- *
- * @return Cassandra driver session.
- */
- public Session driverSession() {
- return ses;
- }
-
- /**
- * Closes wrapped Cassandra driver session
- */
- public void release() {
- CassandraHelper.closeSession(ses);
- ses = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
deleted file mode 100644
index 21c292f..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains session pool implementation for Cassandra sessions
- */
-package org.apache.ignite.cache.store.cassandra.session.pool;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
deleted file mode 100644
index 4f40478..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.utils;
-
-import java.io.File;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-
-/**
- * Generates Cassandra DDL statements from persistence descriptor xml file.
- */
-public class DDLGenerator {
-    /** Separator line printed above and below every header message. */
-    private static final String SEPARATOR = "-------------------------------------------------------------";
-
-    /**
-     * DDLGenerator entry point. Every argument is treated as a path to a persistence descriptor file;
-     * the keyspace and table DDL statements built from it are printed to standard output.
-     *
-     * @param args Arguments for DDLGenerator.
-     */
-    public static void main(String[] args) {
-        if (args == null || args.length == 0)
-            return;
-
-        for (String arg : args) {
-            File file = new File(arg);
-
-            if (file.isFile())
-                printDDL(file, arg);
-            else
-                printHeader("Incorrect file specified: " + arg);
-        }
-    }
-
-    /**
-     * Prints keyspace and table DDL for one descriptor file, or an error header plus stack trace
-     * if the settings could not be built from it.
-     *
-     * @param file Descriptor file.
-     * @param arg Original command line argument, used in messages.
-     */
-    private static void printDDL(File file, String arg) {
-        try {
-            KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(file);
-
-            printHeader("DDL for keyspace/table from file: " + arg);
-
-            System.out.println();
-            System.out.println(settings.getKeyspaceDDLStatement());
-            System.out.println();
-            System.out.println(settings.getTableDDLStatement());
-            System.out.println();
-        }
-        catch (Throwable e) {
-            printHeader("Incorrect file specified: " + arg);
-
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Prints the message framed by separator lines.
-     *
-     * @param msg Message to print.
-     */
-    private static void printHeader(String msg) {
-        System.out.println(SEPARATOR);
-        System.out.println(msg);
-        System.out.println(SEPARATOR);
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
deleted file mode 100644
index 2460dfe..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains utility classes
- */
-package org.apache.ignite.cache.store.cassandra.utils;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/README.txt
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/README.txt b/modules/cassandra/src/test/bootstrap/aws/README.txt
deleted file mode 100644
index 4457d81..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/README.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS.
-
-1) cassandra - bootstrap scripts for Cassandra cluster nodes
-2) ganglia - bootstrap scripts for Ganglia master and agents
-3) ignite - bootstrap scripts for Ignite cluster nodes
-4) tests - bootstrap scripts for Load Tests cluster nodes
-5) common.sh - definitions for common functions
-6) env.sh - definitions for common variables
-7) logs-collector.sh - logs collector daemon script that collects logs and uploads them to S3
-
-For more details please look at the documentation:
-
- https://apacheignite.readme.io/docs/aws-infrastructure-deployment
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
deleted file mode 100644
index 017b1b1..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
+++ /dev/null
@@ -1,336 +0,0 @@
-#!/bin/sh
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# -----------------------------------------------------------------------------------------------
-# Bootstrap script to spin up Cassandra cluster
-# -----------------------------------------------------------------------------------------------
-
-# URL to download AWS CLI tools
-AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip
-
-# URL to download JDK
-JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz
-
-# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place
-TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip
-
-# Terminates script execution and uploads the bootstrap result marker to S3.
-#
-# Arguments:
-#   $1 - optional error message. When non-empty the bootstrap is reported as failed and
-#        the script exits with code 1; otherwise it is reported as successful and the
-#        script exits with code 0.
-#
-# Reads S3_CASSANDRA_BOOTSTRAP_SUCCESS / S3_CASSANDRA_BOOTSTRAP_FAILURE as base S3 URLs
-# for the report. When the relevant one is empty nothing is uploaded.
-terminate()
-{
-    SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS
-    FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE
-
-    # Make sure non-empty base URLs end with '/'. 'case' is used instead of the
-    # bash-only '[[ ... ]]' because this script is declared as #!/bin/sh.
-    case "$SUCCESS_URL" in
-        ""|*/) ;;
-        *) SUCCESS_URL=${SUCCESS_URL}/ ;;
-    esac
-
-    case "$FAILURE_URL" in
-        ""|*/) ;;
-        *) FAILURE_URL=${FAILURE_URL}/ ;;
-    esac
-
-    host_name=$(hostname -f | tr '[:upper:]' '[:lower:]')
-    msg=$host_name
-
-    if [ -n "$1" ]; then
-        echo "[ERROR] $1"
-        echo "[ERROR]-----------------------------------------------------"
-        echo "[ERROR] Cassandra node bootstrap failed"
-        echo "[ERROR]-----------------------------------------------------"
-        msg=$1
-
-        if [ -z "$FAILURE_URL" ]; then
-            exit 1
-        fi
-
-        reportFolder=${FAILURE_URL}${host_name}
-        reportFile=$reportFolder/__error__
-    else
-        echo "[INFO]-----------------------------------------------------"
-        echo "[INFO] Cassandra node bootstrap successfully completed"
-        echo "[INFO]-----------------------------------------------------"
-
-        if [ -z "$SUCCESS_URL" ]; then
-            exit 0
-        fi
-
-        reportFolder=${SUCCESS_URL}${host_name}
-        reportFile=$reportFolder/__success__
-    fi
-
-    # Quoted so that the message is written verbatim (no word splitting or globbing)
-    echo "$msg" > /opt/bootstrap-result
-
-    # Drop any previous report for this host before uploading the new one
-    aws s3 rm --recursive "$reportFolder"
-    if [ $? -ne 0 ]; then
-        echo "[ERROR] Failed to drop report folder: $reportFolder"
-    fi
-
-    aws s3 cp --sse AES256 /opt/bootstrap-result "$reportFile"
-    if [ $? -ne 0 ]; then
-        echo "[ERROR] Failed to report bootstrap result to: $reportFile"
-    fi
-
-    rm -f /opt/bootstrap-result
-
-    if [ -n "$1" ]; then
-        exit 1
-    fi
-
-    exit 0
-}
-
-# Downloads specified package, making up to 10 attempts with a 5 second pause after each failure.
-#
-# Arguments:
-#   $1 - source URL; URLs starting with "s3" are fetched with 'aws s3 cp', all others with curl
-#   $2 - destination file path
-#   $3 - human readable package name used in log messages
-#
-# Returns 0 on the first successful attempt; calls terminate when every attempt failed.
-downloadPackage()
-{
-    echo "[INFO] Downloading $3 package from $1 into $2"
-
-    # The list enumerates all ten attempts ('0 9' would only run two of them)
-    for i in 0 1 2 3 4 5 6 7 8 9;
-    do
-        # 'case' instead of the bash-only '[[ ... ]]' because this script is declared as #!/bin/sh
-        case "$1" in
-            s3*)
-                aws s3 cp "$1" "$2"
-                code=$?
-                ;;
-            *)
-                curl "$1" -o "$2"
-                code=$?
-                ;;
-        esac
-
-        if [ $code -eq 0 ]; then
-            echo "[INFO] $3 package successfully downloaded from $1 into $2"
-            return 0
-        fi
-
-        echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec"
-        sleep 5s
-    done
-
-    terminate "All 10 attempts to download $3 package from $1 are failed"
-}
-
-# Downloads the JDK archive from JDK_DOWNLOAD_URL and installs it into /opt/java.
-# Calls terminate when the download or the unpacking fails.
-setupJava()
-{
-    rm -Rf /opt/java /opt/jdk.tar.gz
-
-    echo "[INFO] Downloading 'jdk'"
-    wget --no-cookies --no-check-certificate \
-        --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
-        "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz \
-        || terminate "Failed to download 'jdk'"
-
-    echo "[INFO] Untaring 'jdk'"
-    tar -xvzf /opt/jdk.tar.gz -C /opt || terminate "Failed to untar 'jdk'"
-
-    rm -Rf /opt/jdk.tar.gz
-
-    # Rename the unpacked directory to /opt/java unless it already has that name
-    unzipDir=$(ls /opt | grep "jdk")
-    if [ "$unzipDir" != "java" ]; then
-        mv /opt/$unzipDir /opt/java
-    fi
-}
-
-# Installs AWS CLI: first through pip and, if that fails, from the zip bundle
-# located at AWS_CLI_DOWNLOAD_URL. Calls terminate when the bundle route fails.
-setupAWSCLI()
-{
-    echo "[INFO] Installing 'awscli'"
-    pip install --upgrade awscli && return 0
-
-    echo "[ERROR] Failed to install 'awscli' using pip"
-    echo "[INFO] Trying to install awscli using zip archive"
-    echo "[INFO] Downloading awscli zip"
-
-    downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli"
-
-    echo "[INFO] Unzipping awscli zip"
-    unzip /opt/awscli-bundle.zip -d /opt || terminate "Failed to unzip awscli zip"
-
-    rm -Rf /opt/awscli-bundle.zip
-
-    echo "[INFO] Installing awscli"
-    /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws || terminate "Failed to install awscli"
-
-    echo "[INFO] Successfully installed awscli from zip archive"
-}
-
-# Installs the packages the rest of the bootstrap relies on (wget, net-tools, python, unzip
-# through yum, then pip through get-pip.py). Calls terminate on the first failure.
-setupPreRequisites()
-{
-    for pkg in wget net-tools python unzip
-    do
-        echo "[INFO] Installing '$pkg' package"
-        yum -y install $pkg || terminate "Failed to install '$pkg' package"
-    done
-
-    downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py"
-
-    echo "[INFO] Installing 'pip'"
-    python /opt/get-pip.py || terminate "Failed to install 'pip'"
-}
-
-# Downloads the tests package from TESTS_PACKAGE_DONLOAD_URL, unpacks it into
-# /opt/ignite-cassandra-tests, sources its common.sh and starts the logs collector daemon.
-# Calls terminate when the package cannot be unzipped.
-setupTestsPackage()
-{
- downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests"
-
- rm -Rf /opt/ignite-cassandra-tests
-
- unzip /opt/ignite-cassandra-tests.zip -d /opt
- if [ $? -ne 0 ]; then
- terminate "Failed to unzip tests package"
- fi
-
- rm -f /opt/ignite-cassandra-tests.zip
-
- # Rename the unpacked directory to ignite-cassandra-tests unless it already has that name
- unzipDir=$(ls /opt | grep "ignite-cassandra")
- if [ "$unzipDir" != "ignite-cassandra-tests" ]; then
- mv /opt/$unzipDir /opt/ignite-cassandra-tests
- fi
-
- # Make every shell script of the package executable for its owner and group
- find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \;
-
- # Source the common definitions; presumably this is where setupNTP, printInstanceInfo,
- # tagInstance and bootstrapGangliaAgent used below come from - they are not defined in this script
- . /opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra"
-
- setupNTP
-
- echo "[INFO] Starting logs collector daemon"
-
- # The collector runs in background; its output goes to /opt/logs-collector.log
- HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]')
- /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log &
-
- echo "[INFO] Logs collector daemon started: $!"
-
- echo "----------------------------------------------------------------------------------------"
- printInstanceInfo
- echo "----------------------------------------------------------------------------------------"
- tagInstance
- bootstrapGangliaAgent "cassandra" 8641
-}
-
-# Downloads the Cassandra archive from CASSANDRA_DOWNLOAD_URL and unpacks it into /opt/cassandra.
-# Calls terminate when the archive cannot be unpacked.
-downloadCassandra()
-{
-    downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra"
-
-    rm -Rf /opt/cassandra
-
-    echo "[INFO] Untaring Cassandra package"
-    tar -xvzf /opt/apache-cassandra.tar.gz -C /opt || terminate "Failed to untar Cassandra package"
-
-    rm -f /opt/apache-cassandra.tar.gz
-
-    # Rename the unpacked directory to /opt/cassandra unless it already has that name
-    unzipDir=$(ls /opt | grep "cassandra" | grep "apache")
-    if [ "$unzipDir" != "cassandra" ]; then
-        mv /opt/$unzipDir /opt/cassandra
-    fi
-}
-
-# Sets up Cassandra: creates the 'cassandra' group and user when missing, installs the
-# configuration files shipped with the tests package, fills the storage directories into
-# the yaml template and appends environment exports to the given profile file.
-#
-# Arguments:
-#   $1 - file to which the JAVA_HOME, CASSANDRA_HOME and PATH exports are appended
-setupCassandra()
-{
- echo "[INFO] Creating 'cassandra' group"
- exists=$(cat /etc/group | grep cassandra)
- if [ -z "$exists" ]; then
- groupadd cassandra
- if [ $? -ne 0 ]; then
- terminate "Failed to create 'cassandra' group"
- fi
- fi
-
- echo "[INFO] Creating 'cassandra' user"
- exists=$(cat /etc/passwd | grep cassandra)
- if [ -z "$exists" ]; then
- useradd -g cassandra cassandra
- if [ $? -ne 0 ]; then
- terminate "Failed to create 'cassandra' user"
- fi
- fi
-
- # Replace the configuration files of the distribution with the ones from the tests package
- rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml
-
- cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf
- cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf
-
- chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests
-
- # Not defined in this script; presumably provided by the sourced common.sh and expected to set
- # CASSANDRA_DATA_DIR, CASSANDRA_COMMITLOG_DIR and CASSANDRA_CACHES_DIR used below - TODO confirm
- createCassandraStorageLayout
-
- # Substitute the ${CASSANDRA_*_DIR} placeholders of the template one by one through temporary files.
- # NOTE(review): the values are inserted into a sed 's/.../.../' expression as is, so they are
- # presumably already escaped for '/' by whoever sets them - verify
- cat /opt/cassandra/conf/cassandra-template.yaml | sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" > /opt/cassandra/conf/cassandra-template-1.yaml
- cat /opt/cassandra/conf/cassandra-template-1.yaml | sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" > /opt/cassandra/conf/cassandra-template-2.yaml
- cat /opt/cassandra/conf/cassandra-template-2.yaml | sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" > /opt/cassandra/conf/cassandra-template-3.yaml
-
- # Keep only the fully substituted file, under the original template name
- rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml
- mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml
-
- echo "export JAVA_HOME=/opt/java" >> $1
- echo "export CASSANDRA_HOME=/opt/cassandra" >> $1
- echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH" >> $1
-}
-
-###################################################################################################################
-
-echo "[INFO]-----------------------------------------------------------------"
-echo "[INFO] Bootstrapping Cassandra node"
-echo "[INFO]-----------------------------------------------------------------"
-
-setupPreRequisites
-setupJava
-setupAWSCLI
-setupTestsPackage
-downloadCassandra
-setupCassandra "/root/.bash_profile"
-
-cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh"
-
-#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log"
-
-$cmd | tee /opt/cassandra/cassandra-start.log
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
deleted file mode 100644
index ba76401..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
+++ /dev/null
@@ -1,287 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# -----------------------------------------------------------------------------------------------
-# Environment setup script from Cassandra distribution
-# -----------------------------------------------------------------------------------------------
-
-calculate_heap_sizes()
-{
-    # Derives MAX_HEAP_SIZE and HEAP_NEWSIZE (both with an "M" suffix) from the
-    # machine's total memory and CPU core count. Takes no arguments; the results
-    # are left in shell variables.
-
-    # Probe memory (in MB) and core count with whatever the platform provides.
-    case "$(uname)" in
-        Linux)
-            system_memory_in_mb=$(free -m | awk '/:/ {print $2;exit}')
-            system_cpu_cores=$(egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo)
-            ;;
-        FreeBSD)
-            system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}')
-            system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
-            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
-            ;;
-        SunOS)
-            system_memory_in_mb=$(prtconf | awk '/Memory size:/ {print $3}')
-            system_cpu_cores=$(psrinfo | wc -l)
-            ;;
-        Darwin)
-            system_memory_in_bytes=$(sysctl hw.memsize | awk '{print $2}')
-            system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
-            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
-            ;;
-        *)
-            # Unknown platform: fall back to figures typical of a modern
-            # desktop or a cheap server.
-            system_memory_in_mb="2048"
-            system_cpu_cores="2"
-            ;;
-    esac
-
-    # Some systems (e.g. the raspberry pi) report no cores; never go below 1.
-    if [ "$system_cpu_cores" -lt "1" ]; then
-        system_cpu_cores="1"
-    fi
-
-    # Max heap = max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB)).
-    half_system_memory_in_mb=$(expr $system_memory_in_mb / 2)
-    quarter_system_memory_in_mb=$(expr $half_system_memory_in_mb / 2)
-    if [ "$half_system_memory_in_mb" -gt "1024" ]; then
-        half_system_memory_in_mb="1024"
-    fi
-    if [ "$quarter_system_memory_in_mb" -gt "8192" ]; then
-        quarter_system_memory_in_mb="8192"
-    fi
-
-    # Start from the capped quarter and switch to the capped half only when
-    # the half is strictly larger.
-    max_heap_size_in_mb="$quarter_system_memory_in_mb"
-    if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]; then
-        max_heap_size_in_mb="$half_system_memory_in_mb"
-    fi
-    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
-
-    # Young gen = min(100MB * num_cores, 1/4 * heap size).
-    max_sensible_yg_per_core_in_mb="100"
-    max_sensible_yg_in_mb=$(expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores)
-    desired_yg_in_mb=$(expr $max_heap_size_in_mb / 4)
-
-    # Start from a quarter of the heap and clamp it to the per-core ceiling.
-    HEAP_NEWSIZE="${desired_yg_in_mb}M"
-    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]; then
-        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
-    fi
-}
-
-# Determine the sort of JVM we'll be running on.
-java_ver_output=`"${JAVA:-java}" -version 2>&1`
-jvmver=`echo "$java_ver_output" | grep '[openjdk|java] version' | awk -F'"' 'NR==1 {print $2}'`
-JVM_VERSION=${jvmver%_*}
-JVM_PATCH_VERSION=${jvmver#*_}
-
-if [ "$JVM_VERSION" \< "1.8" ] ; then
- echo "Cassandra 3.0 and later require Java 8u40 or later."
- exit 1;
-fi
-
-if [ "$JVM_VERSION" \< "1.8" ] && [ "$JVM_PATCH_VERSION" \< "40" ] ; then
- echo "Cassandra 3.0 and later require Java 8u40 or later."
- exit 1;
-fi
-
-jvm=`echo "$java_ver_output" | grep -A 1 'java version' | awk 'NR==2 {print $1}'`
-case "$jvm" in
- OpenJDK)
- JVM_VENDOR=OpenJDK
- # this will be "64-Bit" or "32-Bit"
- JVM_ARCH=`echo "$java_ver_output" | awk 'NR==3 {print $2}'`
- ;;
- "Java(TM)")
- JVM_VENDOR=Oracle
- # this will be "64-Bit" or "32-Bit"
- JVM_ARCH=`echo "$java_ver_output" | awk 'NR==3 {print $3}'`
- ;;
- *)
- # Help fill in other JVM values
- JVM_VENDOR=other
- JVM_ARCH=unknown
- ;;
-esac
-
-# Override these to set the amount of memory to allocate to the JVM at
-# start-up. For production use you may wish to adjust this for your
-# environment. MAX_HEAP_SIZE is the total amount of memory dedicated
-# to the Java heap. HEAP_NEWSIZE refers to the size of the young
-# generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should be either set
-# or not (if you set one, set the other).
-#
-# The main trade-off for the young generation is that the larger it
-# is, the longer GC pause times will be. The shorter it is, the more
-# expensive GC will be (usually).
-#
-# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
-# times. If in doubt, and if you do not particularly want to tweak, go with
-# 100 MB per physical CPU core.
-
-#MAX_HEAP_SIZE="4G"
-#HEAP_NEWSIZE="800M"
-
-# Set this to control the amount of arenas per-thread in glibc
-#export MALLOC_ARENA_MAX=4
-
-# only calculate the size if it's not set manually
-if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
- calculate_heap_sizes
-else
- if [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then
- echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
- exit 1
- fi
-fi
-
-if [ "x$MALLOC_ARENA_MAX" = "x" ] ; then
- export MALLOC_ARENA_MAX=4
-fi
-
-#GC log path has to be defined here because it needs to access CASSANDRA_HOME
-JVM_OPTS="$JVM_OPTS -Xloggc:${CASSANDRA_HOME}/logs/gc.log"
-
-# Here we create the arguments that will get passed to the jvm when
-# starting cassandra.
-
-# Read user-defined JVM options from jvm.options file
-JVM_OPTS_FILE=$CASSANDRA_CONF/jvm.options
-for opt in `grep "^-" $JVM_OPTS_FILE`
-do
- JVM_OPTS="$JVM_OPTS $opt"
-done
-
-# Check what parameters were defined on jvm.options file to avoid conflicts
-echo $JVM_OPTS | grep -q Xmn
-DEFINED_XMN=$?
-echo $JVM_OPTS | grep -q Xmx
-DEFINED_XMX=$?
-echo $JVM_OPTS | grep -q Xms
-DEFINED_XMS=$?
-echo $JVM_OPTS | grep -q UseConcMarkSweepGC
-USING_CMS=$?
-
-# We only set -Xms and -Xmx if they were not defined on jvm.options file
-# If defined, both Xmx and Xms should be defined together.
-if [ $DEFINED_XMX -ne 0 ] && [ $DEFINED_XMS -ne 0 ]; then
- JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
- JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
-elif [ $DEFINED_XMX -ne 0 ] || [ $DEFINED_XMS -ne 0 ]; then
- echo "Please set or unset -Xmx and -Xms flags in pairs on jvm.options file."
- exit 1
-fi
-
-# We only set -Xmn flag if it was not defined in jvm.options file
-# and if the CMS GC is being used
-# If defined, both Xmn and Xmx should be defined together.
-if [ $DEFINED_XMN -eq 0 ] && [ $DEFINED_XMX -ne 0 ]; then
- echo "Please set or unset -Xmx and -Xmn flags in pairs on jvm.options file."
- exit 1
-elif [ $DEFINED_XMN -ne 0 ] && [ $USING_CMS -eq 0 ]; then
- JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
-fi
-
-# UseCondCardMark is added only for a 64-bit JVM running with the CMS GC.
-if [ "$JVM_ARCH" = "64-Bit" ]; then
-    if [ $USING_CMS -eq 0 ]; then
-        JVM_OPTS="$JVM_OPTS -XX:+UseCondCardMark"
-    fi
-fi
-
-# JIT compiler hints (hotspot_compiler file) followed by the jamm javaagent.
-JVM_OPTS="$JVM_OPTS -XX:CompileCommandFile=$CASSANDRA_CONF/hotspot_compiler -javaagent:$CASSANDRA_HOME/lib/jamm-0.3.0.jar"
-
-# When CASSANDRA_HEAPDUMP_DIR is set, point HeapDumpPath at a file inside it
-# named after the current epoch seconds and this shell's pid.
-if [ -n "$CASSANDRA_HEAPDUMP_DIR" ]; then
-    JVM_OPTS="$JVM_OPTS -XX:HeapDumpPath=${CASSANDRA_HEAPDUMP_DIR}/cassandra-$(date +%s)-pid$$.hprof"
-fi
-
-# jmx: metrics and administration interface
-#
-# add this if you're having trouble connecting:
-# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
-#
-# see
-# https://blogs.oracle.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
-# for more on configuring JMX through firewalls, etc. (Short version:
-# get it working with no firewall first.)
-#
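-# Stock Cassandra ships with JMX accessible *only* from localhost. This script
-# differs: with LOCAL_JMX=yes (the default) the JMX agent is opened on JMX_PORT
-# with local.only=false and with authentication and SSL disabled, so the port
-# must be firewalled. For remote JMX with authentication, set LOCAL_JMX to any
-# other value, and uncomment the SSL lines below to enable ssl as well.
-# See https://wiki.apache.org/cassandra/JmxSecurity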
-#
-if [ "x$LOCAL_JMX" = "x" ]; then
- LOCAL_JMX=yes
-fi
-
-# Specifies the default port over which Cassandra will be available for
-# JMX connections.
-# For security reasons, you should not expose this port to the internet. Firewall it if needed.
-JMX_PORT="7199"
-
-if [ "$LOCAL_JMX" = "yes" ]; then
-# JVM_OPTS="$JVM_OPTS -Dcassandra.jmx.local.port=$JMX_PORT -XX:+DisableExplicitGC"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.local.only=false"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
- JVM_OPTS="$JVM_OPTS -XX:+UnlockCommercialFeatures"
- JVM_OPTS="$JVM_OPTS -XX:+FlightRecorder"
- JVM_OPTS="$JVM_OPTS -XX:FlightRecorderOptions=defaultrecording=true"
-else
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=true"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.password.file=/etc/cassandra/jmxremote.password"
-# JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore"
-# JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStorePassword=<keystore-password>"
-# JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore"
-# JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStorePassword=<truststore-password>"
-# JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.need.client.auth=true"
-# JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.registry.ssl=true"
-# JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.protocols=<enabled-protocols>"
-# JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.cipher.suites=<enabled-cipher-suites>"
-fi
-
-# To use mx4j, an HTML interface for JMX, add mx4j-tools.jar to the lib/
-# directory.
-# See http://wiki.apache.org/cassandra/Operations#Monitoring_with_MX4J
-# By default mx4j listens on 0.0.0.0:8081. Uncomment the following lines
-# to control its listen address and port.
-#MX4J_ADDRESS="-Dmx4jaddress=127.0.0.1"
-#MX4J_PORT="-Dmx4jport=8081"
-
-# Cassandra uses SIGAR to capture OS metrics CASSANDRA-7838
-# for SIGAR we have to set the java.library.path
-# to the location of the native libraries.
-JVM_OPTS="$JVM_OPTS -Djava.library.path=$CASSANDRA_HOME/lib/sigar-bin"
-
-JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
-JVM_OPTS="$JVM_OPTS $MX4J_PORT"
-JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"