You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/03 15:03:02 UTC

svn commit: r1197123 - in /cassandra/branches/cassandra-1.0: CHANGES.txt src/java/org/apache/cassandra/service/AntiEntropyService.java src/java/org/apache/cassandra/service/StorageService.java src/java/org/apache/cassandra/service/StorageServiceMBean.java

Author: slebresne
Date: Thu Nov  3 14:03:01 2011
New Revision: 1197123

URL: http://svn.apache.org/viewvc?rev=1197123&view=rev
Log:
Add JMX call to remove failed repair sessions
patch by yukim; reviewed by slebresne for CASSANDRA-3316

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1197123&r1=1197122&r2=1197123&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov  3 14:03:01 2011
@@ -8,6 +8,7 @@
  * fix DecimalType bytebuffer marshalling (CASSANDRA-3421)
  * fix bug that caused first column in per row indexes to be ignored 
    (CASSANDRA-3441)
+ * add JMX call to clean (failed) repair sessions (CASSANDRA-3316)
 Merged from 0.8:
  * acquire compactionlock during truncate (CASSANDRA-3399)
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1197123&r1=1197122&r2=1197123&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Nov  3 14:03:01 2011
@@ -125,6 +125,14 @@ public class AntiEntropyService
         return futureTask;
     }
 
+    public void terminateSessions()
+    {
+        for (RepairSession session : sessions.values())
+        {
+            session.forceShutdown();
+        }
+    }
+
     // for testing only. Create a session corresponding to a fake request and
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
@@ -172,6 +180,9 @@ public class AntiEntropyService
             return;
         }
 
+        if (session.terminated())
+            return;
+
         logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint));
 
         RepairSession.RepairJob job = session.jobs.peek();
@@ -598,6 +609,8 @@ public class AntiEntropyService
         private final SimpleCondition completed = new SimpleCondition();
         public final Condition differencingDone = new SimpleCondition();
 
+        private volatile boolean terminated = false;
+
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
             this(req.sessionid, req.range, tablename, cfnames);
@@ -697,12 +710,36 @@ public class AntiEntropyService
             {
                 FailureDetector.instance.unregisterFailureDetectionEventListener(this);
                 Gossiper.instance.unregister(this);
+                // mark this session as terminated
+                terminated = true;
                 AntiEntropyService.instance.sessions.remove(getName());
             }
         }
 
+        /**
+         * @return whether this session is terminated
+         */
+        public boolean terminated()
+        {
+            return terminated;
+        }
+
+        /**
+         * clear all RepairJobs and terminate this session.
+         */
+        public void forceShutdown()
+        {
+            jobs.clear();
+            activeJobs.clear();
+            differencingDone.signalAll();
+            completed.signalAll();
+        }
+
         void completed(Differencer differencer)
         {
+            if (terminated)
+                return;
+
             logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s",
                                        getName(),
                                        differencer.r1.endpoint,
@@ -724,10 +761,7 @@ public class AntiEntropyService
             String errorMsg = String.format("Endpoint %s died", remote);
             exception = new IOException(errorMsg);
             // If a node failed, we stop everything (though there could still be some activity in the background)
-            jobs.clear();
-            activeJobs.clear();
-            differencingDone.signalAll();
-            completed.signalAll();
+            forceShutdown();
         }
 
         public void onJoin(InetAddress endpoint, EndpointState epState) {}

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1197123&r1=1197122&r2=1197123&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Thu Nov  3 14:03:01 2011
@@ -1757,6 +1757,10 @@ public class StorageService implements I
         return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()]));
     }
 
+    public void forceTerminateAllRepairSessions() {
+        AntiEntropyService.instance.terminateSessions();
+    }
+
     /* End of MBean interface methods */
 
     /**

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1197123&r1=1197122&r2=1197123&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Nov  3 14:03:01 2011
@@ -226,6 +226,8 @@ public interface StorageServiceMBean
      */
     public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException;
 
+    public void forceTerminateAllRepairSessions();
+
     /**
      * transfer this node's data to other machines and remove it from service.
      */