You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2016/07/22 22:24:44 UTC
cassandra git commit: Remove compaction Severity from
DynamicEndpointSnitch patch by jbellis;
reviewed by Jeremiah Jordan for CASSANDRA-11738
Repository: cassandra
Updated Branches:
refs/heads/trunk a8f6a6945 -> 286f6a143
Remove compaction Severity from DynamicEndpointSnitch
patch by jbellis; reviewed by Jeremiah Jordan for CASSANDRA-11738
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/286f6a14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/286f6a14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/286f6a14
Branch: refs/heads/trunk
Commit: 286f6a143573de267c1595fe4dd83108ed5356fc
Parents: a8f6a69
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jul 22 17:20:29 2016 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jul 22 17:24:22 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/db/compaction/CompactionInfo.java | 20 ---
.../locator/DynamicEndpointSnitch.java | 25 ++-
.../locator/DynamicEndpointSnitchMBean.java | 25 ++-
.../cassandra/metrics/CompactionMetrics.java | 4 -
.../cassandra/service/StorageService.java | 20 ---
.../utils/BackgroundActivityMonitor.java | 171 -------------------
7 files changed, 46 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f71489..efda89e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@
* Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
* Add supplied username to authentication error messages (CASSANDRA-12076)
* Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+
3.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index fe81eac..535217f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -138,8 +138,6 @@ public final class CompactionInfo implements Serializable
{
private volatile boolean stopRequested = false;
public abstract CompactionInfo getCompactionInfo();
- double load = StorageMetrics.load.getCount();
- double reportedSeverity = 0d;
public void stop()
{
@@ -150,23 +148,5 @@ public final class CompactionInfo implements Serializable
{
return stopRequested;
}
- /**
- * report event on the size of the compaction.
- */
- public void started()
- {
- reportedSeverity = getCompactionInfo().getTotal() / load;
- StorageService.instance.reportSeverity(reportedSeverity);
- }
-
- /**
- * remove the event complete
- */
- public void finished()
- {
- if (reportedSeverity != 0d)
- StorageService.instance.reportSeverity(-(reportedSeverity));
- reportedSeverity = 0d;
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 70aecb0..08f6aa6 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
@@ -31,9 +32,14 @@ import javax.management.ObjectName;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.ftpserver.command.impl.STOR;
/**
@@ -283,7 +289,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
// finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity.
// "Severity" is basically a measure of compaction activity (CASSANDRA-3722).
if (USE_SEVERITY)
- score += StorageService.instance.getSeverity(entry.getKey());
+ score += getSeverity(entry.getKey());
// lowest score (least amount of badness) wins.
newScores.put(entry.getKey(), score);
}
@@ -333,12 +339,25 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public void setSeverity(double severity)
{
- StorageService.instance.reportManualSeverity(severity);
+ Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(severity));
+ }
+
+ private double getSeverity(InetAddress endpoint)
+ {
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state == null)
+ return 0.0;
+
+ VersionedValue event = state.getApplicationState(ApplicationState.SEVERITY);
+ if (event == null)
+ return 0.0;
+
+ return Double.parseDouble(event.value);
}
public double getSeverity()
{
- return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
+ return getSeverity(FBUtilities.getBroadcastAddress());
}
public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index a413bc5..552a16d 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -29,11 +29,30 @@ public interface DynamicEndpointSnitchMBean {
public double getBadnessThreshold();
public String getSubsnitchClassName();
public List<Double> dumpTimings(String hostname) throws UnknownHostException;
+
/**
- * Use this if you want to specify a severity; it can be negative
- * Example: Page cache is cold and you want data to be sent
- * though it is not preferred one.
+ * Setting a Severity allows operators to inject preference information into the Dynamic Snitch
+ * replica selection.
+ *
+ * When choosing which replicas to participate in a read request, the DSnitch sorts replicas
+ * by response latency, and selects the fastest replicas. Latencies are normalized to a score
+ * from 0 to 1, with lower scores being faster.
+ *
+ * The Severity injected here will be added to the normalized score.
+ *
+ * Thus, adding a Severity greater than 1 will mean the replica will never be contacted
+ * (unless needed for ALL or if it is added later for rapid read protection).
+ *
+ * Conversely, adding a negative Severity means the replica will *always* be contacted.
+ *
+ * (The "Severity" term is historical and dates to when this was used to represent how
+ * badly background tasks like compaction were affecting a replica's performance.
+ * See CASSANDRA-3722 for when this was introduced and CASSANDRA-11738 for why it was removed.)
*/
public void setSeverity(double severity);
+
+ /**
+ * @return the current manually injected Severity.
+ */
public double getSeverity();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index 9d2863f..2cddfff 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -140,15 +140,11 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
public void beginCompaction(CompactionInfo.Holder ci)
{
- // notify
- ci.started();
compactions.add(ci);
}
public void finishCompaction(CompactionInfo.Holder ci)
{
- // notify
- ci.finished();
compactions.remove(ci);
bytesCompacted.inc(ci.getCompactionInfo().getTotal());
totalCompactionsCompleted.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2883e24..d64fc04 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -189,8 +189,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
- private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
-
private final ObjectName jmxObjectName;
private Collection<Token> bootstrapTokens = null;
@@ -1468,24 +1466,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Increment about the known Compaction severity of the events in this node
- */
- public void reportSeverity(double incr)
- {
- bgMonitor.incrCompactionSeverity(incr);
- }
-
- public void reportManualSeverity(double incr)
- {
- bgMonitor.incrManualSeverity(incr);
- }
-
- public double getSeverity(InetAddress endpoint)
- {
- return bgMonitor.getSeverity(endpoint);
- }
-
- /**
* for a keyspace, return the ranges and corresponding listen addresses.
* @param keyspace
* @return the endpoint map
http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
deleted file mode 100644
index 1799d10..0000000
--- a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package org.apache.cassandra.utils;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.util.StringTokenizer;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class BackgroundActivityMonitor
-{
- private static final Logger logger = LoggerFactory.getLogger(BackgroundActivityMonitor.class);
-
- public static final int USER_INDEX = 0;
- public static final int NICE_INDEX = 1;
- public static final int SYS_INDEX = 2;
- public static final int IDLE_INDEX = 3;
- public static final int IOWAIT_INDEX = 4;
- public static final int IRQ_INDEX = 5;
- public static final int SOFTIRQ_INDEX = 6;
-
- private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
- private static final String PROC_STAT_PATH = "/proc/stat";
-
- private final AtomicDouble compaction_severity = new AtomicDouble();
- private final AtomicDouble manual_severity = new AtomicDouble();
- private final ScheduledExecutorService reportThread = new DebuggableScheduledThreadPoolExecutor("Background_Reporter");
-
- private RandomAccessFile statsFile;
- private long[] lastReading;
-
- public BackgroundActivityMonitor()
- {
- try
- {
- statsFile = new RandomAccessFile(PROC_STAT_PATH, "r");
- lastReading = readAndCompute();
- }
- catch (IOException ex)
- {
- if (FBUtilities.hasProcFS())
- logger.warn("Couldn't open /proc/stats");
- statsFile = null;
- }
- reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 1, TimeUnit.SECONDS);
- }
-
- private long[] readAndCompute() throws IOException
- {
- statsFile.seek(0);
- StringTokenizer tokenizer = new StringTokenizer(statsFile.readLine());
- String name = tokenizer.nextToken();
- assert name.equalsIgnoreCase("cpu");
- long[] returned = new long[tokenizer.countTokens()];
- for (int i = 0; i < returned.length; i++)
- returned[i] = Long.parseLong(tokenizer.nextToken());
- return returned;
- }
-
- private float compareAtIndex(long[] reading1, long[] reading2, int index)
- {
- long total1 = 0, total2 = 0;
- for (int i = 0; i <= SOFTIRQ_INDEX; i++)
- {
- total1 += reading1[i];
- total2 += reading2[i];
- }
- float totalDiff = total2 - total1;
-
- long intrested1 = reading1[index], intrested2 = reading2[index];
- float diff = intrested2 - intrested1;
- if (diff == 0)
- return 0f;
- return (diff / totalDiff) * 100; // yes it is hard coded to 100 [update
- // unit?]
- }
-
- public void incrCompactionSeverity(double sev)
- {
- compaction_severity.addAndGet(sev);
- }
-
- public void incrManualSeverity(double sev)
- {
- manual_severity.addAndGet(sev);
- }
-
- public double getIOWait() throws IOException
- {
- if (statsFile == null)
- return -1d;
- long[] newComp = readAndCompute();
- double value = compareAtIndex(lastReading, newComp, IOWAIT_INDEX);
- lastReading = newComp;
- return value;
- }
-
- public double getNormalizedLoadAvg()
- {
- double avg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
- return avg / NUM_CPUS;
- }
-
- public double getSeverity(InetAddress endpoint)
- {
- VersionedValue event;
- EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
- return Double.parseDouble(event.value);
- return 0.0;
- }
-
- public class BackgroundActivityReporter implements Runnable
- {
- public void run()
- {
- double report = -1;
- try
- {
- report = getIOWait();
- }
- catch (IOException e)
- {
- // ignore;
- if (FBUtilities.hasProcFS())
- logger.warn("Couldn't read /proc/stats");
- }
- if (report == -1d)
- report = compaction_severity.get();
-
- if (!Gossiper.instance.isEnabled())
- return;
- report += manual_severity.get(); // add manual severity setting.
- VersionedValue updated = StorageService.instance.valueFactory.severity(report);
- Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
- }
- }
-}