You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2021/10/05 16:14:30 UTC

[cassandra] branch trunk updated: Verify correct ownership of attached locations on disk at startup

This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e2db4d  Verify correct ownership of attached locations on disk at startup
2e2db4d is described below

commit 2e2db4dc40c4935305b9a2d5d271580e96dabe42
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Aug 16 12:08:23 2021 -0400

    Verify correct ownership of attached locations on disk at startup
    
    patch by Sam Tunnicliffe; reviewed by Chris Earman, Xiaolong Jiang, and Caleb Rackliffe for CASSANDRA-16879
    
    Co-authored by Sam Tunnicliffe <sa...@beobal.com>
    Co-authored by Josh McKenzie <jm...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/service/CassandraDaemon.java  |   2 +-
 .../service/FileSystemOwnershipCheck.java          | 267 +++++++++++
 .../service/FileSystemOwnershipCheckTest.java      | 489 +++++++++++++++++++++
 4 files changed, 758 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 94ea95a..0208fc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
  * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
  * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
  * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4844b84..3fe672a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -217,7 +217,7 @@ public class CassandraDaemon
     public CassandraDaemon(boolean runManaged)
     {
         this.runManaged = runManaged;
-        this.startupChecks = new StartupChecks().withDefaultTests();
+        this.startupChecks = new StartupChecks().withDefaultTests().withTest(new FileSystemOwnershipCheck());
         this.setupCompleted = false;
     }
 
diff --git a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
new file mode 100644
index 0000000..dd68d34
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.StartupException;
+
+/**
+ * Ownership markers on disk are compatible with the java property file format.
+ * (https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader-)
+ *
+ * This simple formatting is intended to enable them to be created either
+ * manually or by automated tooling using minimal standard tools (editor/shell
+ * builtins/etc).
+ * The only mandatory property is version, which must be parseable as an int
+ * and upon which the further set of required properties will depend.
+ *
+ * In version 1, two further property values are required:
+ * - volume_count
+ *   to be parsed as an int representing the number of mounted volumes where
+ *   a marker file is expected.
+ * - ownership_token
+ *   must contain a non-empty token string that can be compared to one
+ *   derived from system properties. For version 1, this is simply the value of the CassandraOwnershipToken system property (typically the cluster name).
+ *
+ * For this check to succeed as of version 1:
+ * - There must be a single properties file found in the fs tree for each
+ *   target directory.
+ * - Every file found must contain the mandatory version property with the
+ *   literal value '1'.
+ * - The value of the ownership_token property in each file must match the
+ *   cluster name
+ * - The value of the volume_count property must be an int which matches
+ *   the number of distinct marker files found when traversing the filesystem.
+ *
+ * In overridden implementations, you will need to override {@link #constructTokenFromProperties()}
+ * and add the related *_PROPERTY values you will want the system to check on startup to confirm ownership.
+ */
+public class FileSystemOwnershipCheck implements StartupCheck
+{
+    private static final Logger logger = LoggerFactory.getLogger(FileSystemOwnershipCheck.class);
+
+    // System properties
+    static final String ENABLE_FS_OWNERSHIP_CHECK_PROPERTY      = "cassandra.enable_fs_ownership_check";
+    static final String FS_OWNERSHIP_FILENAME_PROPERTY          = "cassandra.fs_ownership_filename";
+    static final String DEFAULT_FS_OWNERSHIP_FILENAME           = ".cassandra_fs_ownership";
+
+    static final String OWNERSHIP_TOKEN                         = "CassandraOwnershipToken";
+
+    // Ownership file properties
+    static final String VERSION                                 = "version";
+    static final String VOLUME_COUNT                            = "volume_count";
+    static final String TOKEN                                   = "ownership_token";
+
+    // Error strings
+    static final String ERROR_PREFIX                            = "FS ownership check failed; ";
+    static final String MISSING_SYSTEM_PROPERTY                 = "system property '%s' required for fs ownership check not supplied";
+    static final String NO_OWNERSHIP_FILE                       = "no file found in tree for %s";
+    static final String MULTIPLE_OWNERSHIP_FILES                = "multiple files found in tree for %s";
+    static final String INCONSISTENT_FILES_FOUND                = "inconsistent ownership files found on disk: %s";
+    static final String INVALID_FILE_COUNT                      = "number of ownership files found doesn't match expected";
+    static final String MISMATCHING_TOKEN                       = "token found on disk does not match supplied";
+    static final String UNSUPPORTED_VERSION                     = "unsupported version '%s' in ownership file";
+    static final String INVALID_PROPERTY_VALUE                  = "invalid or missing value for property '%s'";
+    static final String READ_EXCEPTION                          = "error when checking for fs ownership file";
+
+    private final Supplier<Iterable<String>> dirs;
+
+    FileSystemOwnershipCheck()
+    {
+        this(() -> Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+                                    Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                  DatabaseDescriptor.getSavedCachesLocation(),
+                                                  DatabaseDescriptor.getHintsDirectory().getAbsolutePath())));
+    }
+
+    @VisibleForTesting
+    FileSystemOwnershipCheck(Supplier<Iterable<String>> dirs)
+    {
+        this.dirs = dirs;
+    }
+
+    public void execute() throws StartupException
+    {
+        if (!Boolean.getBoolean(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY))
+        {
+            logger.info("Filesystem ownership check is not enabled: " + ENABLE_FS_OWNERSHIP_CHECK_PROPERTY);
+            return;
+        }
+
+        String expectedToken = constructTokenFromProperties();
+        String tokenFilename = System.getProperty(FS_OWNERSHIP_FILENAME_PROPERTY, DEFAULT_FS_OWNERSHIP_FILENAME);
+        Map<String, Integer> foundPerTargetDir = new HashMap<>();
+        Map<Path, Properties> foundProperties = new HashMap<>();
+
+        // Step 1: Traverse the filesystem from each target dir upward, looking for marker files
+        for (String dataDir : dirs.get())
+        {
+            logger.info("Checking for fs ownership details in file hierarchy for {}", dataDir);
+            int foundFiles = 0;
+            Path dir = Paths.get(dataDir).normalize();
+            do
+            {
+                File tokenFile = resolve(dir, tokenFilename);
+                if (tokenFile.exists())
+                {
+                    foundFiles++;
+                    if (!foundProperties.containsKey(tokenFile.toPath().toAbsolutePath()))
+                    {
+                        try (BufferedReader reader = Files.newBufferedReader(tokenFile.toPath()))
+                        {
+                            Properties props = new Properties();
+                            props.load(reader);
+                            foundProperties.put(tokenFile.toPath().toAbsolutePath(), props);
+                        }
+                        catch (Exception e)
+                        {
+                            logger.error("Error reading fs ownership file from disk", e);
+                            throw exception(READ_EXCEPTION);
+                        }
+                    }
+                }
+                dir = dir.getParent();
+            } while (dir != null);
+
+            foundPerTargetDir.put(dataDir, foundFiles);
+        }
+
+        // If a marker file couldn't be found for every target directory, error.
+        if (foundPerTargetDir.containsValue(0))
+        {
+            throw exception(String.format(NO_OWNERSHIP_FILE, foundPerTargetDir.entrySet()
+                                                                              .stream()
+                                                                              .filter(e -> e.getValue() == 0)
+                                                                              .map(Map.Entry::getKey)
+                                                                              .collect(Collectors.joining("', '", "'", "'"))));
+        }
+
+        // If more than one marker file was found in the tree for any target directory, error
+        Set<String> multipleTokens = foundPerTargetDir.entrySet()
+                                                      .stream()
+                                                      .filter(e -> e.getValue() > 1)
+                                                      .map(Map.Entry::getKey)
+                                                      .collect(Collectors.toSet());
+        if (!multipleTokens.isEmpty())
+            throw exception(String.format(MULTIPLE_OWNERSHIP_FILES, String.join(",", multipleTokens)));
+
+        // Step 2: assert that the content of each file is identical
+        assert !foundProperties.isEmpty();
+        Multimap<Integer, Path> byHash = HashMultimap.create();
+        foundProperties.forEach((key, value) -> byHash.put(value.hashCode(), key));
+        if (byHash.keySet().size() > 1)
+        {
+            // Group identical files to highlight where the mismatches are, worst case is
+            // they're all unique, but even then the number of individual files should be low
+            throw exception(String.format(INCONSISTENT_FILES_FOUND,
+                                          byHash.keySet()
+                                                .stream()
+                                                .map(hash -> byHash.get(hash)
+                                                                   .stream()
+                                                                   .map(Path::toString)
+                                                                   .sorted()
+                                                                   .collect(Collectors.joining("', '", "['", "']")))
+                                                .sorted()
+                                                .collect(Collectors.joining(", "))));
+        }
+
+        // Step 3: validate the content of the properties from disk
+        // Currently, only version 1 is supported which requires:
+        //   volume_count       that matches the number of unique files we found
+        //   ownership_token    that matches the one constructed from system props
+        Properties fromDisk = foundProperties.entrySet().iterator().next().getValue();
+        int version = getIntProperty(fromDisk, VERSION);
+        if (version != 1)
+            throw exception(String.format(UNSUPPORTED_VERSION, version));
+
+        int volumeCount = getIntProperty(fromDisk, VOLUME_COUNT);
+        if (volumeCount != foundProperties.size())
+            throw exception(INVALID_FILE_COUNT);
+
+        String token = getRequiredProperty(fromDisk, TOKEN);
+        if (!expectedToken.equals(token))
+            throw exception(MISMATCHING_TOKEN);
+
+        logger.info("Successfully verified fs ownership");
+    }
+
+    /** In version 1, we check and return the ownership token. Extend this for custom ownership hierarchies. */
+    protected String constructTokenFromProperties() throws StartupException
+    {
+        String cluster = System.getProperty(OWNERSHIP_TOKEN);
+        if (null == cluster || cluster.isEmpty())
+            throw exception(String.format(MISSING_SYSTEM_PROPERTY, OWNERSHIP_TOKEN));
+        return cluster;
+    }
+
+    private int getIntProperty(Properties props, String key) throws StartupException
+    {
+        String val = getRequiredProperty(props, key);
+        try
+        {
+            return Integer.parseInt(val);
+        }
+        catch (NumberFormatException e)
+        {
+            throw exception(String.format(INVALID_PROPERTY_VALUE, key));
+        }
+    }
+
+    private String getRequiredProperty(Properties props, String key) throws StartupException
+    {
+        String s = props.getProperty(key);
+        if (null == s || s.isEmpty())
+            throw exception(String.format(INVALID_PROPERTY_VALUE, key));
+        return s;
+    }
+
+    private File resolve(Path dir, String filename) throws StartupException
+    {
+        try
+        {
+            return dir.resolve(filename).toFile();
+        }
+        catch (Exception e)
+        {
+            logger.error("Encountered error resolving path ownership file {} relative to dir {}", filename, dir);
+            throw exception(READ_EXCEPTION);
+        }
+    }
+
+    private StartupException exception(String message)
+    {
+        return new StartupException(StartupException.ERR_WRONG_DISK_STATE, ERROR_PREFIX + message);
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java b/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java
new file mode 100644
index 0000000..f59b7dd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.exceptions.StartupException;
+
+import static org.apache.cassandra.service.FileSystemOwnershipCheck.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileSystemOwnershipCheckTest
+{
+    private File tempDir;
+    private String token;
+
+    @Before
+    public void setup() throws IOException
+    {
+        cleanTempDir();
+        tempDir = com.google.common.io.Files.createTempDir();
+        token = makeRandomString(10);
+
+        System.setProperty(OWNERSHIP_TOKEN, token);
+        System.setProperty(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY, "true");
+        System.clearProperty(FS_OWNERSHIP_FILENAME_PROPERTY);
+    }
+
+    @After
+    public void teardown() throws IOException
+    {
+        cleanTempDir();
+    }
+
+    private void cleanTempDir()
+    {
+        if (tempDir != null && tempDir.exists())
+            delete(tempDir);
+    }
+
+    private void delete(File file)
+    {
+        file.setReadable(true);
+        file.setWritable(true);
+        File[] files = file.listFiles();
+        if (files != null)
+        {
+            for (File child : files)
+            {
+                delete(child);
+            }
+        }
+        file.delete();
+    }
+
+    // tests for enabling/disabling/configuring the check
+    @Test
+    public void skipCheckIfDisabled() throws Exception
+    {
+        // no exceptions thrown from the supplier because the check is skipped
+        System.clearProperty(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY);
+        checker(() -> { throw new RuntimeException("FAIL"); }).execute();
+    }
+
+    @Test
+    public void checkEnabledButClusterPropertyIsEmpty()
+    {
+        System.setProperty(OWNERSHIP_TOKEN, "");
+        executeAndFail(checker(tempDir), MISSING_SYSTEM_PROPERTY, OWNERSHIP_TOKEN);
+    }
+
+    @Test
+    public void checkEnabledButClusterPropertyIsUnset()
+    {
+        System.clearProperty(OWNERSHIP_TOKEN);
+        executeAndFail(checker(tempDir), MISSING_SYSTEM_PROPERTY, OWNERSHIP_TOKEN);
+    }
+
+    // tests for presence/absence of files in dirs
+    @Test
+    public void noRootDirectoryPresent() throws Exception
+    {
+        executeAndFail(checker("/no/such/location"), NO_OWNERSHIP_FILE, "'/no/such/location'");
+    }
+
+    @Test
+    public void noDirectoryStructureOrTokenFilePresent() throws Exception
+    {
+        // The root directory exists, but is completely empty
+        executeAndFail(checker(tempDir), NO_OWNERSHIP_FILE, quote(tempDir.getAbsolutePath()));
+    }
+
+    @Test
+    public void directoryStructureButNoTokenFiles() throws Exception
+    {
+        File childDir = new File(tempDir, "cassandra/data");
+        assertTrue(childDir.mkdirs());
+        assertTrue(childDir.exists());
+        executeAndFail(checker(childDir), NO_OWNERSHIP_FILE, quote(childDir.getAbsolutePath()));
+    }
+
+    @Test
+    public void multipleFilesFoundInSameTree() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir, 1, token);
+        writeFile(leafDir.getParentFile(), 1, token);
+        executeAndFail(checker(leafDir), MULTIPLE_OWNERSHIP_FILES, leafDir);
+    }
+
+    @Test
+    public void singleValidFileInEachTree() throws Exception
+    {
+        // Happy path. Each target directory has exactly 1 token file in the
+        // dir above it, they all contain the supplied token and the correct
+        // count.
+        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
+                                       mkdirs(tempDir, "d2/commitlogs"),
+                                       mkdirs(tempDir, "d3/hints") };
+        for (File dir : leafDirs)
+            writeFile(dir.getParentFile(), 3, token);
+        checker(leafDirs).execute();
+    }
+
+    @Test
+    public void multipleDirsSingleTree() throws Exception
+    {
+        // Happy path. Each target directory has exactly 1 token file in the
+        // dir above it (as they all share a single parent). Each contains
+        // the supplied token and the correct count (1 in this case).
+        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
+                                       mkdirs(tempDir, "d2/commitlogs"),
+                                       mkdirs(tempDir, "d3/hints") };
+        writeFile(tempDir, 1, token);
+        checker(leafDirs).execute();
+    }
+
+    @Test
+    public void someDirsContainNoFile() throws Exception
+    {
+        File leafDir1 = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir1, 3, token);
+        File leafDir2 = mkdirs(tempDir, "cassandra/commitlogs");
+        writeFile(leafDir2, 3, token);
+        File leafDir3 = mkdirs(tempDir, "cassandra/hints");
+
+        executeAndFail(checker(leafDir1, leafDir2, leafDir3),
+                       NO_OWNERSHIP_FILE,
+                       quote(leafDir3.getAbsolutePath()));
+    }
+
+    @Test
+    public void propsFileUnreadable() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        File tokenFile = writeFile(leafDir.getParentFile(), 1, token);
+        assertTrue(tokenFile.setReadable(false));
+        executeAndFail(checker(leafDir),
+                       READ_EXCEPTION,
+                       leafDir.getAbsolutePath());
+    }
+
+    @Test
+    public void propsFileIllegalContent() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        File propsFile = new File(leafDir, DEFAULT_FS_OWNERSHIP_FILENAME);
+        assertTrue(propsFile.createNewFile());
+        try (OutputStream os = Files.newOutputStream(propsFile.toPath()))
+        {
+            os.write(makeRandomString(40).getBytes());
+        }
+        assertTrue(propsFile.canRead());
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VERSION),
+                       leafDir.getAbsolutePath());
+    }
+
+    @Test
+    public void propsParentDirUnreadable() throws Exception
+    {
+        // The props file itself is readable, but its dir is not
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir, 1, token);
+        assertTrue(leafDir.setReadable(false));
+        checker(leafDir).execute();
+    }
+
+    @Test
+    public void propsParentDirUntraversable() throws Exception
+    {
+        // top level dir can't be traversed, so no files are found
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), 1, token);
+        assertTrue(tempDir.setExecutable(false));
+        executeAndFail(checker(leafDir),
+                       NO_OWNERSHIP_FILE,
+                       quote(leafDir.getAbsolutePath()));
+    }
+
+    @Test
+    public void overrideFilename() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), "other_file", makeProperties(1, 1, token));
+        executeAndFail(checker(leafDir), NO_OWNERSHIP_FILE, quote(leafDir.getAbsolutePath()));
+        System.setProperty(FS_OWNERSHIP_FILENAME_PROPERTY, "other_file");
+        checker(leafDir).execute();
+    }
+
+    // check consistency between discovered files
+    @Test
+    public void differentTokensFoundInTrees() throws Exception
+    {
+        File file1 = writeFile(mkdirs(tempDir, "d1/data"), 3, token);
+        File file2 = writeFile(mkdirs(tempDir, "d2/commitlogs"), 3, token);
+        File file3 = writeFile(mkdirs(tempDir, "d3/hints"), 3, "mismatchingtoken");
+        String errorSuffix = String.format("['%s', '%s'], ['%s']",
+                                           file1.getAbsolutePath(),
+                                           file2.getAbsolutePath(),
+                                           file3.getAbsolutePath());
+
+        executeAndFail(checker(file1.getParentFile(), file2.getParentFile(), file3.getParentFile()),
+                       INCONSISTENT_FILES_FOUND,
+                       errorSuffix);
+    }
+
+    @Test
+    public void differentExpectedCountsFoundInTrees() throws Exception
+    {
+        File file1 = writeFile(mkdirs(tempDir, "d1/data"), 1, token);
+        File file2 = writeFile(mkdirs(tempDir, "d2/commitlogs"), 2, token);
+        File file3 = writeFile(mkdirs(tempDir, "d3/hints"), 3, "mismatchingtoken");
+        String errorSuffix = String.format("['%s'], ['%s'], ['%s']",
+                                           file1.getAbsolutePath(),
+                                           file2.getAbsolutePath(),
+                                           file3.getAbsolutePath());
+        executeAndFail(checker(file1.getParentFile(), file2.getParentFile(), file3.getParentFile()),
+                       INCONSISTENT_FILES_FOUND,
+                       errorSuffix);
+    }
+
+    // tests on property values in discovered files
+    @Test
+    public void emptyPropertiesFile() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, new Properties());
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VERSION),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VOLUME_COUNT, "1");
+        p.setProperty(TOKEN, "foo");
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VERSION),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void nonNumericVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "abc");
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VERSION),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void unsupportedVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "99");
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(UNSUPPORTED_VERSION, "99"),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingVolumeCountProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(TOKEN, token);
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void nonNumericVolumeCountProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(VOLUME_COUNT, "bar");
+        p.setProperty(TOKEN, token);
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingTokenProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(VOLUME_COUNT, "1");
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, TOKEN),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void emptyTokenProp() throws Exception
+    {
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), 1, "");
+        executeAndFail(checker(leafDir),
+                       String.format(INVALID_PROPERTY_VALUE, TOKEN),
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void mismatchingTokenProp() throws Exception
+    {
+        // Ownership token file exists in parent, but content doesn't match property
+        File leafDir = mkdirs(tempDir, "cassandra/data");
+        writeFile(leafDir.getParentFile(), 1, makeRandomString(15));
+        executeAndFail(checker(leafDir),
+                       MISMATCHING_TOKEN,
+                       leafDir.getParentFile().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+
+    // Validate volume_count prop values match number of files found
+    @Test
+    public void expectedVolumeCountMoreThanActual() throws Exception
+    {
+        // The files on disk indicate that we should expect 2 ownership files,
+        // but we only read 1, implying a disk mount is missing
+        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
+                                       mkdirs(tempDir, "d2/commitlogs"),
+                                       mkdirs(tempDir, "d3/hints") };
+        writeFile(tempDir, 2, token);
+        executeAndFail(checker(leafDirs), INVALID_FILE_COUNT);
+    }
+
+    @Test
+    public void expectedVolumeCountLessThanActual() throws Exception
+    {
+        // The files on disk indicate that we should expect 1 ownership file,
+        // but we read 2, implying an extra unexpected disk is mounted
+        File leafDir1 = mkdirs(tempDir, "d1/data");
+        writeFile(leafDir1, 1, token);
+        File leafDir2 = mkdirs(tempDir, "d2/commitlogs");
+        writeFile(leafDir2, 1, token);
+        executeAndFail(checker(leafDir1, leafDir2), INVALID_FILE_COUNT);
+    }
+
+    private static void executeAndFail(FileSystemOwnershipCheck checker, String messageTemplate, Object...messageArgs)
+    {
+        try
+        {
+            checker.execute();
+            fail("Expected an exception but none thrown");
+        } catch (StartupException e) {
+            String expected = ERROR_PREFIX + String.format(messageTemplate, messageArgs);
+            assertEquals(expected, e.getMessage());
+        }
+    }
+
+    private static Properties makeProperties(int version, int volumeCount, String token)
+    {
+        Properties props = new Properties();
+        props.setProperty(VERSION, Integer.toString(version));
+        props.setProperty(VOLUME_COUNT, Integer.toString(volumeCount));
+        props.setProperty(TOKEN, token);
+        return props;
+    }
+
+    private static File writeFile(File dir, int volumeCount, String token) throws IOException
+    {
+        return writeFile(dir, DEFAULT_FS_OWNERSHIP_FILENAME, 1, volumeCount, token);
+    }
+
+    private static File writeFile(File dir, final String filename, int version, int volumeCount, String token)
+    throws IOException
+    {
+        return writeFile(dir, filename, makeProperties(version, volumeCount, token));
+    }
+
+    private static File writeFile(File dir, String filename, Properties props) throws IOException
+    {
+        File tokenFile = new File(dir, filename);
+        assertTrue(tokenFile.createNewFile());
+        try (OutputStream os = Files.newOutputStream(tokenFile.toPath()))
+        {
+            props.store(os, "Test properties");
+        }
+        assertTrue(tokenFile.canRead());
+        return tokenFile;
+    }
+
+    private static File mkdirs(File parent, String path)
+    {
+        File childDir = new File(parent, path);
+        assertTrue(childDir.mkdirs());
+        assertTrue(childDir.exists());
+        return childDir;
+    }
+
+    private static FileSystemOwnershipCheck checker(Supplier<Iterable<String>> dirs)
+    {
+        return new FileSystemOwnershipCheck(dirs);
+    }
+
+    private static FileSystemOwnershipCheck checker(File...dirs)
+    {
+        return checker(() -> Arrays.stream(dirs).map(File::getAbsolutePath).collect(Collectors.toList()));
+    }
+
+    private static FileSystemOwnershipCheck checker(String...dirs)
+    {
+        return checker(() -> Arrays.asList(dirs));
+    }
+
+    public static String makeRandomString(int length)
+    {
+        Random random = new Random();
+        char[] chars = new char[length];
+        for (int i = 0; i < length; ++i)
+            chars[i] = (char) ('a' + random.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    private String quote(String toQuote)
+    {
+        return String.format("'%s'", toQuote);
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org