You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/10/09 17:37:03 UTC

[2/6] git commit: Handle JMX notification failure for repair

Handle JMX notification failure for repair

patch by yukim; reviewed by pcmanus for CASSANDRA-6097


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe19b8a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe19b8a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe19b8a

Branch: refs/heads/cassandra-2.0
Commit: fbe19b8ad7cd01f810109731b64acaaa4edf8db2
Parents: 1bba280
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Oct 3 11:26:10 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Oct 9 10:34:12 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/tools/NodeProbe.java   | 57 ++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe19b8a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59cc0f1..f0ee993 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Update sstablesPerReadHistogram to use biased sampling (CASSANDRA-6164)
  * Log UnknownColumnfamilyException when closing socket (CASSANDRA-5725)
  * Properly error out on CREATE INDEX for counters table (CASSANDRA-6160)
+ * Handle JMX notification failure for repair (CASSANDRA-6097)
 
 
 1.2.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe19b8a/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 7829b60..1557a6a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import javax.management.*;
+import javax.management.remote.JMXConnectionNotification;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
@@ -217,6 +218,7 @@ public class NodeProbe
         RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
         try
         {
+            jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
             if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange))
                 failed = true;
@@ -229,9 +231,10 @@ public class NodeProbe
         {
             try
             {
-               ssProxy.removeNotificationListener(runner);
+                ssProxy.removeNotificationListener(runner);
+                jmxc.removeConnectionNotificationListener(runner);
             }
-            catch (ListenerNotFoundException ignored) {}
+            catch (Throwable ignored) {}
         }
     }
 
@@ -240,6 +243,7 @@ public class NodeProbe
         RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
         try
         {
+            jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
             if (!runner.repairRangeAndWait(ssProxy,  isSequential, isLocal, startToken, endToken))
                 failed = true;
@@ -253,8 +257,9 @@ public class NodeProbe
             try
             {
                 ssProxy.removeNotificationListener(runner);
+                jmxc.removeConnectionNotificationListener(runner);
             }
-            catch (ListenerNotFoundException ignored) {}
+            catch (Throwable ignored) {}
         }
     }
 
@@ -985,7 +990,8 @@ class RepairRunner implements NotificationListener
     private final String keyspace;
     private final String[] columnFamilies;
     private int cmd;
-    private boolean success = true;
+    private volatile boolean success = true;
+    private volatile Exception error = null;
 
     RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
     {
@@ -994,24 +1000,22 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws InterruptedException
+    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception
     {
         cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
-        if (cmd > 0)
-        {
-            condition.await();
-        }
-        else
-        {
-            String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
-            out.println(message);
-        }
+        waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws InterruptedException
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception
     {
         cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies);
+        waitForRepair();
+        return success;
+    }
+
+    private void waitForRepair() throws Exception
+    {
         if (cmd > 0)
         {
             condition.await();
@@ -1021,7 +1025,10 @@ class RepairRunner implements NotificationListener
             String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
             out.println(message);
         }
-        return success;
+        if (error != null)
+        {
+            throw error;
+        }
     }
 
     public void handleNotification(Notification notification, Object handback)
@@ -1041,5 +1048,23 @@ class RepairRunner implements NotificationListener
                     condition.signalAll();
             }
         }
+        else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType()))
+        {
+            String message = String.format("[%s] Lost notification. You should check server log for repair status of keyspace %s",
+                                           format.format(notification.getTimeStamp()),
+                                           keyspace);
+            out.println(message);
+            success = false;
+            condition.signalAll();
+        }
+        else if (JMXConnectionNotification.FAILED.equals(notification.getType())
+                 || JMXConnectionNotification.CLOSED.equals(notification.getType()))
+        {
+            String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s"
+                                           + "(Subsequent keyspaces are not going to be repaired).",
+                                           keyspace);
+            error = new IOException(message);
+            condition.signalAll();
+        }
     }
 }