You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/31 01:15:59 UTC
[2/2] git commit: Dsnitch uses 'severity', latency,
and time since last reply for scores. Patch by vijay and
brandonwilliams, reviewed by vijay for CASSANDRA-3722
Dsnitch uses 'severity', latency, and time since last reply for scores.
Patch by vijay and brandonwilliams, reviewed by vijay for CASSANDRA-3722
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98a70bde
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98a70bde
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98a70bde
Branch: refs/heads/trunk
Commit: 98a70bdebb26955ea629fe3a21212253fc7b7d17
Parents: 08345fa
Author: Brandon Williams <br...@apache.org>
Authored: Fri Mar 30 16:44:35 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Mar 30 18:15:50 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionInfo.java | 21 +++++++
.../cassandra/db/compaction/CompactionManager.java | 4 ++
.../org/apache/cassandra/gms/ApplicationState.java | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 2 +-
.../org/apache/cassandra/gms/VersionedValue.java | 5 ++
.../cassandra/locator/DynamicEndpointSnitch.java | 43 ++++++++++++++-
.../locator/DynamicEndpointSnitchMBean.java | 7 +++
.../org/apache/cassandra/net/MessagingService.java | 6 ++
.../apache/cassandra/service/StorageService.java | 23 ++++++++-
.../locator/DynamicEndpointSnitchTest.java | 11 ++++-
10 files changed, 117 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index fda8dd3..594b639 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.service.StorageService;
+
/** Implements serializable to allow structured info to be returned via JMX. */
public final class CompactionInfo implements Serializable
{
@@ -103,6 +105,8 @@ public final class CompactionInfo implements Serializable
{
private volatile boolean isStopped = false;
public abstract CompactionInfo getCompactionInfo();
+ double load = StorageService.instance.getLoad();
+ boolean reportedSeverity = false;
public void stop()
{
@@ -113,5 +117,22 @@ public final class CompactionInfo implements Serializable
{
return isStopped;
}
+ /**
+ * report event on the size of the compaction.
+ */
+ public void started()
+ {
+ reportedSeverity = StorageService.instance.reportSeverity(getCompactionInfo().getTotalBytes()/load);
+ }
+
+ /**
+ * remove the event complete
+ */
+ public void finished()
+ {
+ if (reportedSeverity)
+ StorageService.instance.reportSeverity(-(getCompactionInfo().getTotalBytes()/load));
+ reportedSeverity = false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 848abee..36f05c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1017,11 +1017,15 @@ public class CompactionManager implements CompactionManagerMBean
public void beginCompaction(CompactionInfo.Holder ci)
{
+ // notify
+ ci.started();
compactions.add(ci);
}
public void finishCompaction(CompactionInfo.Holder ci)
{
+ // notify
+ ci.finished();
compactions.remove(ci);
totalBytesCompacted += ci.getCompactionInfo().getTotalBytes();
totalCompactionsCompleted += 1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 48e8d84..4520426 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -28,6 +28,7 @@ public enum ApplicationState
REMOVAL_COORDINATOR,
INTERNAL_IP,
RPC_ADDRESS,
+ SEVERITY,
// pad to allow adding new states to existing cluster
X1,
X2,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e6a2a3c..2a948a6 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1160,7 +1160,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public boolean isEnabled()
{
- return !scheduledGossipTask.isCancelled();
+ return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index eccca1f..36ff1d9 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -178,6 +178,11 @@ public class VersionedValue implements Comparable<VersionedValue>
{
return new VersionedValue(private_ip);
}
+
+ public VersionedValue severity(double value)
+ {
+ return new VersionedValue(String.valueOf(value));
+ }
}
private static class VersionedValueSerializer implements IVersionedSerializer<VersionedValue>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 0f3aaca..3b80e67 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -49,6 +49,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private boolean registered = false;
private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>();
+ private final ConcurrentHashMap<InetAddress, Long> lastReceived = new ConcurrentHashMap<InetAddress, Long>();
private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = new ConcurrentHashMap<InetAddress, BoundedStatsDeque>();
private final AtomicInteger intervalupdates = new AtomicInteger(0);
@@ -56,7 +57,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
- mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch,instance="+hashCode();
+ this(snitch, null);
+ }
+ public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+ {
+ mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
+ if (instance != null)
+ mbeanName += ",instance=" + instance;
subsnitch = snitch;
Runnable update = new Runnable()
{
@@ -195,12 +202,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public void receiveTiming(InetAddress host, Double latency) // this is cheap
{
+ lastReceived.put(host, System.currentTimeMillis());
if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
return;
BoundedStatsDeque deque = windows.get(host);
if (deque == null)
{
- BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE);
+ BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE);
deque = windows.putIfAbsent(host, maybeNewDeque);
if (deque == null)
deque = maybeNewDeque;
@@ -222,13 +230,32 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
}
+ double maxLatency = 1;
+ long maxPenalty = 1;
+ HashMap<InetAddress, Long> penalties = new HashMap<InetAddress, Long>();
+ for (Map.Entry<InetAddress, BoundedStatsDeque> entry : windows.entrySet())
+ {
+ double mean = entry.getValue().mean();
+ if (mean > maxLatency)
+ maxLatency = mean;
+ long timePenalty = lastReceived.containsKey(entry.getKey()) ? lastReceived.get(entry.getKey()) : System.currentTimeMillis();
+ timePenalty = System.currentTimeMillis() - timePenalty;
+ timePenalty = timePenalty > UPDATE_INTERVAL_IN_MS ? UPDATE_INTERVAL_IN_MS : timePenalty;
+ penalties.put(entry.getKey(), timePenalty);
+ if (timePenalty > maxPenalty)
+ maxPenalty = timePenalty;
+ }
for (Map.Entry<InetAddress, BoundedStatsDeque> entry: windows.entrySet())
{
- scores.put(entry.getKey(), entry.getValue().mean());
+ double score = entry.getValue().mean() / maxLatency;
+ score += penalties.get(entry.getKey()) / maxPenalty;
+ score += StorageService.instance.getSeverity(entry.getKey());
+ scores.put(entry.getKey(), score);
}
intervalupdates.set(0);
}
+
private void reset()
{
for (BoundedStatsDeque deque : windows.values())
@@ -274,4 +301,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return timings;
}
+ public void setSeverity(double severity)
+ {
+ StorageService.instance.reportSeverity(severity);
+ }
+
+ public double getSeverity()
+ {
+ return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index 7c423c8..becbacf 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -29,4 +29,11 @@ public interface DynamicEndpointSnitchMBean {
public double getBadnessThreshold();
public String getSubsnitchClassName();
public List<Double> dumpTimings(String hostname) throws UnknownHostException;
+ /**
+ * Use this if you want to specify a severity it can be -ve
+ * Example: Page cache is cold and you want data to be sent
+ * though it is not preferred one.
+ */
+ public void setSeverity(double severity);
+ public double getSeverity();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f5379de..f3e2600 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -715,6 +715,12 @@ public final class MessagingService implements MessagingServiceMBean
return pendingTasks;
}
+ public int getCommandPendingTasks(InetAddress address)
+ {
+ OutboundTcpConnectionPool connection = connectionManagers.get(address);
+ return connection == null ? 0 : connection.cmdCon.getPendingMessages();
+ }
+
public Map<String, Long> getCommandCompletedTasks()
{
Map<String, Long> completedTasks = new HashMap<String, Long>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4e43964..4b12383 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableLoader;
@@ -785,6 +784,28 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
/**
+ * Gossip about the known severity of the events in this node
+ */
+ public synchronized boolean reportSeverity(double incr)
+ {
+ if (!Gossiper.instance.isEnabled())
+ return false;
+ double update = getSeverity(FBUtilities.getBroadcastAddress()) + incr;
+ VersionedValue updated = StorageService.instance.valueFactory.severity(update);
+ Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
+ return true;
+ }
+
+ public double getSeverity(InetAddress endpoint)
+ {
+ VersionedValue event;
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
+ return Double.parseDouble(event.value);
+ return 0.0;
+ }
+
+ /**
* for a keyspace, return the ranges and corresponding listen addresses.
* @param keyspace
* @return
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index 7d44f27..decd59a 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -37,7 +37,8 @@ public class DynamicEndpointSnitchTest
// do this because SS needs to be initialized before DES can work properly.
StorageService.instance.initClient(0);
int sleeptime = 150;
- DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+ SimpleSnitch ss = new SimpleSnitch();
+ DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
InetAddress self = FBUtilities.getBroadcastAddress();
ArrayList<InetAddress> order = new ArrayList<InetAddress>();
InetAddress host1 = InetAddress.getByName("127.0.0.1");
@@ -61,6 +62,8 @@ public class DynamicEndpointSnitchTest
// make host1 a little worse
dsnitch.receiveTiming(host1, 2.0);
+ dsnitch.receiveTiming(host2, 1.0);
+ dsnitch.receiveTiming(host3, 1.0);
Thread.sleep(sleeptime);
order.clear();
@@ -71,6 +74,8 @@ public class DynamicEndpointSnitchTest
// make host2 as bad as host1
dsnitch.receiveTiming(host2, 2.0);
+ dsnitch.receiveTiming(host1, 1.0);
+ dsnitch.receiveTiming(host3, 1.0);
Thread.sleep(sleeptime);
order.clear();
@@ -82,6 +87,8 @@ public class DynamicEndpointSnitchTest
// make host3 the worst
for (int i = 0; i < 2; i++)
{
+ dsnitch.receiveTiming(host1, 1.0);
+ dsnitch.receiveTiming(host2, 1.0);
dsnitch.receiveTiming(host3, 2.0);
}
Thread.sleep(sleeptime);
@@ -95,6 +102,8 @@ public class DynamicEndpointSnitchTest
// make host3 equal to the others
for (int i = 0; i < 2; i++)
{
+ dsnitch.receiveTiming(host1, 1.0);
+ dsnitch.receiveTiming(host2, 1.0);
dsnitch.receiveTiming(host3, 1.0);
}
Thread.sleep(sleeptime);