You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/07/30 02:59:54 UTC

[1/6] cassandra git commit: Ensure atomicity inside thrift and stream session

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 ab6e4854d -> b334f519f
  refs/heads/trunk b5045356d -> 88e534a4f


Ensure atomicity inside thrift and stream session

patch by Paulo Motta; reviewed by yukim for CASSANDRA-7757


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a4728f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a4728f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a4728f6

Branch: refs/heads/trunk
Commit: 0a4728f62b51095706bf7155e8f60b39ec5fa082
Parents: dce303b
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 29 19:52:40 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:53:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                 | 1 +
 src/java/org/apache/cassandra/streaming/StreamSession.java  | 9 ++++++---
 .../org/apache/cassandra/thrift/ThriftSessionManager.java   | 9 ++++++---
 3 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4ef77ed..5cfc347 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
    config file or environment variables (CASSANDRA-9544)
  * Remove repair snapshot leftover on startup (CASSANDRA-7357)
  * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
+ * Ensure atomicity inside thrift and stream session (CASSANDRA-7757)
 Merged from 2.0:
  * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
  * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1edfedb..63219d8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -137,7 +137,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
@@ -369,8 +369,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             StreamTransferTask task = transfers.get(cfId);
             if (task == null)
             {
-                task = new StreamTransferTask(this, cfId);
-                transfers.put(cfId, task);
+                //guarantee atomicity
+                StreamTransferTask newTask = new StreamTransferTask(this, cfId);
+                task = transfers.putIfAbsent(cfId, newTask);
+                if (task == null)
+                    task = newTask;
             }
             task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt);
             iter.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index ed3df6d..6caa558 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -36,7 +36,7 @@ public class ThriftSessionManager
     public final static ThriftSessionManager instance = new ThriftSessionManager();
 
     private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>();
-    private final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
 
     /**
      * @param socket the address on which the current thread will work on requests for until further notice
@@ -57,8 +57,11 @@ public class ThriftSessionManager
         ThriftClientState cState = activeSocketSessions.get(socket);
         if (cState == null)
         {
-            cState = new ThriftClientState(socket);
-            activeSocketSessions.put(socket, cState);
+            //guarantee atomicity
+            ThriftClientState newState = new ThriftClientState(socket);
+            cState = activeSocketSessions.putIfAbsent(socket, newState);
+            if (cState == null)
+                cState = newState;
         }
         return cState;
     }


[4/6] cassandra git commit: fix CHANGES.txt

Posted by yu...@apache.org.
fix CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b334f519
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b334f519
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b334f519

Branch: refs/heads/trunk
Commit: b334f519f35507e8793fb7aa6d333bc06f67a31b
Parents: ab6e485
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 29 19:59:34 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:59:34 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b334f519/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 28e6c4a..2fe6a22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,5 @@
 2.2.1
  * UDF / UDA execution time in trace (CASSANDRA-9723)
- * Remove repair snapshot leftover on startup (CASSANDRA-7357)
- * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
  * Fix broken internode SSL (CASSANDRA-9884)
 Merged from 2.1:
  * Fix handling of enable/disable autocompaction (CASSANDRA-9899)


[3/6] cassandra git commit: Merge branch 'cassandra-2.2' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a892dd81
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a892dd81
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a892dd81

Branch: refs/heads/trunk
Commit: a892dd8129a046dc043f291101d7b965aac78398
Parents: b504535 ab6e485
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 29 19:58:36 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:58:36 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                 | 3 +++
 src/java/org/apache/cassandra/streaming/StreamSession.java  | 9 ++++++---
 .../org/apache/cassandra/thrift/ThriftSessionManager.java   | 9 ++++++---
 3 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a892dd81/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a892dd81/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------


[6/6] cassandra git commit: Merge branch 'cassandra-2.2' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88e534a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88e534a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88e534a4

Branch: refs/heads/trunk
Commit: 88e534a4fe2def014d43890c427b2fc3ed23fd60
Parents: a892dd8 b334f51
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 29 19:59:38 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:59:38 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88e534a4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 73edd83,2fe6a22..13e3acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,33 -1,5 +1,31 @@@
 +3.0
 + * Implement proper sandboxing for UDFs (CASSANDRA-9402)
 + * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
 + * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
 + * Metrics should use up to date nomenclature (CASSANDRA-9448)
 + * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
 + * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
 + * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
 +   9848, 9705, 9859, 9867, 9874, 9828, 9801)
 + * Update Guava to 18.0 (CASSANDRA-9653)
 + * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
 + * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
 + * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
 + * Add algorithmic token allocation (CASSANDRA-7032)
 + * Add nodetool command to replay batchlog (CASSANDRA-9547)
 + * Make file buffer cache independent of paths being read (CASSANDRA-8897)
 + * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
 + * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
 + * Change gossip stabilization to use endpoint size (CASSANDRA-9401)
 + * Change default garbage collector to G1 (CASSANDRA-7486)
 + * Populate TokenMetadata early during startup (CASSANDRA-9317)
 + * Undeprecate cache recentHitRate (CASSANDRA-6591)
 + * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
 + * Materialized Views (CASSANDRA-6477)
 +
 +
  2.2.1
   * UDF / UDA execution time in trace (CASSANDRA-9723)
-  * Remove repair snapshot leftover on startup (CASSANDRA-7357)
-  * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
   * Fix broken internode SSL (CASSANDRA-9884)
  Merged from 2.1:
   * Fix handling of enable/disable autocompaction (CASSANDRA-9899)


[5/6] cassandra git commit: fix CHANGES.txt

Posted by yu...@apache.org.
fix CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b334f519
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b334f519
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b334f519

Branch: refs/heads/cassandra-2.2
Commit: b334f519f35507e8793fb7aa6d333bc06f67a31b
Parents: ab6e485
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 29 19:59:34 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:59:34 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b334f519/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 28e6c4a..2fe6a22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,5 @@
 2.2.1
  * UDF / UDA execution time in trace (CASSANDRA-9723)
- * Remove repair snapshot leftover on startup (CASSANDRA-7357)
- * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
  * Fix broken internode SSL (CASSANDRA-9884)
 Merged from 2.1:
  * Fix handling of enable/disable autocompaction (CASSANDRA-9899)


[2/6] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab6e4854
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab6e4854
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab6e4854

Branch: refs/heads/trunk
Commit: ab6e4854d4b83abfd9d46479a24d22f4585ef63b
Parents: 2da0169 0a4728f
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 29 19:55:35 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:55:35 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                 | 3 +++
 src/java/org/apache/cassandra/streaming/StreamSession.java  | 9 ++++++---
 .../org/apache/cassandra/thrift/ThriftSessionManager.java   | 9 ++++++---
 3 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6e4854/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 50ac561,5cfc347..28e6c4a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,7 +1,26 @@@
 -2.1.9
 +2.2.1
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
 + * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 + * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
 + * Fix broken internode SSL (CASSANDRA-9884)
 +Merged from 2.1:
   * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
 - * Commit log segment recycling is disabled by default (CASSANDRA-9896)
   * Add consistency level to tracing output (CASSANDRA-9827)
++ * Remove repair snapshot leftover on startup (CASSANDRA-7357)
++ * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
++ * Ensure atomicity inside thrift and stream session (CASSANDRA-7757)
 +Merged from 2.0:
 + * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
 + * Don't track hotness when opening from snapshot for validation (CASSANDRA-9382)
 +
 +
 +2.2.0
 + * Allow the selection of columns together with aggregates (CASSANDRA-9767)
 + * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795)
 + * Don't wrap byte arrays in SequentialWriter (CASSANDRA-9797)
 + * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671)
 + * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
 +Merged from 2.1:
   * Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
   * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
   * Handle corrupt files on startup (CASSANDRA-9686)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6e4854/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 7236194,63219d8..366bc33
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -135,9 -135,9 +135,9 @@@ public class StreamSession implements I
      private StreamResultFuture streamResult;
  
      // stream requests to send to the peer
 -    private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
 +    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
      // streaming tasks are created and managed per ColumnFamily ID
-     private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+     private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
      // data receivers, filled after receiving prepare message
      private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
      private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6e4854/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index 2703b52,6caa558..3603ad5
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@@ -58,8 -57,11 +58,11 @@@ public class ThriftSessionManage
          ThriftClientState cState = activeSocketSessions.get(socket);
          if (cState == null)
          {
-             cState = new ThriftClientState((InetSocketAddress)socket);
-             activeSocketSessions.put(socket, cState);
+             //guarantee atomicity
 -            ThriftClientState newState = new ThriftClientState(socket);
++            ThriftClientState newState = new ThriftClientState((InetSocketAddress)socket);
+             cState = activeSocketSessions.putIfAbsent(socket, newState);
+             if (cState == null)
+                 cState = newState;
          }
          return cState;
      }