You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/04/18 14:00:21 UTC
ignite git commit: IGNITE-4954 - Configurable expiration timeout for
Cassandra session. This closes #1785.
Repository: ignite
Updated Branches:
refs/heads/master 36a6cd012 -> 735ce60da
IGNITE-4954 - Configurable expiration timeout for Cassandra session. This closes #1785.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/735ce60d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/735ce60d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/735ce60d
Branch: refs/heads/master
Commit: 735ce60da02ebadc43aaa29cc97d331b8056df36
Parents: 36a6cd0
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Apr 13 11:29:30 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Apr 18 16:59:50 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/datasource/DataSource.java | 50 ++++++++++++++------
.../cassandra/session/CassandraSessionImpl.java | 23 +++++----
.../cassandra/session/pool/SessionPool.java | 6 +--
.../cassandra/session/pool/SessionWrapper.java | 15 +++---
4 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index 1ba3c7d..754d902 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -17,6 +17,16 @@
package org.apache.ignite.cache.store.cassandra.datasource;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
@@ -31,25 +41,13 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Data source abstraction to specify configuration of the Cassandra session to be used.
@@ -64,6 +62,9 @@ public class DataSource implements Externalizable {
*/
private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
+ /** Default expiration timeout for Cassandra driver session. */
+ public static final long DFLT_SESSION_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
+
/** Number of rows to immediately fetch in CQL statement execution. */
private Integer fetchSize;
@@ -141,6 +142,9 @@ public class DataSource implements Externalizable {
/** Netty options to use for connection. */
private NettyOptions nettyOptions;
+ /** Expiration timeout for Cassandra driver session. */
+ private long sessionExpirationTimeout = DFLT_SESSION_EXPIRATION_TIMEOUT;
+
/** Cassandra session wrapper instance. */
private volatile CassandraSession ses;
@@ -460,6 +464,23 @@ public class DataSource implements Externalizable {
}
/**
+ * Sets expiration timeout for Cassandra driver session. Idle sessions that are not
+ * used during this timeout value will be automatically closed and recreated later
+ * on demand.
+ * <p>
+ * If set to {@code 0}, timeout is disabled.
+ * <p>
+ * Default value is {@link #DFLT_SESSION_EXPIRATION_TIMEOUT}.
+ *
+ * @param sessionExpirationTimeout Expiration timeout for Cassandra driver session.
+ */
+ public void setSessionExpirationTimeout(long sessionExpirationTimeout) {
+ this.sessionExpirationTimeout = sessionExpirationTimeout;
+
+ invalidate();
+ }
+
+ /**
* Creates Cassandra session wrapper if it wasn't created yet and returns it
*
* @param log logger
@@ -541,7 +562,8 @@ public class DataSource implements Externalizable {
if (nettyOptions != null)
builder = builder.withNettyOptions(nettyOptions);
- return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
+ return ses = new CassandraSessionImpl(
+ builder, fetchSize, readConsistency, writeConsistency, sessionExpirationTimeout, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index ac11686..19b88c9 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -17,6 +17,13 @@
package org.apache.ignite.cache.store.cassandra.session;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
@@ -30,13 +37,6 @@ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.querybuilder.Batch;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
@@ -83,6 +83,9 @@ public class CassandraSessionImpl implements CassandraSession {
/** Consistency level for Cassandra WRITE operations (insert/update/delete). */
private ConsistencyLevel writeConsistency;
+ /** Expiration timeout. */
+ private long expirationTimeout;
+
/** Logger. */
private IgniteLogger log;
@@ -102,11 +105,12 @@ public class CassandraSessionImpl implements CassandraSession {
* @param log Logger.
*/
public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
- ConsistencyLevel writeConsistency, IgniteLogger log) {
+ ConsistencyLevel writeConsistency, long expirationTimeout, IgniteLogger log) {
this.builder = builder;
this.fetchSize = fetchSize;
this.readConsistency = readConsistency;
this.writeConsistency = writeConsistency;
+ this.expirationTimeout = expirationTimeout;
this.log = log;
}
@@ -504,7 +508,8 @@ public class CassandraSessionImpl implements CassandraSession {
/** {@inheritDoc} */
@Override public synchronized void close() throws IOException {
if (decrementSessionRefs() == 0 && ses != null) {
- SessionPool.put(this, ses);
+ SessionPool.put(this, ses, expirationTimeout);
+
ses = null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
index 95938bd..4de8516 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -17,13 +17,13 @@
package org.apache.ignite.cache.store.cassandra.session.pool;
-import com.datastax.driver.core.Session;
import java.lang.Thread.State;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.datastax.driver.core.Session;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
/**
@@ -98,14 +98,14 @@ public class SessionPool {
* @param cassandraSes Session wrapper.
* @param driverSes Driver session.
*/
- public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
+ public static void put(CassandraSessionImpl cassandraSes, Session driverSes, long expirationTimeout) {
if (cassandraSes == null || driverSes == null)
return;
SessionWrapper old;
synchronized (sessions) {
- old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
+ old = sessions.put(cassandraSes, new SessionWrapper(driverSes, expirationTimeout));
if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
monitorSingleton = new SessionMonitor();
http://git-wip-us.apache.org/repos/asf/ignite/blob/735ce60d/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
index 7c5722b..68b9dd4 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
@@ -24,12 +24,12 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
* Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
*/
public class SessionWrapper {
- /** Expiration timeout for Cassandra driver session. */
- public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
-
/** Cassandra driver session. */
private Session ses;
+ /** Expiration timeout. */
+ private long expirationTimeout;
+
/** Wrapper creation time. */
private long time;
@@ -38,9 +38,11 @@ public class SessionWrapper {
*
* @param ses Cassandra driver session.
*/
- public SessionWrapper(Session ses) {
+ public SessionWrapper(Session ses, long expirationTimeout) {
this.ses = ses;
- this.time = System.currentTimeMillis();
+ this.expirationTimeout = expirationTimeout;
+
+ time = System.currentTimeMillis();
}
/**
@@ -49,7 +51,7 @@ public class SessionWrapper {
* @return true if session expired.
*/
public boolean expired() {
- return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
+ return expirationTimeout > 0 && System.currentTimeMillis() - time > expirationTimeout;
}
/**
@@ -66,6 +68,7 @@ public class SessionWrapper {
*/
public void release() {
CassandraHelper.closeSession(ses);
+
ses = null;
}
}