You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/11/16 23:27:05 UTC

[01/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 ebf350740 -> 84b9e7278
  refs/heads/cassandra-3.0 ac279e2f7 -> 14f36fce3
  refs/heads/cassandra-3.X 0cb3128f6 -> a1eef56cc
  refs/heads/trunk fd7857a64 -> 732af7de7


Fail repair if participant dies during sync or anticompaction

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901

This reverts the behavior, introduced by CASSANDRA-3569, of repair relying
exclusively on TCP keep-alive to detect node failures during sync.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727

Branch: refs/heads/cassandra-2.2
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)
  * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.utils.CassandraVersion;
 
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+                                                                               IFailureDetectionEventListener
 {
     /*
      * Version that anticompaction response is not supported up to.
      * If Cassandra version is more than this, we need to wait for anticompaction response.
      */
     private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+    private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
     private final UUID parentSession;
     private final InetAddress neighbor;
     private final Collection<Range<Token>> successfulRanges;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
 
     public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
     {
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
             }
             else
             {
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
                 // immediately return after sending request
-                set(neighbor);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                maybeSetResult(neighbor);
             }
         }
         else
         {
-            setException(new IOException(neighbor + " is down"));
+            maybeSetException(new IOException(neighbor + " is down"));
+        }
+    }
+
+    private boolean maybeSetException(Throwable t)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            setException(t);
+            return true;
         }
+        return false;
+    }
+
+    private boolean maybeSetResult(InetAddress o)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            set(o);
+            return true;
+        }
+        return false;
     }
 
     /**
      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
      */
-    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    public class AnticompactionCallback implements IAsyncCallbackWithFailure
     {
         final AnticompactionTask task;
 
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void response(MessageIn msg)
         {
-            task.set(msg.from);
+            maybeSetResult(msg.from);
         }
 
         public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void onFailure(InetAddress from)
         {
-            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!neighbor.equals(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+        if (maybeSetException(exception))
+        {
+            // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+            logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
-    // number of validations left to be performed
-    private final AtomicInteger validationRemaining;
-
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
-        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
-
-        // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-        // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-        // See CASSANDRA-3569
-        if (validationRemaining.decrementAndGet() == 0)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
         sessions.put(session.getId(), session);
         // register listeners
-        gossiper.register(session);
-        failureDetector.registerFailureDetectionEventListener(session);
+        registerOnFdAndGossip(session);
 
-        // unregister listeners at completion
+        // remove session at completion
         session.addListener(new Runnable()
         {
             /**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
              */
             public void run()
             {
-                failureDetector.unregisterFailureDetectionEventListener(session);
-                gossiper.unregister(session);
                 sessions.remove(session.getId());
             }
         }, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    private <T extends AbstractFuture &
+               IEndpointStateChangeSubscriber &
+               IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+    {
+        gossiper.register(task);
+        failureDetector.registerFailureDetectionEventListener(task);
+
+        // unregister listeners at completion
+        task.addListener(new Runnable()
+        {
+            /**
+             * When repair finished, do clean up
+             */
+            public void run()
+            {
+                failureDetector.unregisterFailureDetectionEventListener(task);
+                gossiper.unregister(task);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+    }
+
     public synchronized void terminateSessions()
     {
         Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         for (InetAddress neighbor : neighbors)
         {
             AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            registerOnFdAndGossip(task);
             tasks.add(task);
             task.run(); // 'run' is just sending message
         }


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce

Branch: refs/heads/cassandra-3.X
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
+  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
   * Fix Util.spinAssertEquals (CASSANDRA-12283)
   * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
      private final String[] cfnames;
      public final RepairParallelism parallelismDegree;
      /** Range to repair */
 -    public final Range<Token> range;
 +    public final Collection<Range<Token>> ranges;
      public final Set<InetAddress> endpoints;
 -    private final long repairedAt;
 +    public final long repairedAt;
  
-     // number of validations left to be performed
-     private final AtomicInteger validationRemaining;
- 
      private final AtomicBoolean isFailed = new AtomicBoolean(false);
  
      // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
          this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
 -        this.range = range;
 +        this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
-         this.validationRemaining = new AtomicInteger(cfnames.length);
      }
  
      public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
          String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
          logger.info("[repair #{}] {}", getId(), message);
          Tracing.traceRepair(message);
 -        task.treeReceived(tree);
 +        task.treesReceived(trees);
- 
-         // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-         // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-         // See CASSANDRA-3569
-         if (validationRemaining.decrementAndGet() == 0)
-         {
-             FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-         }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------


[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.X' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732af7de
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732af7de
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732af7de

Branch: refs/heads/trunk
Commit: 732af7de7f4b7865c00dfa0d85e4dbf4ee9900e2
Parents: fd7857a a1eef56
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:41 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:41 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/732af7de/CHANGES.txt
----------------------------------------------------------------------


[03/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction

Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901

This reverts the behavior, introduced by CASSANDRA-3569, of repair relying
exclusively on TCP keep-alive to detect node failures during sync.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727

Branch: refs/heads/cassandra-3.X
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)
  * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.utils.CassandraVersion;
 
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+                                                                               IFailureDetectionEventListener
 {
     /*
      * Version that anticompaction response is not supported up to.
      * If Cassandra version is more than this, we need to wait for anticompaction response.
      */
     private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+    private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
     private final UUID parentSession;
     private final InetAddress neighbor;
     private final Collection<Range<Token>> successfulRanges;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
 
     public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
     {
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
             }
             else
             {
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
                 // immediately return after sending request
-                set(neighbor);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                maybeSetResult(neighbor);
             }
         }
         else
         {
-            setException(new IOException(neighbor + " is down"));
+            maybeSetException(new IOException(neighbor + " is down"));
+        }
+    }
+
+    private boolean maybeSetException(Throwable t)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            setException(t);
+            return true;
         }
+        return false;
+    }
+
+    private boolean maybeSetResult(InetAddress o)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            set(o);
+            return true;
+        }
+        return false;
     }
 
     /**
      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
      */
-    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    public class AnticompactionCallback implements IAsyncCallbackWithFailure
     {
         final AnticompactionTask task;
 
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void response(MessageIn msg)
         {
-            task.set(msg.from);
+            maybeSetResult(msg.from);
         }
 
         public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void onFailure(InetAddress from)
         {
-            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!neighbor.equals(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+        if (maybeSetException(exception))
+        {
+            // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+            logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
-    // number of validations left to be performed
-    private final AtomicInteger validationRemaining;
-
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
-        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
-
-        // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-        // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-        // See CASSANDRA-3569
-        if (validationRemaining.decrementAndGet() == 0)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
         sessions.put(session.getId(), session);
         // register listeners
-        gossiper.register(session);
-        failureDetector.registerFailureDetectionEventListener(session);
+        registerOnFdAndGossip(session);
 
-        // unregister listeners at completion
+        // remove session at completion
         session.addListener(new Runnable()
         {
             /**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
              */
             public void run()
             {
-                failureDetector.unregisterFailureDetectionEventListener(session);
-                gossiper.unregister(session);
                 sessions.remove(session.getId());
             }
         }, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    private <T extends AbstractFuture &
+               IEndpointStateChangeSubscriber &
+               IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+    {
+        gossiper.register(task);
+        failureDetector.registerFailureDetectionEventListener(task);
+
+        // unregister listeners at completion
+        task.addListener(new Runnable()
+        {
+            /**
+             * When repair finished, do clean up
+             */
+            public void run()
+            {
+                failureDetector.unregisterFailureDetectionEventListener(task);
+                gossiper.unregister(task);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+    }
+
     public synchronized void terminateSessions()
     {
         Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         for (InetAddress neighbor : neighbors)
         {
             AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            registerOnFdAndGossip(task);
             tasks.add(task);
             task.run(); // 'run' is just sending message
         }


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1eef56c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1eef56c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1eef56c

Branch: refs/heads/trunk
Commit: a1eef56cc021772619eeb4a048cb785078547515
Parents: 0cb3128 14f36fc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:29 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:29 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16a0a12,c5e066d..bc09b38
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -28,8 -33,12 +33,13 @@@ import org.apache.cassandra.config.Data
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
  import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.IFailureDetectionEventListener;
+ import org.apache.cassandra.gms.VersionedValue;
  import org.apache.cassandra.net.IAsyncCallbackWithFailure;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessagingService;
@@@ -100,9 -132,42 +133,42 @@@ public class AnticompactionTask extend
              return false;
          }
  
 -        public void onFailure(InetAddress from)
 +        public void onFailure(InetAddress from, RequestFailureReason failureReason)
          {
-             task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+             maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+         }
+     }
+ 
+     public void onJoin(InetAddress endpoint, EndpointState epState) {}
+     public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+     public void onAlive(InetAddress endpoint, EndpointState state) {}
+     public void onDead(InetAddress endpoint, EndpointState state) {}
+ 
+     public void onRemove(InetAddress endpoint)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     public void onRestart(InetAddress endpoint, EndpointState epState)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     public void convict(InetAddress endpoint, double phi)
+     {
+         if (!neighbor.equals(endpoint))
+             return;
+ 
+         // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+             return;
+ 
+         Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+         if (maybeSetException(exception))
+         {
+             // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+             logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 528115a,5fe306d..00340a1
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -141,8 -134,6 +137,7 @@@ public class RepairSession extends Abst
          this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
-         this.validationRemaining = new AtomicInteger(cfnames.length);
 +        this.pullRepair = pullRepair;
      }
  
      public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index b69c24a,6f7b1a4..aa8ebc8
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -158,11 -156,9 +158,9 @@@ public class ActiveRepairService implem
               */
              public void run()
              {
-                 failureDetector.unregisterFailureDetectionEventListener(session);
-                 gossiper.unregister(session);
                  sessions.remove(session.getId());
              }
 -        }, MoreExecutors.sameThreadExecutor());
 +        }, MoreExecutors.directExecutor());
          session.start(executor);
          return session;
      }


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1eef56c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1eef56c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1eef56c

Branch: refs/heads/cassandra-3.X
Commit: a1eef56cc021772619eeb4a048cb785078547515
Parents: 0cb3128 14f36fc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:29 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:29 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16a0a12,c5e066d..bc09b38
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -28,8 -33,12 +33,13 @@@ import org.apache.cassandra.config.Data
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
  import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.IFailureDetectionEventListener;
+ import org.apache.cassandra.gms.VersionedValue;
  import org.apache.cassandra.net.IAsyncCallbackWithFailure;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessagingService;
@@@ -100,9 -132,42 +133,42 @@@ public class AnticompactionTask extend
              return false;
          }
  
 -        public void onFailure(InetAddress from)
 +        public void onFailure(InetAddress from, RequestFailureReason failureReason)
          {
-             task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+             maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+         }
+     }
+ 
+     public void onJoin(InetAddress endpoint, EndpointState epState) {}
+     public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+     public void onAlive(InetAddress endpoint, EndpointState state) {}
+     public void onDead(InetAddress endpoint, EndpointState state) {}
+ 
+     public void onRemove(InetAddress endpoint)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     public void onRestart(InetAddress endpoint, EndpointState epState)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     public void convict(InetAddress endpoint, double phi)
+     {
+         if (!neighbor.equals(endpoint))
+             return;
+ 
+         // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+             return;
+ 
+         Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+         if (maybeSetException(exception))
+         {
+             // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+             logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 528115a,5fe306d..00340a1
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -141,8 -134,6 +137,7 @@@ public class RepairSession extends Abst
          this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
-         this.validationRemaining = new AtomicInteger(cfnames.length);
 +        this.pullRepair = pullRepair;
      }
  
      public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index b69c24a,6f7b1a4..aa8ebc8
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -158,11 -156,9 +158,9 @@@ public class ActiveRepairService implem
               */
              public void run()
              {
-                 failureDetector.unregisterFailureDetectionEventListener(session);
-                 gossiper.unregister(session);
                  sessions.remove(session.getId());
              }
 -        }, MoreExecutors.sameThreadExecutor());
 +        }, MoreExecutors.directExecutor());
          session.start(executor);
          return session;
      }


[02/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction

Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901

This reverts the behavior, introduced by CASSANDRA-3569, of repair relying
exclusively on TCP keep-alive to detect node failures during sync.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727

Branch: refs/heads/cassandra-3.0
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)
  * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.utils.CassandraVersion;
 
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+                                                                               IFailureDetectionEventListener
 {
     /*
      * Version that anticompaction response is not supported up to.
      * If Cassandra version is more than this, we need to wait for anticompaction response.
      */
     private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+    private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
     private final UUID parentSession;
     private final InetAddress neighbor;
     private final Collection<Range<Token>> successfulRanges;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
 
     public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
     {
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
             }
             else
             {
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
                 // immediately return after sending request
-                set(neighbor);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                maybeSetResult(neighbor);
             }
         }
         else
         {
-            setException(new IOException(neighbor + " is down"));
+            maybeSetException(new IOException(neighbor + " is down"));
+        }
+    }
+
+    private boolean maybeSetException(Throwable t)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            setException(t);
+            return true;
         }
+        return false;
+    }
+
+    private boolean maybeSetResult(InetAddress o)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            set(o);
+            return true;
+        }
+        return false;
     }
 
     /**
      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
      */
-    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    public class AnticompactionCallback implements IAsyncCallbackWithFailure
     {
         final AnticompactionTask task;
 
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void response(MessageIn msg)
         {
-            task.set(msg.from);
+            maybeSetResult(msg.from);
         }
 
         public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void onFailure(InetAddress from)
         {
-            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!neighbor.equals(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+        if (maybeSetException(exception))
+        {
+            // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+            logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
-    // number of validations left to be performed
-    private final AtomicInteger validationRemaining;
-
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
-        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
-
-        // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-        // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-        // See CASSANDRA-3569
-        if (validationRemaining.decrementAndGet() == 0)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
         sessions.put(session.getId(), session);
         // register listeners
-        gossiper.register(session);
-        failureDetector.registerFailureDetectionEventListener(session);
+        registerOnFdAndGossip(session);
 
-        // unregister listeners at completion
+        // remove session at completion
         session.addListener(new Runnable()
         {
             /**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
              */
             public void run()
             {
-                failureDetector.unregisterFailureDetectionEventListener(session);
-                gossiper.unregister(session);
                 sessions.remove(session.getId());
             }
         }, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    private <T extends AbstractFuture &
+               IEndpointStateChangeSubscriber &
+               IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+    {
+        gossiper.register(task);
+        failureDetector.registerFailureDetectionEventListener(task);
+
+        // unregister listeners at completion
+        task.addListener(new Runnable()
+        {
+            /**
+             * When repair finished, do clean up
+             */
+            public void run()
+            {
+                failureDetector.unregisterFailureDetectionEventListener(task);
+                gossiper.unregister(task);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+    }
+
     public synchronized void terminateSessions()
     {
         Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         for (InetAddress neighbor : neighbors)
         {
             AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            registerOnFdAndGossip(task);
             tasks.add(task);
             task.run(); // 'run' is just sending message
         }


[04/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction

Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901

This reverts the behavior, introduced by CASSANDRA-3569, of repair relying
exclusively on TCP keep-alive to detect node failures during sync.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727

Branch: refs/heads/trunk
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)
  * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.utils.CassandraVersion;
 
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+                                                                               IFailureDetectionEventListener
 {
     /*
      * Version that anticompaction response is not supported up to.
      * If Cassandra version is more than this, we need to wait for anticompaction response.
      */
     private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+    private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
     private final UUID parentSession;
     private final InetAddress neighbor;
     private final Collection<Range<Token>> successfulRanges;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
 
     public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
     {
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
             }
             else
             {
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
                 // immediately return after sending request
-                set(neighbor);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                maybeSetResult(neighbor);
             }
         }
         else
         {
-            setException(new IOException(neighbor + " is down"));
+            maybeSetException(new IOException(neighbor + " is down"));
+        }
+    }
+
+    private boolean maybeSetException(Throwable t)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            setException(t);
+            return true;
         }
+        return false;
+    }
+
+    private boolean maybeSetResult(InetAddress o)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            set(o);
+            return true;
+        }
+        return false;
     }
 
     /**
      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
      */
-    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    public class AnticompactionCallback implements IAsyncCallbackWithFailure
     {
         final AnticompactionTask task;
 
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void response(MessageIn msg)
         {
-            task.set(msg.from);
+            maybeSetResult(msg.from);
         }
 
         public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void onFailure(InetAddress from)
         {
-            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!neighbor.equals(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+        if (maybeSetException(exception))
+        {
+            // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+            logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
-    // number of validations left to be performed
-    private final AtomicInteger validationRemaining;
-
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
-        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
-
-        // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-        // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-        // See CASSANDRA-3569
-        if (validationRemaining.decrementAndGet() == 0)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
         sessions.put(session.getId(), session);
         // register listeners
-        gossiper.register(session);
-        failureDetector.registerFailureDetectionEventListener(session);
+        registerOnFdAndGossip(session);
 
-        // unregister listeners at completion
+        // remove session at completion
         session.addListener(new Runnable()
         {
             /**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
              */
             public void run()
             {
-                failureDetector.unregisterFailureDetectionEventListener(session);
-                gossiper.unregister(session);
                 sessions.remove(session.getId());
             }
         }, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    private <T extends AbstractFuture &
+               IEndpointStateChangeSubscriber &
+               IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+    {
+        gossiper.register(task);
+        failureDetector.registerFailureDetectionEventListener(task);
+
+        // unregister listeners at completion
+        task.addListener(new Runnable()
+        {
+            /**
+             * When repair finished, do clean up
+             */
+            public void run()
+            {
+                failureDetector.unregisterFailureDetectionEventListener(task);
+                gossiper.unregister(task);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+    }
+
     public synchronized void terminateSessions()
     {
         Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         for (InetAddress neighbor : neighbors)
         {
             AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            registerOnFdAndGossip(task);
             tasks.add(task);
             task.run(); // 'run' is just sending message
         }


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce

Branch: refs/heads/trunk
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
+  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
   * Fix Util.spinAssertEquals (CASSANDRA-12283)
   * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
      private final String[] cfnames;
      public final RepairParallelism parallelismDegree;
      /** Range to repair */
 -    public final Range<Token> range;
 +    public final Collection<Range<Token>> ranges;
      public final Set<InetAddress> endpoints;
 -    private final long repairedAt;
 +    public final long repairedAt;
  
-     // number of validations left to be performed
-     private final AtomicInteger validationRemaining;
- 
      private final AtomicBoolean isFailed = new AtomicBoolean(false);
  
      // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
          this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
 -        this.range = range;
 +        this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
-         this.validationRemaining = new AtomicInteger(cfnames.length);
      }
  
      public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
          String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
          logger.info("[repair #{}] {}", getId(), message);
          Tracing.traceRepair(message);
 -        task.treeReceived(tree);
 +        task.treesReceived(trees);
- 
-         // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-         // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-         // See CASSANDRA-3569
-         if (validationRemaining.decrementAndGet() == 0)
-         {
-             FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-         }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce

Branch: refs/heads/cassandra-3.0
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
+  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
   * Fix Util.spinAssertEquals (CASSANDRA-12283)
   * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
      private final String[] cfnames;
      public final RepairParallelism parallelismDegree;
      /** Range to repair */
 -    public final Range<Token> range;
 +    public final Collection<Range<Token>> ranges;
      public final Set<InetAddress> endpoints;
 -    private final long repairedAt;
 +    public final long repairedAt;
  
-     // number of validations left to be performed
-     private final AtomicInteger validationRemaining;
- 
      private final AtomicBoolean isFailed = new AtomicBoolean(false);
  
      // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
          this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
 -        this.range = range;
 +        this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
-         this.validationRemaining = new AtomicInteger(cfnames.length);
      }
  
      public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
          String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
          logger.info("[repair #{}] {}", getId(), message);
          Tracing.traceRepair(message);
 -        task.treeReceived(tree);
 +        task.treesReceived(trees);
- 
-         // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-         // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-         // See CASSANDRA-3569
-         if (validationRemaining.decrementAndGet() == 0)
-         {
-             FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-         }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------