You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2017/04/27 11:52:59 UTC

[08/11] ignite git commit: IGNITE-4954 - Configurable expiration timeout for Cassandra session. This closes #1785.

IGNITE-4954 - Configurable expiration timeout for Cassandra session. This closes #1785.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf104974
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf104974
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf104974

Branch: refs/heads/ignite-security-fixes
Commit: bf1049741f7a64728bd433f78262ba273f969848
Parents: 4a1415a
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Mon Apr 17 19:00:30 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Mon Apr 17 19:00:30 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/datasource/DataSource.java  | 26 +++++++++++++++++++-
 .../cassandra/session/CassandraSessionImpl.java | 23 ++++++++++-------
 .../cassandra/session/pool/SessionPool.java     |  6 ++---
 .../cassandra/session/pool/SessionWrapper.java  | 15 ++++++-----
 4 files changed, 51 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bf104974/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index 915eebd..1fa2a1d 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  * Data source abstraction to specify configuration of the Cassandra session to be used.
  */
 public class DataSource {
+    /** Default expiration timeout for Cassandra driver session. */
+    public static final long DFLT_SESSION_EXPIRATION_TIMEOUT = 300000; // 5 minutes.
+
     /** Number of rows to immediately fetch in CQL statement execution. */
     private Integer fetchSize;
 
@@ -123,6 +126,9 @@ public class DataSource {
     /** Netty options to use for connection. */
     private NettyOptions nettyOptions;
 
+    /** Expiration timeout for Cassandra driver session. */
+    private long sessionExpirationTimeout = DFLT_SESSION_EXPIRATION_TIMEOUT;
+
     /** Cassandra session wrapper instance. */
     private volatile CassandraSession ses;
 
@@ -442,6 +448,23 @@ public class DataSource {
     }
 
     /**
+     * Sets expiration timeout for Cassandra driver session, in milliseconds. Idle sessions
+     * that are not used for longer than this timeout will be automatically closed and
+     * recreated later on demand.
+     * <p>
+     * If set to {@code 0} or a negative value, timeout is disabled.
+     * <p>
+     * Default value is {@link #DFLT_SESSION_EXPIRATION_TIMEOUT}.
+     *
+     * @param sessionExpirationTimeout Expiration timeout for Cassandra driver session.
+     */
+    public void setSessionExpirationTimeout(long sessionExpirationTimeout) {
+        this.sessionExpirationTimeout = sessionExpirationTimeout;
+
+        invalidate();
+    }
+
+    /**
      * Creates Cassandra session wrapper if it wasn't created yet and returns it
      *
      * @param log logger
@@ -523,7 +546,8 @@ public class DataSource {
         if (nettyOptions != null)
             builder = builder.withNettyOptions(nettyOptions);
 
-        return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
+        return ses = new CassandraSessionImpl(
+            builder, fetchSize, readConsistency, writeConsistency, sessionExpirationTimeout, log);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf104974/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index 95b8581..cee776b 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -17,6 +17,13 @@
 
 package org.apache.ignite.cache.store.cassandra.session;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
@@ -30,13 +37,6 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.querybuilder.Batch;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
@@ -82,6 +82,9 @@ public class CassandraSessionImpl implements CassandraSession {
     /** Consistency level for Cassandra WRITE operations (insert/update/delete). */
     private ConsistencyLevel writeConsistency;
 
+    /** Expiration timeout for Cassandra driver session, in milliseconds. */
+    private long expirationTimeout;
+
     /** Logger. */
     private IgniteLogger log;
 
@@ -101,11 +104,12 @@ public class CassandraSessionImpl implements CassandraSession {
      * @param log Logger.
      */
     public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
-        ConsistencyLevel writeConsistency, IgniteLogger log) {
+        ConsistencyLevel writeConsistency, long expirationTimeout, IgniteLogger log) {
         this.builder = builder;
         this.fetchSize = fetchSize;
         this.readConsistency = readConsistency;
         this.writeConsistency = writeConsistency;
+        this.expirationTimeout = expirationTimeout;
         this.log = log;
     }
 
@@ -404,7 +408,8 @@ public class CassandraSessionImpl implements CassandraSession {
     /** {@inheritDoc} */
     @Override public synchronized void close() throws IOException {
         if (decrementSessionRefs() == 0 && ses != null) {
-            SessionPool.put(this, ses);
+            SessionPool.put(this, ses, expirationTimeout);
+
             ses = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf104974/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
index fc4a907..86db713 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.cache.store.cassandra.session.pool;
 
-import com.datastax.driver.core.Session;
 import java.lang.Thread.State;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import com.datastax.driver.core.Session;
 import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
 
 /**
@@ -98,14 +98,14 @@ public class SessionPool {
      * @param cassandraSes Session wrapper.
      * @param driverSes Driver session.
      */
-    public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
+    public static void put(CassandraSessionImpl cassandraSes, Session driverSes, long expirationTimeout) {
         if (cassandraSes == null || driverSes == null)
             return;
 
         SessionWrapper old;
 
         synchronized (sessions) {
-            old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
+            old = sessions.put(cassandraSes, new SessionWrapper(driverSes, expirationTimeout));
 
             if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
                 monitorSingleton = new SessionMonitor();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf104974/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
index 7c5722b..68b9dd4 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
@@ -24,12 +24,12 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
  * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
  */
 public class SessionWrapper {
-    /** Expiration timeout for Cassandra driver session. */
-    public static final long DFLT_EXPIRATION_TIMEOUT = 300000;  // 5 minutes.
-
     /** Cassandra driver session. */
     private Session ses;
 
+    /** Expiration timeout for Cassandra driver session, in milliseconds. */
+    private long expirationTimeout;
+
     /** Wrapper creation time.  */
     private long time;
 
@@ -38,9 +38,11 @@ public class SessionWrapper {
      *
      * @param ses Cassandra driver session.
      */
-    public SessionWrapper(Session ses) {
+    public SessionWrapper(Session ses, long expirationTimeout) {
         this.ses = ses;
-        this.time = System.currentTimeMillis();
+        this.expirationTimeout = expirationTimeout;
+
+        time = System.currentTimeMillis();
     }
 
     /**
@@ -49,7 +51,7 @@ public class SessionWrapper {
      * @return true if session expired.
      */
     public boolean expired() {
-        return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
+        return expirationTimeout > 0 && System.currentTimeMillis() - time > expirationTimeout;
     }
 
     /**
@@ -66,6 +68,7 @@ public class SessionWrapper {
      */
     public void release() {
         CassandraHelper.closeSession(ses);
+
         ses = null;
     }
 }