You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/11/16 23:27:05 UTC
[01/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 ebf350740 -> 84b9e7278
refs/heads/cassandra-3.0 ac279e2f7 -> 14f36fce3
refs/heads/cassandra-3.X 0cb3128f6 -> a1eef56cc
refs/heads/trunk fd7857a64 -> 732af7de7
Fail repair if participant dies during sync or anticompaction
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior, introduced by CASSANDRA-3569, of repair relying exclusively on
TCP keep-alive to detect node failures during sync.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727
Branch: refs/heads/cassandra-2.2
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.utils.CassandraVersion;
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener
{
/*
* Version that anticompaction response is not supported up to.
* If Cassandra version is more than this, we need to wait for anticompaction response.
*/
private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+ private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
private final UUID parentSession;
private final InetAddress neighbor;
private final Collection<Range<Token>> successfulRanges;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
{
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
// immediately return after sending request
- set(neighbor);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ maybeSetResult(neighbor);
}
}
else
{
- setException(new IOException(neighbor + " is down"));
+ maybeSetException(new IOException(neighbor + " is down"));
+ }
+ }
+
+ private boolean maybeSetException(Throwable t)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ setException(t);
+ return true;
}
+ return false;
+ }
+
+ private boolean maybeSetResult(InetAddress o)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ set(o);
+ return true;
+ }
+ return false;
}
/**
* Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/
- public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+ public class AnticompactionCallback implements IAsyncCallbackWithFailure
{
final AnticompactionTask task;
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void response(MessageIn msg)
{
- task.set(msg.from);
+ maybeSetResult(msg.from);
}
public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void onFailure(InetAddress from)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final Set<InetAddress> endpoints;
private final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.range = range;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
task.treeReceived(tree);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
sessions.put(session.getId(), session);
// register listeners
- gossiper.register(session);
- failureDetector.registerFailureDetectionEventListener(session);
+ registerOnFdAndGossip(session);
- // unregister listeners at completion
+ // remove session at completion
session.addListener(new Runnable()
{
/**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
}, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return session;
}
+ private <T extends AbstractFuture &
+ IEndpointStateChangeSubscriber &
+ IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+ {
+ gossiper.register(task);
+ failureDetector.registerFailureDetectionEventListener(task);
+
+ // unregister listeners at completion
+ task.addListener(new Runnable()
+ {
+ /**
+ * When repair finished, do clean up
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(task);
+ gossiper.unregister(task);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
public synchronized void terminateSessions()
{
Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (InetAddress neighbor : neighbors)
{
AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ registerOnFdAndGossip(task);
tasks.add(task);
task.run(); // 'run' is just sending message
}
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce
Branch: refs/heads/cassandra-3.X
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
-2.2.9
+3.0.11
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+
+3.0.10
+ * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
+ * Fix partition count log during compaction (CASSANDRA-12184)
+ * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
+ * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
+ * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
+ * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
+ * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
+ * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
+ * Include SSTable filename in compacting large row message (CASSANDRA-12384)
+ * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
+ * Fix ViewTest.testCompaction (CASSANDRA-12789)
+ * Improve avg aggregate functions (CASSANDRA-12417)
+ * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
+ * nodetool stopdaemon errors out (CASSANDRA-12646)
+ * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
+ * mx4j does not work in 3.0.8 (CASSANDRA-12274)
+ * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
+ * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
+ * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
+ * Fix exceptions with new vnode allocation (CASSANDRA-12715)
+ * Unify drain and shutdown processes (CASSANDRA-12509)
+ * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
+ * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
+Merged from 2.2:
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
/** Range to repair */
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
- private final long repairedAt;
+ public final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
- this.range = range;
+ this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
- task.treeReceived(tree);
+ task.treesReceived(trees);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.X' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732af7de
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732af7de
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732af7de
Branch: refs/heads/trunk
Commit: 732af7de7f4b7865c00dfa0d85e4dbf4ee9900e2
Parents: fd7857a a1eef56
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:41 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:41 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732af7de/CHANGES.txt
----------------------------------------------------------------------
[03/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction
Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior, introduced by CASSANDRA-3569, of repair relying exclusively on
TCP keep-alive to detect node failures during sync.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727
Branch: refs/heads/cassandra-3.X
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.utils.CassandraVersion;
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener
{
/*
* Version that anticompaction response is not supported up to.
* If Cassandra version is more than this, we need to wait for anticompaction response.
*/
private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+ private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
private final UUID parentSession;
private final InetAddress neighbor;
private final Collection<Range<Token>> successfulRanges;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
{
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
// immediately return after sending request
- set(neighbor);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ maybeSetResult(neighbor);
}
}
else
{
- setException(new IOException(neighbor + " is down"));
+ maybeSetException(new IOException(neighbor + " is down"));
+ }
+ }
+
+ private boolean maybeSetException(Throwable t)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ setException(t);
+ return true;
}
+ return false;
+ }
+
+ private boolean maybeSetResult(InetAddress o)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ set(o);
+ return true;
+ }
+ return false;
}
/**
* Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/
- public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+ public class AnticompactionCallback implements IAsyncCallbackWithFailure
{
final AnticompactionTask task;
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void response(MessageIn msg)
{
- task.set(msg.from);
+ maybeSetResult(msg.from);
}
public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void onFailure(InetAddress from)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final Set<InetAddress> endpoints;
private final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.range = range;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
task.treeReceived(tree);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
sessions.put(session.getId(), session);
// register listeners
- gossiper.register(session);
- failureDetector.registerFailureDetectionEventListener(session);
+ registerOnFdAndGossip(session);
- // unregister listeners at completion
+ // remove session at completion
session.addListener(new Runnable()
{
/**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
}, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return session;
}
+ private <T extends AbstractFuture &
+ IEndpointStateChangeSubscriber &
+ IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+ {
+ gossiper.register(task);
+ failureDetector.registerFailureDetectionEventListener(task);
+
+ // unregister listeners at completion
+ task.addListener(new Runnable()
+ {
+ /**
+ * When repair finished, do clean up
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(task);
+ gossiper.unregister(task);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
public synchronized void terminateSessions()
{
Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (InetAddress neighbor : neighbors)
{
AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ registerOnFdAndGossip(task);
tasks.add(task);
task.run(); // 'run' is just sending message
}
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1eef56c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1eef56c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1eef56c
Branch: refs/heads/trunk
Commit: a1eef56cc021772619eeb4a048cb785078547515
Parents: 0cb3128 14f36fc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:29 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:29 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16a0a12,c5e066d..bc09b38
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -28,8 -33,12 +33,13 @@@ import org.apache.cassandra.config.Data
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.IFailureDetectionEventListener;
+ import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@@ -100,9 -132,42 +133,42 @@@ public class AnticompactionTask extend
return false;
}
- public void onFailure(InetAddress from)
+ public void onFailure(InetAddress from, RequestFailureReason failureReason)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 528115a,5fe306d..00340a1
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -141,8 -134,6 +137,7 @@@ public class RepairSession extends Abst
this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
+ this.pullRepair = pullRepair;
}
public UUID getId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index b69c24a,6f7b1a4..aa8ebc8
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -158,11 -156,9 +158,9 @@@ public class ActiveRepairService implem
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
- }, MoreExecutors.sameThreadExecutor());
+ }, MoreExecutors.directExecutor());
session.start(executor);
return session;
}
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1eef56c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1eef56c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1eef56c
Branch: refs/heads/cassandra-3.X
Commit: a1eef56cc021772619eeb4a048cb785078547515
Parents: 0cb3128 14f36fc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 17:24:29 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 17:24:29 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16a0a12,c5e066d..bc09b38
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -28,8 -33,12 +33,13 @@@ import org.apache.cassandra.config.Data
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.IFailureDetectionEventListener;
+ import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@@ -100,9 -132,42 +133,42 @@@ public class AnticompactionTask extend
return false;
}
- public void onFailure(InetAddress from)
+ public void onFailure(InetAddress from, RequestFailureReason failureReason)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 528115a,5fe306d..00340a1
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -141,8 -134,6 +137,7 @@@ public class RepairSession extends Abst
this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
+ this.pullRepair = pullRepair;
}
public UUID getId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index b69c24a,6f7b1a4..aa8ebc8
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -158,11 -156,9 +158,9 @@@ public class ActiveRepairService implem
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
- }, MoreExecutors.sameThreadExecutor());
+ }, MoreExecutors.directExecutor());
session.start(executor);
return session;
}
[02/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction
Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior of repair relying exclusively on TCP keep-alive to
detect node failures during sync introduced by CASSANDRA-3569.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727
Branch: refs/heads/cassandra-3.0
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.utils.CassandraVersion;
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener
{
/*
* Version that anticompaction response is not supported up to.
* If Cassandra version is more than this, we need to wait for anticompaction response.
*/
private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+ private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
private final UUID parentSession;
private final InetAddress neighbor;
private final Collection<Range<Token>> successfulRanges;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
{
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
// immediately return after sending request
- set(neighbor);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ maybeSetResult(neighbor);
}
}
else
{
- setException(new IOException(neighbor + " is down"));
+ maybeSetException(new IOException(neighbor + " is down"));
+ }
+ }
+
+ private boolean maybeSetException(Throwable t)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ setException(t);
+ return true;
}
+ return false;
+ }
+
+ private boolean maybeSetResult(InetAddress o)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ set(o);
+ return true;
+ }
+ return false;
}
/**
* Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/
- public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+ public class AnticompactionCallback implements IAsyncCallbackWithFailure
{
final AnticompactionTask task;
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void response(MessageIn msg)
{
- task.set(msg.from);
+ maybeSetResult(msg.from);
}
public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void onFailure(InetAddress from)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final Set<InetAddress> endpoints;
private final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.range = range;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
task.treeReceived(tree);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
sessions.put(session.getId(), session);
// register listeners
- gossiper.register(session);
- failureDetector.registerFailureDetectionEventListener(session);
+ registerOnFdAndGossip(session);
- // unregister listeners at completion
+ // remove session at completion
session.addListener(new Runnable()
{
/**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
}, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return session;
}
+ private <T extends AbstractFuture &
+ IEndpointStateChangeSubscriber &
+ IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+ {
+ gossiper.register(task);
+ failureDetector.registerFailureDetectionEventListener(task);
+
+ // unregister listeners at completion
+ task.addListener(new Runnable()
+ {
+ /**
+ * When repair finished, do clean up
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(task);
+ gossiper.unregister(task);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
public synchronized void terminateSessions()
{
Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (InetAddress neighbor : neighbors)
{
AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ registerOnFdAndGossip(task);
tasks.add(task);
task.run(); // 'run' is just sending message
}
[04/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction
Posted by yu...@apache.org.
Fail repair if participant dies during sync or anticompaction
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior of repair relying exclusively on TCP keep-alive to
detect node failures during sync introduced by CASSANDRA-3569.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727
Branch: refs/heads/trunk
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.utils.CassandraVersion;
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener
{
/*
* Version that anticompaction response is not supported up to.
* If Cassandra version is more than this, we need to wait for anticompaction response.
*/
private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+ private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
private final UUID parentSession;
private final InetAddress neighbor;
private final Collection<Range<Token>> successfulRanges;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
{
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
// immediately return after sending request
- set(neighbor);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ maybeSetResult(neighbor);
}
}
else
{
- setException(new IOException(neighbor + " is down"));
+ maybeSetException(new IOException(neighbor + " is down"));
+ }
+ }
+
+ private boolean maybeSetException(Throwable t)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ setException(t);
+ return true;
}
+ return false;
+ }
+
+ private boolean maybeSetResult(InetAddress o)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ set(o);
+ return true;
+ }
+ return false;
}
/**
* Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/
- public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+ public class AnticompactionCallback implements IAsyncCallbackWithFailure
{
final AnticompactionTask task;
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void response(MessageIn msg)
{
- task.set(msg.from);
+ maybeSetResult(msg.from);
}
public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void onFailure(InetAddress from)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final Set<InetAddress> endpoints;
private final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.range = range;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
task.treeReceived(tree);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
sessions.put(session.getId(), session);
// register listeners
- gossiper.register(session);
- failureDetector.registerFailureDetectionEventListener(session);
+ registerOnFdAndGossip(session);
- // unregister listeners at completion
+ // remove session at completion
session.addListener(new Runnable()
{
/**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
}, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return session;
}
+ /**
+ * Registers {@code task} with the gossiper and the failure detector, and unregisters it from
+ * both once the task's future completes.
+ */
+ // NOTE(review): the bound uses the raw type AbstractFuture; AbstractFuture<?> would avoid a
+ // rawtypes warning if every caller's class extends a parameterized AbstractFuture — confirm.
+ private <T extends AbstractFuture &
+ IEndpointStateChangeSubscriber &
+ IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+ {
+ gossiper.register(task);
+ failureDetector.registerFailureDetectionEventListener(task);
+
+ // unregister listeners at completion
+ task.addListener(new Runnable()
+ {
+ /**
+ * Runs when the task's future completes; stops listening for gossip and
+ * failure-detector events on its behalf.
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(task);
+ gossiper.unregister(task);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
public synchronized void terminateSessions()
{
Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (InetAddress neighbor : neighbors)
{
AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ registerOnFdAndGossip(task);
tasks.add(task);
task.run(); // 'run' is just sending message
}
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce
Branch: refs/heads/trunk
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
-2.2.9
+3.0.11
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+
+3.0.10
+ * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
+ * Fix partition count log during compaction (CASSANDRA-12184)
+ * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
+ * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
+ * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
+ * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
+ * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
+ * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
+ * Include SSTable filename in compacting large row message (CASSANDRA-12384)
+ * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
+ * Fix ViewTest.testCompaction (CASSANDRA-12789)
+ * Improve avg aggregate functions (CASSANDRA-12417)
+ * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
+ * nodetool stopdaemon errors out (CASSANDRA-12646)
+ * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
+ * mx4j does not work in 3.0.8 (CASSANDRA-12274)
+ * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
+ * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
+ * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
+ * Fix exceptions with new vnode allocation (CASSANDRA-12715)
+ * Unify drain and shutdown processes (CASSANDRA-12509)
+ * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
+ * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
+Merged from 2.2:
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
/** Range to repair */
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
- private final long repairedAt;
+ public final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
- this.range = range;
+ this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
- task.treeReceived(tree);
+ task.treesReceived(trees);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14f36fce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14f36fce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14f36fce
Branch: refs/heads/cassandra-3.0
Commit: 14f36fce33da265db479ce9dc0067e1e073c48d8
Parents: ac279e2 84b9e72
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 16 15:24:34 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:24:34 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 00f9574,3482052..efc681d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
-2.2.9
+3.0.11
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+
+3.0.10
+ * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
+ * Fix partition count log during compaction (CASSANDRA-12184)
+ * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
+ * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
+ * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
+ * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
+ * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
+ * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
+ * Include SSTable filename in compacting large row message (CASSANDRA-12384)
+ * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
+ * Fix ViewTest.testCompaction (CASSANDRA-12789)
+ * Improve avg aggregate functions (CASSANDRA-12417)
+ * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
+ * nodetool stopdaemon errors out (CASSANDRA-12646)
+ * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
+ * mx4j does not work in 3.0.8 (CASSANDRA-12274)
+ * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
+ * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
+ * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
+ * Fix exceptions with new vnode allocation (CASSANDRA-12715)
+ * Unify drain and shutdown processes (CASSANDRA-12509)
+ * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
+ * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
+Merged from 2.2:
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352,70bfaa6..5fe306d
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -88,13 -87,10 +87,10 @@@ public class RepairSession extends Abst
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
/** Range to repair */
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
- private final long repairedAt;
+ public final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@@ -135,10 -131,9 +131,9 @@@
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
- this.range = range;
+ this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@@ -180,15 -175,7 +175,7 @@@
String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
- task.treeReceived(tree);
+ task.treesReceived(trees);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14f36fce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------