You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/15 10:30:50 UTC
[1/2] cassandra git commit: Fix comparison contract violation in the
dynamic snitch sorting
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 052222615 -> c33ebcd2f
Fix comparison contract violation in the dynamic snitch sorting
patch by slebresne; reviewed by benedict for CASSANDRA-9519
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d44186e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d44186e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d44186e
Branch: refs/heads/cassandra-2.2
Commit: 9d44186ee3d95ae5e75ce5fa88961dadf5a03016
Parents: 1eccced
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jul 9 13:28:38 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 15 10:10:45 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../locator/DynamicEndpointSnitch.java | 34 ++++++++--
.../locator/DynamicEndpointSnitchTest.java | 69 +++++++++++++++++++-
3 files changed, 95 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f4fdf2..1e21c8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.9
+ * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
* Handle corrupt files on startup (CASSANDRA-9686)
* Fix clientutil jar and tests (CASSANDRA-9760)
* (cqlsh) Allow the SSL protocol version to be specified through the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index aefce0d..6b6286f 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -43,9 +43,9 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
private static final int WINDOW_SIZE = 100;
- private int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval();
- private int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval();
- private double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval();
+ private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval();
+ private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold();
// the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
// warrant not merging two ranges into a single range
@@ -155,7 +155,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private void sortByProximityWithScore(final InetAddress address, List<InetAddress> addresses)
{
- super.sortByProximity(address, addresses);
+ // Scores can change concurrently from a call to this method. But Collections.sort() expects
+ // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration
+ // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current
+ // version of it during this call.
+ final HashMap<InetAddress, Double> scores = this.scores;
+ Collections.sort(addresses, new Comparator<InetAddress>()
+ {
+ public int compare(InetAddress a1, InetAddress a2)
+ {
+ return compareEndpoints(address, a1, a2, scores);
+ }
+ });
}
private void sortByProximityWithBadness(final InetAddress address, List<InetAddress> addresses)
@@ -164,6 +175,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return;
subsnitch.sortByProximity(address, addresses);
+ HashMap<InetAddress, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below
+ // (which wouldn't really matter here but its cleaner that way).
ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size());
for (InetAddress inet : addresses)
{
@@ -190,7 +203,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
}
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ // Compare endpoints given an immutable snapshot of the scores
+ private int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2, Map<InetAddress, Double> scores)
{
Double scored1 = scores.get(a1);
Double scored2 = scores.get(a2);
@@ -215,6 +229,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return 1;
}
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ // That function is fundamentally unsafe because the scores can change at any time and so the result of that
+ // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in
+ // sortByProximityWithScore().
+ throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)");
+ }
+
public void receiveTiming(InetAddress host, long latency) // this is cheap
{
ExponentiallyDecayingSample sample = samples.get(host);
@@ -264,7 +286,6 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
scores = newScores;
}
-
private void reset()
{
for (ExponentiallyDecayingSample sample : samples.values())
@@ -288,6 +309,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
{
return BADNESS_THRESHOLD;
}
+
public String getSubsnitchClassName()
{
return subsnitch.getClass().getName();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index e23bcfa..3f90532 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.StorageService;
import org.junit.Test;
@@ -90,4 +90,67 @@ public class DynamicEndpointSnitchTest
order = Arrays.asList(host1, host3, host2);
assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testConcurrency() throws InterruptedException, IOException, ConfigurationException
+ {
+ // The goal of this test is to check for CASSANDRA-8448/CASSANDRA-9519
+ double badness = DatabaseDescriptor.getDynamicBadnessThreshold();
+ DatabaseDescriptor.setDynamicBadnessThreshold(0.0);
+
+ final int ITERATIONS = 10;
+
+ // do this because SS needs to be initialized before DES can work properly.
+ StorageService.instance.initClient(0);
+ SimpleSnitch ss = new SimpleSnitch();
+ DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
+ InetAddress self = FBUtilities.getBroadcastAddress();
+
+ List<InetAddress> hosts = new ArrayList<>();
+ // We want a giant list of hosts so that sorting it takes time, making it much more likely to reproduce the
+ // problem we're looking for.
+ for (int i = 0; i < 10; i++)
+ for (int j = 0; j < 256; j++)
+ for (int k = 0; k < 256; k++)
+ hosts.add(InetAddress.getByAddress(new byte[]{127, (byte)i, (byte)j, (byte)k}));
+
+ ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
+ updater.start();
+
+ List<InetAddress> result = null;
+ for (int i = 0; i < ITERATIONS; i++)
+ result = dsnitch.getSortedListByProximity(self, hosts);
+
+ updater.stopped = true;
+ updater.join();
+
+ DatabaseDescriptor.setDynamicBadnessThreshold(badness);
+ }
+
+ public static class ScoreUpdater extends Thread
+ {
+ private static final int SCORE_RANGE = 100;
+
+ public volatile boolean stopped;
+
+ private final DynamicEndpointSnitch dsnitch;
+ private final List<InetAddress> hosts;
+ private final Random random = new Random();
+
+ public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts)
+ {
+ this.dsnitch = dsnitch;
+ this.hosts = hosts;
+ }
+
+ public void run()
+ {
+ while (!stopped)
+ {
+ InetAddress host = hosts.get(random.nextInt(hosts.size()));
+ int score = random.nextInt(SCORE_RANGE);
+ dsnitch.receiveTiming(host, score);
+ }
+ }
+ }
+}
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c33ebcd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c33ebcd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c33ebcd2
Branch: refs/heads/cassandra-2.2
Commit: c33ebcd2fd09704ac1cdfa81d12978b9f582b404
Parents: 0522226 9d44186
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 15 10:30:32 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 15 10:30:32 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../locator/DynamicEndpointSnitch.java | 34 ++++++++--
.../locator/DynamicEndpointSnitchTest.java | 69 +++++++++++++++++++-
3 files changed, 95 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3046ceb,1e21c8d..c2d06e4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
-2.1.9
+2.2.0-rc3
+ * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671)
+ * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
+Merged from 2.1:
+ * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
* Handle corrupt files on startup (CASSANDRA-9686)
* Fix clientutil jar and tests (CASSANDRA-9760)
* (cqlsh) Allow the SSL protocol version to be specified through the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index b670b6a,6b6286f..3e89dd4
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@@ -216,12 -229,20 +230,20 @@@ public class DynamicEndpointSnitch exte
return 1;
}
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ // That function is fundamentally unsafe because the scores can change at any time and so the result of that
+ // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in
+ // sortByProximityWithScore().
+ throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)");
+ }
+
public void receiveTiming(InetAddress host, long latency) // this is cheap
{
- ExponentiallyDecayingSample sample = samples.get(host);
+ ExponentiallyDecayingReservoir sample = samples.get(host);
if (sample == null)
{
- ExponentiallyDecayingSample maybeNewSample = new ExponentiallyDecayingSample(WINDOW_SIZE, ALPHA);
+ ExponentiallyDecayingReservoir maybeNewSample = new ExponentiallyDecayingReservoir(WINDOW_SIZE, ALPHA);
sample = samples.putIfAbsent(host, maybeNewSample);
if (sample == null)
sample = maybeNewSample;
@@@ -265,10 -286,10 +287,9 @@@
scores = newScores;
}
-
private void reset()
{
- for (ExponentiallyDecayingSample sample : samples.values())
- sample.clear();
+ samples.clear();
}
public Map<InetAddress, Double> getScores()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index c7c1f17,3f90532..64da6d3
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@@ -90,4 -90,67 +90,67 @@@ public class DynamicEndpointSnitchTes
order = Arrays.asList(host1, host3, host2);
assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
}
- }
+
+ @Test
+ public void testConcurrency() throws InterruptedException, IOException, ConfigurationException
+ {
+ // The goal of this test is to check for CASSANDRA-8448/CASSANDRA-9519
+ double badness = DatabaseDescriptor.getDynamicBadnessThreshold();
+ DatabaseDescriptor.setDynamicBadnessThreshold(0.0);
+
+ final int ITERATIONS = 10;
+
+ // do this because SS needs to be initialized before DES can work properly.
- StorageService.instance.initClient(0);
++ StorageService.instance.unsafeInitialize();
+ SimpleSnitch ss = new SimpleSnitch();
+ DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
+ InetAddress self = FBUtilities.getBroadcastAddress();
+
+ List<InetAddress> hosts = new ArrayList<>();
+ // We want a giant list of hosts so that sorting it takes time, making it much more likely to reproduce the
+ // problem we're looking for.
+ for (int i = 0; i < 10; i++)
+ for (int j = 0; j < 256; j++)
+ for (int k = 0; k < 256; k++)
+ hosts.add(InetAddress.getByAddress(new byte[]{127, (byte)i, (byte)j, (byte)k}));
+
+ ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
+ updater.start();
+
+ List<InetAddress> result = null;
+ for (int i = 0; i < ITERATIONS; i++)
+ result = dsnitch.getSortedListByProximity(self, hosts);
+
+ updater.stopped = true;
+ updater.join();
+
+ DatabaseDescriptor.setDynamicBadnessThreshold(badness);
+ }
+
+ public static class ScoreUpdater extends Thread
+ {
+ private static final int SCORE_RANGE = 100;
+
+ public volatile boolean stopped;
+
+ private final DynamicEndpointSnitch dsnitch;
+ private final List<InetAddress> hosts;
+ private final Random random = new Random();
+
+ public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts)
+ {
+ this.dsnitch = dsnitch;
+ this.hosts = hosts;
+ }
+
+ public void run()
+ {
+ while (!stopped)
+ {
+ InetAddress host = hosts.get(random.nextInt(hosts.size()));
+ int score = random.nextInt(SCORE_RANGE);
+ dsnitch.receiveTiming(host, score);
+ }
+ }
+ }
+ }