You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/04/15 21:59:22 UTC

[GitHub] [cassandra] ekaterinadimitrova2 commented on a diff in pull request #1546: CASSANDRA-17150 trunk: Guardrails for disk usage

ekaterinadimitrova2 commented on code in PR #1546:
URL: https://github.com/apache/cassandra/pull/1546#discussion_r849980492


##########
src/java/org/apache/cassandra/config/GuardrailsOptions.java:
##########
@@ -446,20 +452,23 @@ public void setWriteConsistencyLevelsDisallowed(Set<ConsistencyLevel> consistenc
                                   x -> config.write_consistency_levels_disallowed = x);
     }
 
+    @Override
+    @Nullable
     public DataStorageSpec getCollectionSizeWarnThreshold()
     {
         return config.collection_size_warn_threshold;
     }
 
     @Override
+    @Nullable
     public DataStorageSpec getCollectionSizeFailThreshold()
     {
         return config.collection_size_fail_threshold;
     }
 
-    public void setCollectionSizeThreshold(DataStorageSpec warn, DataStorageSpec fail)
+    public void setCollectionSizeThreshold(@Nullable DataStorageSpec warn, @Nullable DataStorageSpec fail)

Review Comment:
   Good call adding @Nullable!



##########
src/java/org/apache/cassandra/db/guardrails/Guardrails.java:
##########
@@ -228,8 +232,8 @@ public final class Guardrails implements GuardrailsMBean
      */
     public static final Threshold collectionSize =
     new Threshold("collection_size",
-                  state -> CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(),
-                  state -> CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(),
+                  state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold()),

Review Comment:
   What is the reason to convert to -1 when we use null for disabled?



##########
test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static final int NUM_ROWS = 100;
+
+    private static final String WARN_MESSAGE = "Replica disk usage exceeds warning threshold";
+    private static final String FAIL_MESSAGE = "Write request failed because disk usage exceeds failure threshold";
+
+    private static Cluster cluster;
+    private static com.datastax.driver.core.Cluster driverCluster;
+    private static Session driverSession;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // speed up the task that calculates and propagates the disk usage info
+        CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
+
+        // build a 2-node cluster with RF=1
+        cluster = init(Cluster.build(2)
+                              .withInstanceInitializer(DiskStateInjection::install)
+                              .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+                                                .set("data_disk_usage_percentage_warn_threshold", 98)
+                                                .set("data_disk_usage_percentage_fail_threshold", 99)
+                                                .set("authenticator", "PasswordAuthenticator"))
+                              .start(), 1);
+
+        // create a regular user, since the default superuser is excluded from guardrails
+        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+        try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build();
+             Session session = c.connect())
+        {
+            session.execute("CREATE USER test WITH PASSWORD 'test'");
+        }
+
+        // connect using that superuser, we use the driver to get access to the client warnings
+        driverCluster = builder.withCredentials("test", "test").build();
+        driverSession = driverCluster.connect();
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (driverSession != null)
+            driverSession.close();
+
+        if (driverCluster != null)
+            driverCluster.close();
+
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @Test
+    public void testDiskUsage() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+        String insert = format("INSERT INTO %s(k, v) VALUES (?, 0)");
+
+        // With both nodes is SPACIOUS state, we can write without warnings nor failures

Review Comment:
   ```suggestion
          // With both nodes in SPACIOUS state, we can write without warnings nor failures
   ```



##########
src/java/org/apache/cassandra/cql3/statements/BatchStatement.java:
##########
@@ -411,16 +411,22 @@ public ResultMessage execute(QueryState queryState, BatchQueryOptions options, l
         if (options.getSerialConsistency() == null)
             throw new InvalidRequestException("Invalid empty serial consistency level");
 
+        ClientState clientState = queryState.getClientState();
         Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), options.getSerialConsistency()),
-                                                queryState.getClientState());
+                                                clientState);
+
+        for (int i = 0; i < statements.size(); i++ )
+        {
+            statements.get(i).validateDiskUsage(options.forStatement(i), clientState);
+        }

Review Comment:
   nit: I would probably remove the braces of the for loop



##########
conf/cassandra.yaml:
##########
@@ -1671,6 +1671,18 @@ drop_compact_storage_enabled: false
 # Default -1 to disable.
 # fields_per_udt_warn_threshold: -1
 # fields_per_udt_fail_threshold: -1
+# Guardrail to warn or fail when local data disk usage percentage exceeds threshold. Valid values are in (1, 100].

Review Comment:
   This is a bit different from dynamic_snitch_badness_threshold, where 1.0 means 100%, but I find it better than double values, so I would actually encourage it. Also, why not [1,100]? Do we exclude 1? I don't think so.
   Also, do we plan to provide recommendations on how to choose the values?



##########
src/java/org/apache/cassandra/db/guardrails/Guardrails.java:
##########
@@ -42,11 +46,11 @@ public final class Guardrails implements GuardrailsMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Guardrails";
 
-    private static final GuardrailsConfigProvider CONFIG_PROVIDER = GuardrailsConfigProvider.instance;
+    public static final GuardrailsConfigProvider CONFIG_PROVIDER = GuardrailsConfigProvider.instance;
     private static final GuardrailsOptions DEFAULT_CONFIG = DatabaseDescriptor.getGuardrailsConfig();
 
     @VisibleForTesting
-    static final Guardrails instance = new Guardrails();
+    public static final Guardrails instance = new Guardrails();

Review Comment:
   Did this one become public for test purposes? 



##########
src/java/org/apache/cassandra/config/GuardrailsOptions.java:
##########
@@ -558,19 +609,44 @@ private static void validateIntThreshold(int warn, int fail, String name)
         validateWarnLowerThanFail(warn, fail, name);
     }
 
+    private static void validatePercentageThreshold(int warn, int fail, String name)

Review Comment:
   nit: I think we should reorder the methods to keep the code style.
   For instance, `validatePercentage` should come before `validatePositiveNumeric`. The caller goes before the called method, no?



##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -265,6 +265,9 @@
     TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar", "false"),
     PAXOS_EXECUTE_ON_SELF("cassandra.paxos.use_self_execution", "true"),
 
+    // properties for guardrails

Review Comment:
   I would prefer that we add a JavaDoc, the way it was when I introduced the class. I can see that since then people have been adding properties with just a comment, but I think it is better to have a JavaDoc here.



##########
src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.function.ToLongFunction;
+
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A {@link Threshold} guardrail whose values represent a percentage
+ * <p>
+ * This works exactly as a {@link Threshold}, but provides slightly more convenient error messages for percentage
+ */
+public class PercentageThreshold extends Threshold

Review Comment:
   Wondering whether this should be a final class?



##########
src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java:
##########
@@ -433,4 +441,37 @@
      * @param fail The threshold to prevent creating a UDT with more fields than threshold. -1 means disabled.
      */
     void setFieldsPerUDTThreshold(int warn, int fail);
+
+    /**
+     * @return The threshold to warn when local data disk usage percentage exceeds that threshold.
+     * Allowed values are in the range {@code (1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageWarnThreshold();
+
+    /**
+     * @return The threshold to fail when local data disk usage percentage exceeds that threshold.
+     * Allowed values are in the range {@code (1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageFailThreshold();
+
+    /**
+     * @param warn The threshold to warn when local disk usage percentage exceeds that threshold.
+     *             Allowed values are in the range {@code (1, 100]}, and -1 means disabled.
+     * @param fail The threshold to fail when local disk usage percentage exceeds that threshold.
+     *             Allowed values are in the range {@code (1, 100]}, and -1 means disabled.
+     */
+    public void setDataDiskUsagePercentageThreshold(int warn, int fail);
+
+    /**
+     * @return The max disk size of the data directories when calculating disk usage thresholds, as a string formatted
+     * as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
+     */
+    @Nullable
+    String getDataDiskUsageMaxDiskSize();

Review Comment:
   I think we should probably mention `null` in the java doc for this and the following method



##########
src/java/org/apache/cassandra/db/guardrails/Guardrails.java:
##########
@@ -708,6 +749,36 @@ public void setFieldsPerUDTThreshold(int warn, int fail)
         DEFAULT_CONFIG.setFieldsPerUDTThreshold(warn, fail);
     }
 
+    public int getDataDiskUsagePercentageWarnThreshold()

Review Comment:
   Missing @Override



##########
src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java:
##########
@@ -222,4 +226,22 @@
      * @return The threshold to fail when creating a UDT with more fields than threshold.
      */
     int getFieldsPerUDTFailThreshold();
+
+    /**
+     * @return The threshold to warn when local disk usage percentage exceeds that threshold.
+     * Allowed values are in the range {@code (1, 100]}, and -1 means disabled.

Review Comment:
   I believe this needs to be `[1,100]`, not `(1,100]`?



##########
src/java/org/apache/cassandra/config/GuardrailsOptions.java:
##########
@@ -558,19 +609,44 @@ private static void validateIntThreshold(int warn, int fail, String name)
         validateWarnLowerThanFail(warn, fail, name);
     }
 
+    private static void validatePercentageThreshold(int warn, int fail, String name)
+    {
+        validatePercentage(warn, name + "_warn_threshold");
+        validatePercentage(fail, name + "_fail_threshold");
+        validateWarnLowerThanFail(warn, fail, name);
+    }
+
     private static void validateWarnLowerThanFail(long warn, long fail, String name)
     {
-        if (warn == Config.DISABLED_GUARDRAIL || fail == Config.DISABLED_GUARDRAIL)
+        if (warn == -1 || fail == -1)
             return;
 
         if (fail < warn)
             throw new IllegalArgumentException(format("The warn threshold %d for %s_warn_threshold should be lower " +
                                                       "than the fail threshold %d", warn, name, fail));
     }
 
-    private static void validateSizeThreshold(DataStorageSpec warn, DataStorageSpec fail, String name)
+    private static void validateSize(DataStorageSpec size, boolean allowZero, String name)
+    {
+        if (size == null)
+            return;
+
+        if (!allowZero && size.toBytes() == 0)
+            throw new IllegalArgumentException(format("Invalid value for %s: 0 is not allowed; " +
+                                                      "if attempting to disable use null/empty",

Review Comment:
   nit: empty value



##########
src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java:
##########
@@ -387,20 +389,26 @@
     void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
 
     /**
-     * @return The threshold to warn when encountering larger size of collection data than threshold, in KiB.
+     * @return The threshold to warn when encountering larger size of collection data than threshold, as a string
+     * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
      */
-    long getCollectionSizeWarnThresholdInKiB();
+    @Nullable
+    String getCollectionSizeWarnThreshold();
 
     /**
-     * @return The threshold to prevent collections with larger data size than threshold, in KiB.
+     * @return The threshold to prevent collections with larger data size than threshold, as a string formatted as in,
+     * for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
      */
-    long getCollectionSizeFailThresholdInKiB();
+    @Nullable
+    String getCollectionSizeFailThreshold();
 
     /**
-     * @param warnInKiB The threshold to warn when encountering larger size of collection data than threshold, in KiB.
-     * @param failInKiB The threshold to prevent collections with larger data size than threshold, in KiB.
+     * @param warnSize The threshold to warn when encountering larger size of collection data than threshold, as a
+     *                 string formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
+     * @param failSize The threshold to prevent collections with larger data size than threshold, as a string formatted
+     *                 as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.

Review Comment:
   I know you documented the null being disabled in cassandra.yaml but I would say it is probably worth it to add a point here too. 



##########
src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java:
##########
@@ -222,4 +226,22 @@
      * @return The threshold to fail when creating a UDT with more fields than threshold.
      */
     int getFieldsPerUDTFailThreshold();
+
+    /**
+     * @return The threshold to warn when local disk usage percentage exceeds that threshold.
+     * Allowed values are in the range {@code (1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageWarnThreshold();
+
+    /**
+     * @return The threshold to fail when local disk usage percentage exceeds that threshold.
+     * Allowed values are in the range {@code (1, 100]}, and -1 means disabled.

Review Comment:
   `[1,100]`



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk usage state changed.
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless guardrail is enabled, so we can enable it later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+    {
+        double percentage = usageRatio * 100;
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return local node disk usage state
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return disk usage (including all memtable sizes) ratio
+     */
+    @VisibleForTesting
+    public double getDiskUsage()
+    {
+        // using BigDecimal to handle large file system
+        BigDecimal used = BigDecimal.ZERO; // space used by data directories
+        BigDecimal usable = BigDecimal.ZERO; // free space on disks
+
+        for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : dataDirectoriesSupplier.get().asMap().entrySet())
+        {
+            usable = usable.add(BigDecimal.valueOf(usableSpace(e.getKey())));
+
+            for (Directories.DataDirectory dir : e.getValue())
+                used = used.add(BigDecimal.valueOf(dir.getRawSize()));
+        }
+
+        // The total disk size for data directories is the space that is actually used by those directories plus the
+        // free space on disk that might be used for storing those directories in the future.
+        BigDecimal total = used.add(usable);
+
+        // That total space can be limited by the config property data_disk_usage_max_disk_size.
+        DataStorageSpec diskUsageMaxSize = guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize();
+        if (diskUsageMaxSize != null)
+            total = total.min(BigDecimal.valueOf(diskUsageMaxSize.toBytes()));

Review Comment:
   I am wondering whether we should avoid redoing this calculation every time, repeatedly invoking the `toBytes` conversion... Maybe we should store the value somewhere in bytes and use that, and have JMX provide only bytes... I am thinking about performance, which is unfortunately hard to judge without performance testing. Thoughts?



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk usage state changed.
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless guardrail is enabled, so we can enable it later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+    {
+        double percentage = usageRatio * 100;
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return local node disk usage state
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return disk usage (including all memtable sizes) ratio

Review Comment:
   disk space occupied by data directories?



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+public enum DiskUsageState
+{
+    NOT_AVAILABLE("Not Available"), // either disk usage guardrail is not enabled or gossip state is not ready

Review Comment:
   Wondering whether we want some java doc here



##########
conf/cassandra.yaml:
##########
@@ -1671,6 +1671,18 @@ drop_compact_storage_enabled: false
 # Default -1 to disable.
 # fields_per_udt_warn_threshold: -1
 # fields_per_udt_fail_threshold: -1
+# Guardrail to warn or fail when local data disk usage percentage exceeds threshold. Valid values are in (1, 100].
+# This is only used for the disks storing data directories, so it won't count any separate disks used for storing
+# the commitlog, hints nor saved caches.
+# The two thresholds default to -1 to disable.
+# data_disk_usage_percentage_warn_threshold: -1
+# data_disk_usage_percentage_fail_threshold: -1
+# Allows defining the max disk size of the data directories when calculating thresholds for
+# disk_usage_percentage_warn_threshold and disk_usage_percentage_fail_threshold, so if this is greater than zero they
+# become percentages of a fixed size on disk instead of percentages of the physically available disk size.
+# Valid values are in (1, max available disk size of all data directories].

Review Comment:
   [1, max available disk size of all data directories]



##########
src/java/org/apache/cassandra/db/guardrails/Guardrail.java:
##########
@@ -75,6 +85,9 @@ protected void warn(String message)
 
     protected void warn(String message, String redactedMessage)
     {
+        if (skipNotifying(true))

Review Comment:
   I guess that if this is configurable, with the trade-offs explained, we can leave it to the operators to decide what they would like to log, and how frequently, depending on their environment. Does that make sense?



##########
test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java:
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static int defaultDataDiskUsagePercentageWarnThreshold;
+    private static int defaultDataDiskUsagePercentageFailThreshold;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        defaultDataDiskUsagePercentageWarnThreshold = Guardrails.instance.getDataDiskUsagePercentageWarnThreshold();
+        defaultDataDiskUsagePercentageFailThreshold = Guardrails.instance.getDataDiskUsagePercentageFailThreshold();
+
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold,
+                                                                defaultDataDiskUsagePercentageFailThreshold);
+    }
+
+    @Test
+    public void testConfigValidation()
+    {
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null));
+        assertNull(guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is not allowed");
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B"));
+        assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB"));
+        assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB"));
+        assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB"));
+        assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE + "GiB"), "are actually available on disk");
+
+        // warn threshold smaller than lower bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80), "0 is not allowed");
+
+        // fail threshold bigger than upper bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110), "maximum allowed value is 100");
+
+        // warn threshold larger than fail threshold
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50),
+                          "The warn threshold 60 for data_disk_usage_percentage_warn_threshold should be lower than the fail threshold 50");
+    }
+
+    @Test
+    public void testDiskUsageState()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+
+        // shouldn't be possible to go over 100% but just to be sure
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(101));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(500));
+    }
+
+    @Test
+    public void testDiskUsageDetectorWarnDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageDetectorFailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, -1);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageGuardrailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, -1);
+
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testMemtableSizeIncluded() throws Throwable
+    {
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = { 'enabled': false }");
+
+        long memtableSizeBefore = monitor.getAllMemtableSize();
+        int rows = 10;
+        int mb = 1024 * 1024;
+
+        for (int i = 0; i < rows; i++)
+        {
+            char[] chars = new char[mb];
+            Arrays.fill(chars, (char) i);
+            String value = String.copyValueOf(chars);
+            execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value);
+        }
+
+        // verify memtables are included
+        long memtableSizeAfterInsert = monitor.getAllMemtableSize();
+        assertTrue(String.format("Expect at least 10MB more data, but got before: %s and after: %d",
+                                 memtableSizeBefore, memtableSizeAfterInsert),
+                   memtableSizeAfterInsert - memtableSizeBefore >= rows * mb);
+
+        // verify memtable size are reduced after flush
+        flush();
+        long memtableSizeAfterFlush = monitor.getAllMemtableSize();
+        assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb);
+    }
+
+    @Test
+    public void testMonitorLogsOnStateChange()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        Guardrails.localDataDiskUsage.resetLastNotifyTime();
+
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+
+        // transit to STUFFED, expect warning
+        assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local data disk usage 51%(Stuffed) exceeds warning threshold of 50%");
+
+        // remain as STUFFED, no logging because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to FULL, expect failure
+        assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting writes");
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(0.99, FULL, monitor);
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(5.0, FULL, monitor);
+
+        // transit back to STUFFED, no warning  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit back to FULL, no logging  because of min log interval
+        assertMonitorStateTransition(0.900001, FULL, monitor);
+
+        // transit back to STUFFED, no logging  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+    }
+
+    @Test
+    public void testDiskUsageBroadcaster() throws UnknownHostException
+    {
+        DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null);
+        Gossiper.instance.unregister(broadcaster);
+
+        InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2");
+        InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3");
+
+        // initially it's NOT_AVAILABLE
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+        assertFalse(broadcaster.isFull(node2));
+        assertFalse(broadcaster.isFull(node3));
+
+        // adding 1st node: Spacious, cluster has no Full node
+        broadcaster.onChange(node1, ApplicationState.DISK_USAGE, value(SPACIOUS));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+
+        // adding 2nd node with wrong ApplicationState
+        broadcaster.onChange(node2, ApplicationState.RACK, value(FULL));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node2));
+
+        // adding 2nd node: STUFFED
+        broadcaster.onChange(node2, ApplicationState.DISK_USAGE, value(STUFFED));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isStuffed(node2));
+
+        // adding 3rd node: FULL
+        broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isFull(node3));
+
+        // remove 2nd node, cluster has Full node
+        broadcaster.onRemove(node2);
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isStuffed(node2));
+
+        // remove 3nd node, cluster has no Full node
+        broadcaster.onRemove(node3);
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node3));
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSize() throws IOException
+    {
+        Directories.DataDirectory directory = mock(Directories.DataDirectory.class);
+        when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+        FileStore store = mock(FileStore.class);
+        when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 5).toBytes()); // 100GiB disk
+
+        Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.05);
+
+        // 5G are used of 10G
+        guardrails().setDataDiskUsageMaxDiskSize("10GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws IOException
+    {
+        FileStore store = mock(FileStore.class);
+        long freeDiskSizeInBytes = DataStorageSpec.inGibibytes(100).toBytes() - DataStorageSpec.inMebibytes(5).toBytes();
+        when(store.getUsableSpace()).thenReturn(DataStorageSpec.inBytes(freeDiskSizeInBytes).toBytes()); // 100GiB disk

Review Comment:
   Maybe good to add to the comment: 5MiB used out of 100GiB disk



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk usage state changed.
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless guardrail is enabled, so we can enable it later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+    {
+        double percentage = usageRatio * 100;
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return local node disk usage state
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return disk usage (including all memtable sizes) ratio
+     */
+    @VisibleForTesting
+    public double getDiskUsage()
+    {
+        // using BigDecimal to handle large file system
+        BigDecimal used = BigDecimal.ZERO; // space used by data directories
+        BigDecimal usable = BigDecimal.ZERO; // free space on disks
+
+        for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : dataDirectoriesSupplier.get().asMap().entrySet())
+        {
+            usable = usable.add(BigDecimal.valueOf(usableSpace(e.getKey())));
+
+            for (Directories.DataDirectory dir : e.getValue())
+                used = used.add(BigDecimal.valueOf(dir.getRawSize()));
+        }
+
+        // The total disk size for data directories is the space that is actually used by those directories plus the
+        // free space on disk that might be used for storing those directories in the future.
+        BigDecimal total = used.add(usable);
+
+        // That total space can be limited by the config property data_disk_usage_max_disk_size.
+        DataStorageSpec diskUsageMaxSize = guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize();

Review Comment:
   This feels to me like one of the advanced config parameters that we might want to hide from the yaml, though I am not sure. With the new version of the calculations, warnings can actually be reached sooner, and deciding what to set requires more advanced knowledge, recommendations and testing. By the way, we also need to add an entry to NEWS.txt.



##########
test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java:
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static int defaultDataDiskUsagePercentageWarnThreshold;
+    private static int defaultDataDiskUsagePercentageFailThreshold;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        defaultDataDiskUsagePercentageWarnThreshold = Guardrails.instance.getDataDiskUsagePercentageWarnThreshold();
+        defaultDataDiskUsagePercentageFailThreshold = Guardrails.instance.getDataDiskUsagePercentageFailThreshold();
+
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold,
+                                                                defaultDataDiskUsagePercentageFailThreshold);
+    }
+
+    @Test
+    public void testConfigValidation()
+    {
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null));
+        assertNull(guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is not allowed");
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B"));
+        assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB"));
+        assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB"));
+        assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB"));
+        assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE + "GiB"), "are actually available on disk");
+
+        // warn threshold smaller than lower bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80), "0 is not allowed");
+
+        // fail threshold bigger than upper bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110), "maximum allowed value is 100");
+
+        // warn threshold larger than fail threshold
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50),
+                          "The warn threshold 60 for data_disk_usage_percentage_warn_threshold should be lower than the fail threshold 50");
+    }
+
+    @Test
+    public void testDiskUsageState()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+
+        // shouldn't be possible to go over 100% but just to be sure
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(101));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(500));
+    }
+
+    @Test
+    public void testDiskUsageDetectorWarnDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageDetectorFailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, -1);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageGuardrailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, -1);
+
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testMemtableSizeIncluded() throws Throwable
+    {
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = { 'enabled': false }");
+
+        long memtableSizeBefore = monitor.getAllMemtableSize();
+        int rows = 10;
+        int mb = 1024 * 1024;
+
+        for (int i = 0; i < rows; i++)
+        {
+            char[] chars = new char[mb];
+            Arrays.fill(chars, (char) i);
+            String value = String.copyValueOf(chars);
+            execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value);
+        }
+
+        // verify memtables are included
+        long memtableSizeAfterInsert = monitor.getAllMemtableSize();
+        assertTrue(String.format("Expect at least 10MB more data, but got before: %s and after: %d",
+                                 memtableSizeBefore, memtableSizeAfterInsert),
+                   memtableSizeAfterInsert - memtableSizeBefore >= rows * mb);
+
+        // verify memtable size are reduced after flush
+        flush();
+        long memtableSizeAfterFlush = monitor.getAllMemtableSize();
+        assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb);
+    }
+
+    @Test
+    public void testMonitorLogsOnStateChange()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        Guardrails.localDataDiskUsage.resetLastNotifyTime();
+
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+
+        // transit to STUFFED, expect warning
+        assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local data disk usage 51%(Stuffed) exceeds warning threshold of 50%");
+
+        // remain as STUFFED, no logging because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to FULL, expect failure
+        assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting writes");
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(0.99, FULL, monitor);
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(5.0, FULL, monitor);
+
+        // transit back to STUFFED, no warning  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit back to FULL, no logging  because of min log interval
+        assertMonitorStateTransition(0.900001, FULL, monitor);
+
+        // transit back to STUFFED, no logging  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+    }
+
+    @Test
+    public void testDiskUsageBroadcaster() throws UnknownHostException
+    {
+        DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null);
+        Gossiper.instance.unregister(broadcaster);
+
+        InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2");
+        InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3");
+
+        // initially it's NOT_AVAILABLE
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+        assertFalse(broadcaster.isFull(node2));
+        assertFalse(broadcaster.isFull(node3));
+
+        // adding 1st node: Spacious, cluster has no Full node
+        broadcaster.onChange(node1, ApplicationState.DISK_USAGE, value(SPACIOUS));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+
+        // adding 2nd node with wrong ApplicationState
+        broadcaster.onChange(node2, ApplicationState.RACK, value(FULL));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node2));
+
+        // adding 2nd node: STUFFED
+        broadcaster.onChange(node2, ApplicationState.DISK_USAGE, value(STUFFED));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isStuffed(node2));
+
+        // adding 3rd node: FULL
+        broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isFull(node3));
+
+        // remove 2nd node, cluster has Full node
+        broadcaster.onRemove(node2);
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isStuffed(node2));
+
+        // remove 3nd node, cluster has no Full node
+        broadcaster.onRemove(node3);
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node3));
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSize() throws IOException
+    {
+        Directories.DataDirectory directory = mock(Directories.DataDirectory.class);
+        when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+        FileStore store = mock(FileStore.class);
+        when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 5).toBytes()); // 100GiB disk
+
+        Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.05);
+
+        // 5G are used of 10G
+        guardrails().setDataDiskUsageMaxDiskSize("10GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws IOException

Review Comment:
   I like this test :-) 



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    /**
+     * Creates a monitor whose data directories are supplied by
+     * {@code DiskUsageMonitor::dataDirectoriesGroupedByFileStore}.
+     */
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    /**
+     * Creates a monitor that reads its data directories from the given supplier instead of the default one.
+     *
+     * @param dataDirectoriesSupplier supplies the data directories keyed by the file store they are associated with
+     */
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk usage state changed.
+     * <p>
+     * The check is scheduled at a fixed rate with no initial delay and a period of
+     * {@code DISK_USAGE_MONITOR_INTERVAL_MS} milliseconds. A run does nothing while the
+     * {@code localDataDiskUsage} guardrail is disabled.
+     *
+     * @param notifier passed on to {@link #updateLocalState(double, Consumer)} on every run that is not skipped
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless of whether the guardrail is enabled, so it can be enabled later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Derives the disk usage state from the given usage ratio, evaluates the {@code localDataDiskUsage}
+     * guardrail against it and, if the state differs from the current local state, records the new state and
+     * passes it to the notifier.
+     *
+     * @param usageRatio disk usage as a ratio; it is multiplied by 100 and rounded up to a whole percentage
+     * @param notifier   called with the new state, only when the state has changed
+     */
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+    {
+        double percentage = usageRatio * 100;
+        // round up, so that any fraction above a whole percentage counts as the next percentage
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return the local node disk usage state most recently recorded by
+     * {@link #updateLocalState(double, Consumer)}
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return disk usage (including all memtable sizes) ratio
+     */
+    @VisibleForTesting
+    public double getDiskUsage()
+    {
+        // using BigDecimal to handle large file system
+        BigDecimal used = BigDecimal.ZERO; // space used by data directories
+        BigDecimal usable = BigDecimal.ZERO; // free space on disks
+
+        for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : dataDirectoriesSupplier.get().asMap().entrySet())
+        {
+            usable = usable.add(BigDecimal.valueOf(usableSpace(e.getKey())));
+
+            for (Directories.DataDirectory dir : e.getValue())
+                used = used.add(BigDecimal.valueOf(dir.getRawSize()));
+        }
+
+        // The total disk size for data directories is the space that is actually used by those directories plus the
+        // free space on disk that might be used for storing those directories in the future.
+        BigDecimal total = used.add(usable);
+
+        // That total space can be limited by the config property data_disk_usage_max_disk_size.
+        DataStorageSpec diskUsageMaxSize = guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize();
+        if (diskUsageMaxSize != null)
+            total = total.min(BigDecimal.valueOf(diskUsageMaxSize.toBytes()));

Review Comment:
   One more thing. It might be good to have a getter for the value in the default internal unit, i.e. one that returns the parameter value in the unit we should use internally, which is the one used in the Config class. In this case we use bytes. Then we can use that getter everywhere internally, ensuring we don't get tempted to use the wrong conversion, for example going up to a bigger unit and losing precision. The reason we have the smallest unit introduced for users is to ensure they don't run into that problem with the old parameters, which I transferred without changing what the parameters use internally. Every parameter is different, of course, so we always need to think about what is best for the particular one. Nothing has changed from before with regard to thinking about what the boundaries should be for a parameter that we want to accept, and which units we want to encourage or not, except that time, storage and rate cannot be negative. At the end of the day the new types are only wrappers that we need for a parser, but the upper boundary is something we still need to consider on a per-case basis.
   I am actually thinking of maybe moving to abstract Boundary classes, split into int and long ones, to make people think more specifically about that... That is a topic for another ticket, of course; I am just sharing thoughts.



##########
test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java:
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static int defaultDataDiskUsagePercentageWarnThreshold;
+    private static int defaultDataDiskUsagePercentageFailThreshold;
+
+    /**
+     * Remembers the warn and fail disk usage percentage thresholds configured before the tests run, then sets
+     * both thresholds to -1 (the value the "disabled" tests in this class use).
+     */
+    @BeforeClass
+    public static void beforeClass()
+    {
+        defaultDataDiskUsagePercentageWarnThreshold = Guardrails.instance.getDataDiskUsagePercentageWarnThreshold();
+        defaultDataDiskUsagePercentageFailThreshold = Guardrails.instance.getDataDiskUsagePercentageFailThreshold();
+
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1);
+    }
+
+    /**
+     * Restores the warn and fail disk usage percentage thresholds held in the
+     * {@code defaultDataDiskUsagePercentage*Threshold} fields.
+     */
+    @AfterClass
+    public static void afterClass()
+    {
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold,
+                                                                defaultDataDiskUsagePercentageFailThreshold);
+    }
+
+    /**
+     * Checks which values are accepted and rejected by the setters for the max disk size and for the disk usage
+     * percentage thresholds, and that accepted max disk sizes are read back unchanged.
+     */
+    @Test
+    public void testConfigValidation()
+    {
+        // null is accepted and read back as null
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null));
+        assertNull(guardrails().getDataDiskUsageMaxDiskSize());
+
+        // zero is rejected whatever the unit
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is not allowed");
+
+        // positive sizes are accepted and read back in the unit they were set with
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B"));
+        assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB"));
+        assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB"));
+        assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB"));
+        assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        // NOTE(review): judging by the expected message, this is rejected for exceeding the space available on
+        // disk - confirm against the setter's validation
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE + "GiB"), "are actually available on disk");
+
+        // warn threshold smaller than lower bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80), "0 is not allowed");
+
+        // fail threshold bigger than upper bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110), "maximum allowed value is 100");
+
+        // warn threshold larger than fail threshold
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50),
+                          "The warn threshold 60 for data_disk_usage_percentage_warn_threshold should be lower than the fail threshold 50");
+    }
+
+    /**
+     * With a warn threshold of 50 and a fail threshold of 90, expects {@code getState} to return SPACIOUS up to
+     * and including 50, STUFFED from 51 up to and including 90, and FULL above 90.
+     */
+    @Test
+    public void testDiskUsageState()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+
+        // shouldn't be possible to go over 100% but just to be sure
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(101));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(500));
+    }
+
+    /**
+     * With the warn threshold set to -1 and the fail threshold set to 90, expects {@code getState} to return
+     * SPACIOUS up to and including 90 and FULL above it, never STUFFED.
+     */
+    @Test
+    public void testDiskUsageDetectorWarnDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+    }
+
+    /**
+     * With the warn threshold set to 50 and the fail threshold set to -1, expects {@code getState} to return
+     * SPACIOUS at 50 and STUFFED for every tested value above it, never FULL.
+     */
+    @Test
+    public void testDiskUsageDetectorFailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, -1);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100));
+    }
+
+    /**
+     * With both thresholds set to -1, expects {@code getState} to return NOT_AVAILABLE for every tested
+     * percentage.
+     */
+    @Test
+    public void testDiskUsageGuardrailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, -1);
+
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100));
+    }
+
+    /**
+     * Inserts 10 rows whose values are 1024 * 1024 characters each into a table created with compression
+     * disabled, and checks that {@code getAllMemtableSize()} grows by at least 10 * 1024 * 1024 and, after a
+     * flush, returns to within 1024 * 1024 of the size measured before the inserts.
+     */
+    @Test
+    public void testMemtableSizeIncluded() throws Throwable
+    {
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = { 'enabled': false }");
+
+        long memtableSizeBefore = monitor.getAllMemtableSize();
+        int rows = 10;
+        int mb = 1024 * 1024;
+
+        for (int i = 0; i < rows; i++)
+        {
+            // each value is a string of mb copies of the character with code i
+            char[] chars = new char[mb];
+            Arrays.fill(chars, (char) i);
+            String value = String.copyValueOf(chars);
+            execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value);
+        }
+
+        // verify memtables are included
+        long memtableSizeAfterInsert = monitor.getAllMemtableSize();
+        assertTrue(String.format("Expect at least 10MB more data, but got before: %s and after: %d",
+                                 memtableSizeBefore, memtableSizeAfterInsert),
+                   memtableSizeAfterInsert - memtableSizeBefore >= rows * mb);
+
+        // verify memtable size are reduced after flush
+        flush();
+        long memtableSizeAfterFlush = monitor.getAllMemtableSize();
+        // the third argument is the allowed delta: the two sizes may differ by up to mb
+        assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb);
+    }
+
+    /**
+     * With a warn threshold of 50 and a fail threshold of 90, feeds a sequence of usage ratios to a fresh
+     * monitor through {@code assertMonitorStateTransition} and checks the resulting state, plus the expected
+     * message for the first transition to STUFFED and the first transition to FULL.
+     * <p>
+     * NOTE(review): the boolean passed alongside the expected message presumably distinguishes a warning
+     * (true) from a failure (false) - confirm against {@code assertMonitorStateTransition}.
+     */
+    @Test
+    public void testMonitorLogsOnStateChange()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        Guardrails.localDataDiskUsage.resetLastNotifyTime();
+
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+
+        // transit to STUFFED, expect warning
+        assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local data disk usage 51%(Stuffed) exceeds warning threshold of 50%");
+
+        // remain as STUFFED, no logging because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to FULL, expect failure
+        assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting writes");
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(0.99, FULL, monitor);
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(5.0, FULL, monitor);
+
+        // transit back to STUFFED, no warning  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit back to FULL, no logging  because of min log interval
+        assertMonitorStateTransition(0.900001, FULL, monitor);
+
+        // transit back to STUFFED, no logging  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+    }
+
+    /**
+     * Feeds disk usage state changes and removals for three nodes directly to a {@link DiskUsageBroadcaster}
+     * and checks {@code hasStuffedOrFullNode()}, {@code isStuffed()} and {@code isFull()} after each step,
+     * including that a change reported under a different application state is not taken into account.
+     */
+    @Test
+    public void testDiskUsageBroadcaster() throws UnknownHostException
+    {
+        DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null);
+        Gossiper.instance.unregister(broadcaster);
+
+        InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2");
+        InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3");
+
+        // initially it's NOT_AVAILABLE
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+        assertFalse(broadcaster.isFull(node2));
+        assertFalse(broadcaster.isFull(node3));
+
+        // adding 1st node: Spacious, cluster has no Full node
+        broadcaster.onChange(node1, ApplicationState.DISK_USAGE, value(SPACIOUS));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+
+        // adding 2nd node with wrong ApplicationState
+        broadcaster.onChange(node2, ApplicationState.RACK, value(FULL));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node2));
+
+        // adding 2nd node: STUFFED
+        broadcaster.onChange(node2, ApplicationState.DISK_USAGE, value(STUFFED));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isStuffed(node2));
+
+        // adding 3rd node: FULL
+        broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isFull(node3));
+
+        // remove 2nd node, cluster has Full node
+        broadcaster.onRemove(node2);
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isStuffed(node2));
+
+        // remove 3rd node, cluster has no Full node
+        broadcaster.onRemove(node3);
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node3));
+    }
+
+    /**
+     * Checks the ratio returned by {@code getDiskUsage()} for one mocked data directory holding 5GiB on a file
+     * store reporting 95GiB usable, first with no max disk size configured and then with the max disk size set
+     * to 10GiB, 5GiB and 1GiB.
+     */
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSize() throws IOException
+    {
+        Directories.DataDirectory directory = mock(Directories.DataDirectory.class);
+        when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+        FileStore store = mock(FileStore.class);
+        when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 5).toBytes()); // 100GiB disk
+
+        Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories));
+
+        // run the real calculation on the spy, with the memtable size stubbed to zero
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        // no max disk size: 5G are used of 100G
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.05);
+
+        // 5G are used of 10G
+        guardrails().setDataDiskUsageMaxDiskSize("10GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    /**
+     * Same scenario as the GiB-based max disk size test but with MiB-sized values: one mocked data directory
+     * holding 5MiB on a file store reporting 100GiB minus 5MiB usable, first with no max disk size configured
+     * and then with the max disk size set to 10MiB, 5MiB and 1MiB.
+     */
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws IOException
+    {
+        FileStore store = mock(FileStore.class);
+        long freeDiskSizeInBytes = DataStorageSpec.inGibibytes(100).toBytes() - DataStorageSpec.inMebibytes(5).toBytes();
+        when(store.getUsableSpace()).thenReturn(DataStorageSpec.inBytes(freeDiskSizeInBytes).toBytes()); // 100GiB disk
+
+        Directories.DataDirectory directory = mock(Directories.DataDirectory.class);
+        when(directory.getRawSize()).thenReturn(DataStorageSpec.inMebibytes(5).toBytes());
+
+        Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories));
+
+        // run the real calculation on the spy, with the memtable size stubbed to zero
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        // NOTE(review): 5MiB of 100GiB is roughly 0.0000488, so the expected 0.00005 presumably relies on
+        // rounding done inside getDiskUsage() - confirm
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.00005);
+
+        // 5MiB are used of 10MiB
+        guardrails().setDataDiskUsageMaxDiskSize("10MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndMultipleVolumes() throws IOException
+    {

Review Comment:
   Nice one!



##########
src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    /**
+     * Creates a monitor whose data directories are supplied by
+     * {@code DiskUsageMonitor::dataDirectoriesGroupedByFileStore}.
+     */
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    /**
+     * Creates a monitor that reads its data directories from the given supplier instead of the default one.
+     *
+     * @param dataDirectoriesSupplier supplies the data directories keyed by the file store they are associated with
+     */
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk usage state changed.
+     * <p>
+     * The check is scheduled at a fixed rate with no initial delay and a period of
+     * {@code DISK_USAGE_MONITOR_INTERVAL_MS} milliseconds. A run does nothing while the
+     * {@code localDataDiskUsage} guardrail is disabled.
+     *
+     * @param notifier passed on to {@link #updateLocalState(double, Consumer)} on every run that is not skipped
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless of whether the guardrail is enabled, so it can be enabled later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Derives the disk usage state from the given usage ratio, evaluates the {@code localDataDiskUsage}
+     * guardrail against it and, if the state differs from the current local state, records the new state and
+     * passes it to the notifier.
+     *
+     * @param usageRatio disk usage as a ratio; it is multiplied by 100 and rounded up to a whole percentage
+     * @param notifier   called with the new state, only when the state has changed
+     */
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+    {
+        double percentage = usageRatio * 100;
+        // round up, so that any fraction above a whole percentage counts as the next percentage
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return the local node disk usage state most recently recorded by
+     * {@link #updateLocalState(double, Consumer)}
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return disk usage (including all memtable sizes) ratio

Review Comment:
   Looking into `getUsableSpace`, I feel it would be nice to also explain the approximations a bit on our end, for the benefit of newcomers. WDYT?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org