You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/02/04 14:05:12 UTC

[cassandra] branch trunk updated: Improve system tables handling in case of disk failures

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f69b11e  Improve system tables handling in case of disk failures
f69b11e is described below

commit f69b11eee9605add3a006de46eedb773a984d90b
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Thu Mar 19 12:57:28 2020 +0100

    Improve system tables handling in case of disk failures
    
    patch by Benjamin Lerer; reviewed by Andrés de la Peña and  Marcus
    Eriksson for CASSANDRA-14793
---
 .circleci/config.yml                               |  97 +++++++++++
 .circleci/config.yml.HIGHRES                       |  98 +++++++++++
 .circleci/config.yml.LOWRES                        |  97 +++++++++++
 .circleci/config.yml.MIDRES                        |  97 +++++++++++
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 build.xml                                          |  38 +++++
 conf/cassandra.yaml                                |   7 +
 doc/source/configuration/cass_yaml_file.rst        |  11 ++
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  97 +++++++++--
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 110 +++++++++++--
 src/java/org/apache/cassandra/db/Directories.java  | 183 +++++++++++++++------
 .../apache/cassandra/db/DiskBoundaryManager.java   |   1 -
 .../org/apache/cassandra/db/SystemKeyspace.java    |  13 +-
 .../apache/cassandra/io/FSDiskFullWriteError.java  |  10 +-
 ...or.java => FSNoDiskAvailableForWriteError.java} |  20 +--
 src/java/org/apache/cassandra/io/FSWriteError.java |   5 +
 .../org/apache/cassandra/io/util/FileUtils.java    |  68 ++++++++
 .../apache/cassandra/service/CassandraDaemon.java  |  89 +++++++++-
 .../cassandra/service/DefaultFSErrorHandler.java   |  16 +-
 .../apache/cassandra/service/StartupChecks.java    |   1 +
 .../apache/cassandra/service/StorageService.java   |  24 ++-
 .../cassandra/service/StorageServiceMBean.java     |  14 ++
 test/conf/system_keyspaces_directory.yaml          |   1 +
 .../cassandra/distributed/impl/Instance.java       |   1 +
 .../cassandra/OffsetAwareConfigurationLoader.java  |   3 +
 .../org/apache/cassandra/db/DirectoriesTest.java   |  97 ++++++++---
 .../apache/cassandra/io/util/FileUtilsTest.java    |  92 +++++++++++
 .../apache/cassandra/tools/ClearSnapshotTest.java  |   2 +-
 .../cassandra/tools/NodeToolTPStatsTest.java       |   2 +-
 31 files changed, 1179 insertions(+), 130 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 68f5691..5b158fc 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -2183,6 +2183,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 4
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently executed on every circle container
+          # so the goal here is to get the circleci script to return the tests *this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of containers we have
+          set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' '{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' '{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory -Dtest.timeout="$test_timeout" -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  -Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
@@ -2289,6 +2380,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.HIGHRES b/.circleci/config.yml.HIGHRES
index 56e28ec..026afed 100644
--- a/.circleci/config.yml.HIGHRES
+++ b/.circleci/config.yml.HIGHRES
@@ -2183,6 +2183,98 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
+    resource_class: xlarge
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 100
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently executed on every circle container
+          # so the goal here is to get the circleci script to return the tests *this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of containers we have
+          set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' '{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' '{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory -Dtest.timeout="$test_timeout" -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  -Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+
   j8_dtest_jars_build:
     docker:
     - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
@@ -2289,6 +2381,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.LOWRES b/.circleci/config.yml.LOWRES
index 68f5691..5b158fc 100644
--- a/.circleci/config.yml.LOWRES
+++ b/.circleci/config.yml.LOWRES
@@ -2183,6 +2183,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 4
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently executed on every circle container
+          # so the goal here is to get the circleci script to return the tests *this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of containers we have
+          set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' '{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' '{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory -Dtest.timeout="$test_timeout" -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  -Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
@@ -2289,6 +2380,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.MIDRES b/.circleci/config.yml.MIDRES
index f9e1843..5d98643 100644
--- a/.circleci/config.yml.MIDRES
+++ b/.circleci/config.yml.MIDRES
@@ -2183,6 +2183,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 25
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently executed on every circle container
+          # so the goal here is to get the circleci script to return the tests *this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of containers we have
+          set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' '{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' '{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory -Dtest.timeout="$test_timeout" -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  -Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210105
@@ -2289,6 +2380,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/CHANGES.txt b/CHANGES.txt
index 323ba0e..73bfff3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Improve system tables handling in case of disk failures (CASSANDRA-14793)
  * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
  * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283)
  * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
diff --git a/NEWS.txt b/NEWS.txt
index 0002a9e..3e8193f 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,14 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - The data of the system keyspaces using a local strategy (with the exception of the system.batches,
+      system.paxos, system.compaction_history, system.prepared_statements and system.repair tables)
+      is now stored by default in the first data directory, instead of being distributed among all
+      the data directories. This approach will allow the server to tolerate the failure of the other disks.
+      To ensure that a disk failure will not bring a node down, it is possible to use the
+      local_system_data_file_directory yaml property to store the local system keyspaces data on a directory
+      that provides redundancy. On node startup the local system keyspaces data will be automatically
+      migrated if needed to the correct location.
     - Nodes will now bootstrap all intra-cluster connections at startup by default and wait
       10 seconds for the all but one node in the local data center to be connected and marked
       UP in gossip. This prevents nodes from coordinating requests and failing because they
diff --git a/build.xml b/build.xml
index 523ee90..b2d0919 100644
--- a/build.xml
+++ b/build.xml
@@ -1529,6 +1529,7 @@
       <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/cdc_raw${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/system_data${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/hints${fileSep}@{poffset}"/>
     </sequential>
@@ -1609,6 +1610,28 @@
     </sequential>
   </macrodef>
 
+  <macrodef name="testlist-system-keyspace-directory">
+    <attribute name="test.file.list" />
+    <attribute name="testlist.offset" />
+    <sequential>
+      <property name="system_keyspaces_directory_yaml" value="${build.test.dir}/cassandra.system.yaml"/>
+      <concat destfile="${system_keyspaces_directory_yaml}">
+        <fileset file="${test.conf}/cassandra.yaml"/>
+        <fileset file="${test.conf}/system_keyspaces_directory.yaml"/>
+      </concat>
+      <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}"
+                       exclude="**/*.java" timeout="${test.timeout}" testtag="system_keyspace_directory">
+        <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+        <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
+        <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
+        <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+        <jvmarg value="-Dcassandra.config=file:///${system_keyspaces_directory_yaml}"/>
+        <jvmarg value="-Dcassandra.skip_sync=true" />
+        <jvmarg value="-Dcassandra.config.loader=org.apache.cassandra.OffsetAwareConfigurationLoader"/>
+      </testmacrohelper>
+    </sequential>
+  </macrodef>
+
   <!--
     Run named ant task with jacoco, such as "ant jacoco-run -Dtaskname=test"
     the target run must enable the jacoco agent if usejacoco is 'yes' -->
@@ -1676,6 +1699,14 @@
     <testparallel testdelegate="testlist-cdc" />
   </target>
 
+  <target name="test-system-keyspace-directory" depends="build-test" description="Execute unit tests with a system keyspaces directory configured">
+    <path id="all-test-classes-path">
+      <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+    </path>
+    <property name="all-test-classes" refid="all-test-classes-path"/>
+    <testparallel testdelegate="testlist-system-keyspace-directory" />
+  </target>
+
   <target name="msg-ser-gen-test" depends="build-test" description="Generates message serializations">
     <testmacro inputdir="${test.unit.src}"
         timeout="${test.timeout}" filter="**/SerializationsTest.java">
@@ -2056,6 +2087,13 @@
       <property name="all-test-classes" refid="all-test-classes-path"/>
       <testparallel testdelegate="testlist-cdc"/>
   </target>
+  <target name="testclasslist-system-keyspace-directory" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
+      <path id="all-test-classes-path">
+          <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/>
+      </path>
+      <property name="all-test-classes" refid="all-test-classes-path"/>
+      <testparallel testdelegate="testlist-system-keyspace-directory"/>
+  </target>
 
   <!-- In-JVM dtest targets -->
   <target name="list-jvm-dtests" depends="build-test">
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5a67071..c704437 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,6 +206,13 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # data_file_directories:
 #     - /var/lib/cassandra/data
 
+# Directory where Cassandra should store the data of the local system keyspaces.
+# By default Cassandra will store the data of the local system keyspaces in the first of the data directories specified
+# by data_file_directories.
+# This approach ensures that if one of the other disks is lost Cassandra can continue to operate. For extra security
+# this setting allows storing that data in a different directory that provides redundancy.
+# local_system_data_file_directory:
+
 # commit log.  when running on magnetic HDD, this should be a
 # separate spindle than the data directories.
 # If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
diff --git a/doc/source/configuration/cass_yaml_file.rst b/doc/source/configuration/cass_yaml_file.rst
index 49471ef..e3babbc 100644
--- a/doc/source/configuration/cass_yaml_file.rst
+++ b/doc/source/configuration/cass_yaml_file.rst
@@ -319,6 +319,17 @@ If not set, the default directory is $CASSANDRA_HOME/data/data.
 
     #     - /var/lib/cassandra/data
 
+``local_system_data_file_directory``
+------------------------------------
+*This option is commented out by default.*
+
+Directory where Cassandra should store the data of the local system keyspaces.
+By default Cassandra will store the data of the local system keyspaces (with the exception of the system.batches,
+system.paxos, system.compaction_history, system.prepared_statements and system.repair tables) in the first of the data
+directories specified by data_file_directories.
+This approach ensures that if one of the other disks is lost Cassandra can continue to operate. For extra security
+this setting allows storing that data in a different directory that provides redundancy.
+
 ``commitlog_directory``
 -----------------------
 *This option is commented out by default.*
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index dec37ef..98d9994 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -234,6 +234,12 @@ public class Config
 
     public String[] data_file_directories = new String[0];
 
+    /**
+     * The directory to use for storing the local system keyspaces data.
+     * If unspecified the data will be stored in the first of the data_file_directories.
+     */
+    public String local_system_data_file_directory;
+
     public String saved_caches_directory;
 
     // Commit Log
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 7ddba08..90716b6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -72,6 +72,7 @@ import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -612,6 +613,8 @@ public class DatabaseDescriptor
         {
             if (datadir == null)
                 throw new ConfigurationException("data_file_directories must not contain empty entry", false);
+            if (datadir.equals(conf.local_system_data_file_directory))
+                throw new ConfigurationException("local_system_data_file_directory must not be the same as any data_file_directories", false);
             if (datadir.equals(conf.commitlog_directory))
                 throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories", false);
             if (datadir.equals(conf.hints_directory))
@@ -619,21 +622,28 @@ public class DatabaseDescriptor
             if (datadir.equals(conf.saved_caches_directory))
                 throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories", false);
 
-            try
-            {
-                dataFreeBytes = saturatedSum(dataFreeBytes, guessFileStore(datadir).getUnallocatedSpace());
-            }
-            catch (IOException e)
-            {
-                logger.debug("Error checking disk space", e);
-                throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions",
-                                                               datadir), e);
-            }
+            dataFreeBytes = saturatedSum(dataFreeBytes, getUnallocatedSpace(datadir));
         }
         if (dataFreeBytes < 64 * ONE_GB) // 64 GB
             logger.warn("Only {} free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots",
                         FBUtilities.prettyPrintMemory(dataFreeBytes));
 
+        if (conf.local_system_data_file_directory != null)
+        {
+            if (conf.local_system_data_file_directory.equals(conf.commitlog_directory))
+                throw new ConfigurationException("local_system_data_file_directory must not be the same as the commitlog_directory", false);
+            if (conf.local_system_data_file_directory.equals(conf.saved_caches_directory))
+                throw new ConfigurationException("local_system_data_file_directory must not be the same as the saved_caches_directory", false);
+            if (conf.local_system_data_file_directory.equals(conf.hints_directory))
+                throw new ConfigurationException("local_system_data_file_directory must not be the same as the hints_directory", false);
+
+            long freeBytes = getUnallocatedSpace(conf.local_system_data_file_directory);
+
+            if (freeBytes < ONE_GB)
+                logger.warn("Only {} free in the system data volume. Consider adding more capacity or removing obsolete snapshots",
+                            FBUtilities.prettyPrintMemory(freeBytes));
+        }
+
         if (conf.commitlog_directory.equals(conf.saved_caches_directory))
             throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory", false);
         if (conf.commitlog_directory.equals(conf.hints_directory))
@@ -1200,6 +1210,20 @@ public class DatabaseDescriptor
         }
     }
 
+    private static long getUnallocatedSpace(String directory)
+    {
+        try
+        {
+            return guessFileStore(directory).getUnallocatedSpace();
+        }
+        catch (IOException e)
+        {
+            logger.debug("Error checking disk space", e);
+            throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions",
+                                                           directory), e);
+        }
+    }
+
     public static IEndpointSnitch createEndpointSnitch(boolean dynamic, String snitchClassName) throws ConfigurationException
     {
         if (!snitchClassName.contains("."))
@@ -1367,6 +1391,9 @@ public class DatabaseDescriptor
             for (String dataFileDirectory : conf.data_file_directories)
                 FileUtils.createDirectory(dataFileDirectory);
 
+            if (conf.local_system_data_file_directory != null)
+                FileUtils.createDirectory(conf.local_system_data_file_directory);
+
             if (conf.commitlog_directory == null)
                 throw new ConfigurationException("commitlog_directory must be specified", false);
             FileUtils.createDirectory(conf.commitlog_directory);
@@ -1749,7 +1776,7 @@ public class DatabaseDescriptor
 
     public static int getFlushWriters()
     {
-            return conf.memtable_flush_writers;
+        return conf.memtable_flush_writers;
     }
 
     public static int getConcurrentCompactors()
@@ -1830,11 +1857,57 @@ public class DatabaseDescriptor
         conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
     }
 
-    public static String[] getAllDataFileLocations()
+    /**
+     * Checks if the local system data must be stored in a specific location which supports redundancy.
+     *
+     * @return {@code true} if the local system keyspaces data must be stored in a different location,
+     * {@code false} otherwise.
+     */
+    public static boolean useSpecificLocationForLocalSystemData()
+    {
+        return conf.local_system_data_file_directory != null;
+    }
+
+    /**
+     * Returns the locations where the local system keyspaces data should be stored.
+     *
+     * <p>If the {@code local_system_data_file_directory} was unspecified, the local system keyspaces data should be stored
+     * in the first data directory. This approach guarantees that the server can tolerate the loss of all the disks but the first one.</p>
+     *
+     * @return the locations where the local system keyspaces data should be stored
+     */
+    public static String[] getLocalSystemKeyspacesDataFileLocations()
+    {
+        if (useSpecificLocationForLocalSystemData())
+            return new String[] {conf.local_system_data_file_directory};
+
+        return conf.data_file_directories.length == 0  ? conf.data_file_directories
+                                                       : new String[] {conf.data_file_directories[0]};
+    }
+
+    /**
+     * Returns the locations where the non local system keyspaces data should be stored.
+     *
+     * @return the locations where the non local system keyspaces data should be stored.
+     */
+    public static String[] getNonLocalSystemKeyspacesDataFileLocations()
     {
         return conf.data_file_directories;
     }
 
+    /**
+     * Returns the list of all the directories where the data files can be stored (for local system and non local system keyspaces).
+     *
+     * @return the list of all the directories where the data files can be stored.
+     */
+    public static String[] getAllDataFileLocations()
+    {
+        if (conf.local_system_data_file_directory == null)
+            return conf.data_file_directories;
+
+        return ArrayUtils.addFirst(conf.data_file_directories, conf.local_system_data_file_directory);
+    }
+
     public static String getCommitLogLocation()
     {
         return conf.commitlog_directory;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d4818ac..64ea6a6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -115,20 +115,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                                              new NamedThreadFactory("MemtableFlushWriter"),
                                                                                              "internal");
 
-    private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
-
-    static
-    {
-        for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++)
-        {
-            perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                                                        Stage.KEEP_ALIVE_SECONDS,
-                                                                        TimeUnit.SECONDS,
-                                                                        new LinkedBlockingQueue<Runnable>(),
-                                                                        new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
-                                                                        "internal");
-        }
-    }
+    private static final PerDiskFlushExecutors perDiskflushExecutors = new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
+                                                                                                 DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
+                                                                                                 DatabaseDescriptor.useSpecificLocationForLocalSystemData());
 
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
     private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
@@ -232,9 +221,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
     {
-        List<ExecutorService> executors = new ArrayList<>(perDiskflushExecutors.length + 3);
+        List<ExecutorService> executors = new ArrayList<>();
         Collections.addAll(executors, reclaimExecutor, postFlushExecutor, flushExecutor);
-        Collections.addAll(executors, perDiskflushExecutors);
+        perDiskflushExecutors.appendAllExecutors(executors);
         ExecutorUtils.shutdownAndWait(timeout, unit, executors);
     }
 
@@ -1108,9 +1097,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 {
                     // flush the memtable
                     flushRunnables = memtable.flushRunnables(txn);
+                    ExecutorService[] executors = perDiskflushExecutors.getExecutorsFor(keyspace.getName(), name);
 
                     for (int i = 0; i < flushRunnables.size(); i++)
-                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+                        futures.add(executors[i].submit(flushRunnables.get(i)));
 
                     /**
                      * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
@@ -2806,4 +2796,88 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return neverPurgeTombstones;
     }
-}
\ No newline at end of file
+
+    /**
+     * The thread pools used to flush memtables.
+     *
+     * <p>Each disk has its own set of thread pools to perform memtable flushes.</p>
+     * <p>Depending on the configuration, local system keyspaces can have their own disk
+     * to allow for a special redundancy mechanism. If that is the case, the executor services returned for
+     * local system keyspaces will be different from the ones for the other keyspaces.</p>
+     */
+    private static final class PerDiskFlushExecutors
+    {
+        /**
+         * The flush executors for non local system keyspaces.
+         */
+        private final ExecutorService[] nonLocalSystemflushExecutors;
+
+        /**
+         * The flush executors for the local system keyspaces.
+         */
+        private final ExecutorService[] localSystemDiskFlushExecutors;
+
+        /**
+         * {@code true} if local system keyspaces are stored in their own directory and use an extra flush executor,
+         * {@code false} otherwise.
+         */
+        private final boolean useSpecificExecutorForSystemKeyspaces;
+
+        public PerDiskFlushExecutors(int flushWriters,
+                                     String[] locationsForNonSystemKeyspaces,
+                                     boolean useSpecificLocationForSystemKeyspaces)
+        {
+            ExecutorService[] flushExecutors = createPerDiskFlushWriters(locationsForNonSystemKeyspaces.length, flushWriters);
+            nonLocalSystemflushExecutors = flushExecutors;
+            useSpecificExecutorForSystemKeyspaces = useSpecificLocationForSystemKeyspaces;
+            localSystemDiskFlushExecutors = useSpecificLocationForSystemKeyspaces ? new ExecutorService[] {newThreadPool("LocalSystemKeyspacesDiskMemtableFlushWriter", flushWriters)}
+                                                                                  : new ExecutorService[] {flushExecutors[0]};
+        }
+
+        private static ExecutorService[] createPerDiskFlushWriters(int numberOfExecutors, int flushWriters)
+        {
+            ExecutorService[] flushExecutors = new ExecutorService[numberOfExecutors];
+
+            for (int i = 0; i < numberOfExecutors; i++)
+            {
+                flushExecutors[i] = newThreadPool("PerDiskMemtableFlushWriter_" + i, flushWriters);
+            }
+            return flushExecutors;
+        }
+
+        private static JMXEnabledThreadPoolExecutor newThreadPool(String poolName, int size)
+        {
+            return new JMXEnabledThreadPoolExecutor(size,
+                                                    Stage.KEEP_ALIVE_SECONDS,
+                                                    TimeUnit.SECONDS,
+                                                    new LinkedBlockingQueue<>(),
+                                                    new NamedThreadFactory(poolName),
+                                                    "internal");
+        }
+
+        /**
+         * Returns the flush executors for the specified keyspace.
+         *
+         * @param keyspaceName the keyspace name
+         * @param tableName the table name
+         * @return the flush executors that should be used for flushing the memtables of the specified keyspace.
+         */
+        public ExecutorService[] getExecutorsFor(String keyspaceName, String tableName)
+        {
+            return Directories.isStoredInLocalSystemKeyspacesDataLocation(keyspaceName, tableName) ? localSystemDiskFlushExecutors
+                                                                                                   : nonLocalSystemflushExecutors;
+        }
+
+        /**
+         * Appends all the {@code ExecutorService} used for flushes to the collection.
+         *
+         * @param collection the collection to append to.
+         */
+        public void appendAllExecutors(Collection<ExecutorService> collection)
+        {
+            Collections.addAll(collection, nonLocalSystemflushExecutors);
+            if (useSpecificExecutorForSystemKeyspaces)
+                Collections.addAll(collection, localSystemDiskFlushExecutors);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 9a620a2..cf4238c 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -17,18 +17,11 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.file.FileStore;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.*;
+import java.nio.file.*;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiPredicate;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -42,9 +35,11 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSDiskFullWriteError;
 import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSNoDiskAvailableForWriteError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
 import org.apache.cassandra.utils.FBUtilities;
@@ -93,15 +88,11 @@ public class Directories
     public static final String TMP_SUBDIR = "tmp";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
-    public static final DataDirectory[] dataDirectories;
-
-    static
-    {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        dataDirectories = new DataDirectory[locations.length];
-        for (int i = 0; i < locations.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(locations[i]));
-    }
+    /**
+     * The directories used to store keyspaces data.
+     */
+    public static final DataDirectories dataDirectories = new DataDirectories(DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
+                                                                              DatabaseDescriptor.getLocalSystemKeyspacesDataFileLocations());
 
     /**
      * Checks whether Cassandra has RWX permissions to the specified directory.  Logs an error with
@@ -184,7 +175,7 @@ public class Directories
 
     public Directories(final TableMetadata metadata)
     {
-        this(metadata, dataDirectories);
+        this(metadata, dataDirectories.getDataDirectoriesFor(metadata));
     }
 
     public Directories(final TableMetadata metadata, Collection<DataDirectory> paths)
@@ -445,10 +436,12 @@ public class Directories
         }
 
         if (candidates.isEmpty())
+        {
             if (tooBig)
-                throw new FSDiskFullWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
-            else
-                throw new FSWriteError(new IOException("All configured data directories have been disallowed as unwritable for erroring out"), "");
+                throw new FSDiskFullWriteError(metadata.keyspace, writeSize);
+
+            throw new FSNoDiskAvailableForWriteError(metadata.keyspace);
+        }
 
         // shortcut for single data directory systems
         if (candidates.size() == 1)
@@ -513,14 +506,10 @@ public class Directories
                 allowedDirs.add(dir);
         }
 
-        Collections.sort(allowedDirs, new Comparator<DataDirectory>()
-        {
-            @Override
-            public int compare(DataDirectory o1, DataDirectory o2)
-            {
-                return o1.location.compareTo(o2.location);
-            }
-        });
+        if (allowedDirs.isEmpty())
+            throw new FSNoDiskAvailableForWriteError(metadata.keyspace);
+
+        allowedDirs.sort(Comparator.comparing(o -> o.location));
         return allowedDirs.toArray(new DataDirectory[allowedDirs.size()]);
     }
 
@@ -592,10 +581,35 @@ public class Directories
         }
     }
 
+    /**
+     * Checks if the specified table should be stored with local system data.
+     *
+     * <p> To minimize the risk of failures, SSTables for local system keyspaces must be stored in a single data
+     * directory. The only exception to this are some of the system tables, as the server can continue operating even
+     * if those tables lose some data. Names are lower-cased with {@code Locale.ROOT} to be locale-independent.</p>
+     *
+     * @param keyspace the keyspace name
+     * @param table the table name
+     * @return {@code true} if the specified table should be stored with local system data, {@code false} otherwise.
+     */
+    public static boolean isStoredInLocalSystemKeyspacesDataLocation(String keyspace, String table)
+    {
+        String keyspaceName = keyspace.toLowerCase(Locale.ROOT);
+
+        return SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES.contains(keyspaceName)
+                && !(SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspaceName)
+                        && SystemKeyspace.TABLES_SPLIT_ACROSS_MULTIPLE_DISKS.contains(table.toLowerCase(Locale.ROOT)));
+    }
+
     public static class DataDirectory
     {
         public final File location;
 
+        public DataDirectory(String location)
+        {
+            this(new File(location));
+        }
+
         public DataDirectory(File location)
         {
             this.location = location;
@@ -632,6 +646,90 @@ public class Directories
         }
     }
 
+    /**
+     * Data directories used to store keyspace data.
+     */
+    public static final class DataDirectories implements Iterable<DataDirectory>
+    {
+        /**
+         * The directories for storing the local system keyspaces.
+         */
+        private final DataDirectory[] localSystemKeyspaceDataDirectories;
+
+        /**
+         * The directories where the data of the non local system keyspaces should be stored.
+         */
+        private final DataDirectory[] nonLocalSystemKeyspacesDirectories;
+
+
+        public DataDirectories(String[] locationsForNonSystemKeyspaces, String[] locationsForSystemKeyspace)
+        {
+            nonLocalSystemKeyspacesDirectories = toDataDirectories(locationsForNonSystemKeyspaces);
+            localSystemKeyspaceDataDirectories = toDataDirectories(locationsForSystemKeyspace);
+        }
+
+        private static DataDirectory[] toDataDirectories(String... locations)
+        {
+            DataDirectory[] directories = new DataDirectory[locations.length];
+            for (int i = 0; i < locations.length; ++i)
+                directories[i] = new DataDirectory(new File(locations[i]));
+            return directories;
+        }
+
+        /**
+         * Returns the data directories for the specified table.
+         *
+         * @param table the table metadata
+         * @return the data directories for the specified table
+         */
+        public DataDirectory[] getDataDirectoriesFor(TableMetadata table)
+        {
+            return isStoredInLocalSystemKeyspacesDataLocation(table.keyspace, table.name) ? localSystemKeyspaceDataDirectories
+                                                                                          : nonLocalSystemKeyspacesDirectories;
+        }
+
+        @Override
+        public Iterator<DataDirectory> iterator()
+        {
+            return getAllDirectories().iterator();
+        }
+
+        public Set<DataDirectory> getAllDirectories()
+        {
+            Set<DataDirectory> directories = new LinkedHashSet<>(nonLocalSystemKeyspacesDirectories.length + localSystemKeyspaceDataDirectories.length);
+            Collections.addAll(directories, nonLocalSystemKeyspacesDirectories);
+            Collections.addAll(directories, localSystemKeyspaceDataDirectories);
+            return directories;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            DataDirectories that = (DataDirectories) o;
+
+            return Arrays.equals(this.localSystemKeyspaceDataDirectories, that.localSystemKeyspaceDataDirectories)
+                && Arrays.equals(this.nonLocalSystemKeyspacesDirectories, that.nonLocalSystemKeyspacesDirectories);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return 31 * Arrays.hashCode(localSystemKeyspaceDataDirectories) + Arrays.hashCode(nonLocalSystemKeyspacesDirectories); // hash contents, as equals() does
+        }
+
+        @Override
+        public String toString()
+        {
+            return "DataDirectories {" +
+                   "systemKeyspaceDataDirectories=" + Arrays.toString(localSystemKeyspaceDataDirectories) +
+                   ", nonSystemKeyspacesDirectories=" + Arrays.toString(nonLocalSystemKeyspacesDirectories) +
+                   '}';
+        }
+    }
+
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
     {
         final DataDirectory dataDirectory;
@@ -1001,17 +1099,11 @@ public class Directories
         return visitor.getAllocatedSize();
     }
 
-    public static List<File> getKSChildDirectories(String ksName)
-    {
-        return getKSChildDirectories(ksName, dataDirectories);
-
-    }
-
     // Recursively finds all the sub directories in the KS directory.
-    public static List<File> getKSChildDirectories(String ksName, DataDirectory[] directories)
+    public static List<File> getKSChildDirectories(String ksName)
     {
         List<File> result = new ArrayList<>();
-        for (DataDirectory dataDirectory : directories)
+        for (DataDirectory dataDirectory : dataDirectories.getAllDirectories())
         {
             File ksDir = new File(dataDirectory.location, ksName);
             File[] cfDirs = ksDir.listFiles();
@@ -1062,21 +1154,6 @@ public class Directories
         return StringUtils.join(s, File.separator);
     }
 
-    @VisibleForTesting
-    static void overrideDataDirectoriesForTest(String loc)
-    {
-        for (int i = 0; i < dataDirectories.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(loc));
-    }
-
-    @VisibleForTesting
-    static void resetDataDirectoriesAfterTest()
-    {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        for (int i = 0; i < locations.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(locations[i]));
-    }
-
     private class SSTableSizeSummer extends DirectorySizeCalculator
     {
         private final HashSet<File> toSkip;
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index bbb6dbb..cc617da 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index bb6ab4a..278541d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -72,8 +72,6 @@ import static java.util.Collections.singletonMap;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
-import static org.apache.cassandra.locator.Replica.fullReplica;
-import static org.apache.cassandra.locator.Replica.transientReplica;
 
 public final class SystemKeyspace
 {
@@ -110,6 +108,17 @@ public final class SystemKeyspace
     public static final String PREPARED_STATEMENTS = "prepared_statements";
     public static final String REPAIRS = "repairs";
 
+    /**
+     * By default the system keyspace tables should be stored in a single data directory to allow the server
+     * to handle disk failures more gracefully. Some tables, though, can be split across multiple directories
+     * as the server can continue operating even if those tables lose some data.
+     */
+    public static final Set<String> TABLES_SPLIT_ACROSS_MULTIPLE_DISKS = ImmutableSet.of(BATCHES,
+                                                                                         PAXOS,
+                                                                                         COMPACTION_HISTORY,
+                                                                                         PREPARED_STATEMENTS,
+                                                                                         REPAIRS);
+
     @Deprecated public static final String LEGACY_PEERS = "peers";
     @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
     @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
diff --git a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
index ca5d8da..ebb07e2 100644
--- a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
+++ b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
@@ -18,16 +18,20 @@
 
 package org.apache.cassandra.io;
 
+import java.io.IOException;
+
 public class FSDiskFullWriteError extends FSWriteError
 {
-    public FSDiskFullWriteError(Throwable cause, String path)
+    public FSDiskFullWriteError(String keyspace, long mutationSize)
     {
-        super(cause, path);
+        super(new IOException(String.format("Insufficient disk space to write %d bytes into the %s keyspace",
+                                            mutationSize,
+                                            keyspace)));
     }
 
     @Override
     public String toString()
     {
-        return "FSDiskFullWriteError in " + path;
+        return "FSDiskFullWriteError";
     }
 }
diff --git a/src/java/org/apache/cassandra/io/FSWriteError.java b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
similarity index 65%
copy from src/java/org/apache/cassandra/io/FSWriteError.java
copy to src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
index 6169904..14dcd38 100644
--- a/src/java/org/apache/cassandra/io/FSWriteError.java
+++ b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
@@ -15,25 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.io;
 
-import java.io.File;
+import java.io.IOException;
 
-public class FSWriteError extends FSError
+/**
+ * Thrown when all the disks used by a given keyspace have been marked as unwriteable.
+ */
+public class FSNoDiskAvailableForWriteError extends FSWriteError
 {
-    public FSWriteError(Throwable cause, File path)
-    {
-        super(cause, path);
-    }
-
-    public FSWriteError(Throwable cause, String path)
+    public FSNoDiskAvailableForWriteError(String keyspace)
     {
-        this(cause, new File(path));
+        super(new IOException(String.format("The data directories for the %s keyspace have been marked as unwritable",
+                                            keyspace)));
     }
 
     @Override
     public String toString()
     {
-        return "FSWriteError in " + path;
+        return "FSNoDiskAvailableForWriteError";
     }
 }
diff --git a/src/java/org/apache/cassandra/io/FSWriteError.java b/src/java/org/apache/cassandra/io/FSWriteError.java
index 6169904..b419086 100644
--- a/src/java/org/apache/cassandra/io/FSWriteError.java
+++ b/src/java/org/apache/cassandra/io/FSWriteError.java
@@ -31,6 +31,11 @@ public class FSWriteError extends FSError
         this(cause, new File(path));
     }
 
+    public FSWriteError(Throwable cause)
+    {
+        this(cause, new File(""));
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index f506140..e0ea436 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -41,8 +41,12 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -969,4 +973,68 @@ public final class FileUtils
             return fileStore.getAttribute(attribute);
         }
     }
+
+    /**
+     * Moves the contents of a directory to another directory.
+     * <p>Once a file has been copied to the target directory it will be deleted from the source directory.
+     * If a file already exists in the target directory a warning will be logged and the source file will
+     * be left in place.</p>
+     *
+     * @param source the directory containing the files to move
+     * @param target the directory where the files must be moved
+     * @throws IOException if the source cannot be listed or if a file cannot be copied or deleted
+     */
+    public static void moveRecursively(Path source, Path target) throws IOException
+    {
+        logger.info("Moving {} to {}", source, target);
+
+        if (Files.isDirectory(source))
+        {
+            Files.createDirectories(target);
+
+            Path[] children; // snapshot taken so the listing is closed before children are moved
+            try (Stream<Path> paths = Files.list(source)) // throws IOException where listFiles() returned null
+            {
+                children = paths.toArray(Path[]::new);
+            }
+            for (Path child : children)
+                moveRecursively(child, target.resolve(child.getFileName()));
+
+            deleteDirectoryIfEmpty(source);
+        }
+        else if (Files.exists(target))
+        {
+            logger.warn("Cannot move the file {} to {} as the target file already exists." , source, target);
+        }
+        else
+        {
+            Files.copy(source, target, StandardCopyOption.COPY_ATTRIBUTES);
+            Files.delete(source);
+        }
+    }
+
+    /**
+     * Removes the given directory, provided that it contains no entries.
+     * <p>A directory that is not empty is left in place and a warning listing its content is logged.</p>
+     *
+     * @param path the path to the directory
+     */
+    public static void deleteDirectoryIfEmpty(Path path) throws IOException
+    {
+        Preconditions.checkArgument(Files.isDirectory(path), "%s is not a directory", path);
+
+        logger.info("Deleting directory {}", path);
+        try
+        {
+            Files.delete(path);
+        }
+        catch (DirectoryNotEmptyException notEmpty)
+        {
+            try (Stream<Path> entries = Files.list(path))
+            {
+                String names = entries.map(Path::getFileName).map(Path::toString).collect(Collectors.joining(", "));
+                logger.warn("Cannot delete the directory {} as it is not empty. (Content: {})", path, names);
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 6b22635..2cb1254 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,8 +24,14 @@ import java.lang.management.MemoryPoolMXBean;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 import javax.management.remote.JMXConnectorServer;
@@ -44,6 +50,7 @@ import com.codahale.metrics.jvm.BufferPoolMetricSet;
 import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
 import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -223,6 +230,19 @@ public class CassandraDaemon
     {
         FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
 
+        // Since CASSANDRA-14793 the local system keyspaces data are not dispatched across the data directories
+        // anymore to reduce the risks in case of disk failures. Consequently, in case of an upgrade the system needs
+        // to ensure that the old data files have been migrated to the new directories before we start deleting
+        // snapshots and upgrading system tables.
+        try
+        {
+            migrateSystemDataIfNeeded();
+        }
+        catch (IOException e)
+        {
+            exitOrFail(StartupException.ERR_WRONG_DISK_STATE, e.getMessage(), e);
+        }
+
         // Delete any failed snapshot deletions on Windows - see CASSANDRA-9658
         if (FBUtilities.isWindows)
             WindowsFailedSnapshotTracker.deleteOldSnapshots();
@@ -247,7 +267,7 @@ public class CassandraDaemon
         }
         catch (IOException e)
         {
-            exitOrFail(3, e.getMessage(), e.getCause());
+            exitOrFail(StartupException.ERR_WRONG_DISK_STATE, e.getMessage(), e.getCause());
         }
 
         // We need to persist this as soon as possible after startup checks.
@@ -472,6 +492,73 @@ public class CassandraDaemon
         }
 
     }
+
+    /**
+     * Checks if the data of the local system keyspaces needs to be migrated to a different location and moves it if so.
+     *
+     * @throws IOException if an I/O error occurs while listing or moving the data directories
+     */
+    public void migrateSystemDataIfNeeded() throws IOException
+    {
+        // If there is only one directory and no system keyspace directory has been specified we do not need to do
+        // anything. If that is not the case we want to try to migrate the data.
+        if (!DatabaseDescriptor.useSpecificLocationForLocalSystemData()
+                && DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations().length <= 1)
+            return;
+
+        // We can face several cases:
+        //  1) The system data are spread across the data file locations and need to be moved to
+        //     the first data location (upgrade to 4.0)
+        //  2) The system data are spread across the data file locations and need to be moved to
+        //     the system keyspace location configured by the user (upgrade to 4.0)
+        //  3) The system data are stored in the first data location and need to be moved to
+        //     the system keyspace location configured by the user (local_system_data_file_directory has been configured)
+        Path target = Paths.get(DatabaseDescriptor.getLocalSystemKeyspacesDataFileLocations()[0]);
+
+        String[] nonLocalSystemKeyspacesFileLocations = DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations();
+        String[] sources = DatabaseDescriptor.useSpecificLocationForLocalSystemData() ? nonLocalSystemKeyspacesFileLocations
+                                                                                      : Arrays.copyOfRange(nonLocalSystemKeyspacesFileLocations,
+                                                                                                           1,
+                                                                                                           nonLocalSystemKeyspacesFileLocations.length);
+
+        for (String source : sources)
+        {
+            Path dataFileLocation = Paths.get(source);
+
+            if (!Files.exists(dataFileLocation))
+                continue;
+
+            try (Stream<Path> locationChildren = Files.list(dataFileLocation))
+            {
+                Path[] keyspaceDirectories = locationChildren.filter(p -> SchemaConstants.isLocalSystemKeyspace(p.getFileName().toString()))
+                                                             .toArray(Path[]::new);
+
+                for (Path keyspaceDirectory : keyspaceDirectories)
+                {
+                    try (Stream<Path> keyspaceChildren = Files.list(keyspaceDirectory))
+                    {
+                        Path[] tableDirectories = keyspaceChildren.filter(Files::isDirectory)
+                                                                  .filter(p -> !SystemKeyspace.TABLES_SPLIT_ACROSS_MULTIPLE_DISKS
+                                                                                              .contains(p.getFileName()
+                                                                                                         .toString()))
+                                                                  .toArray(Path[]::new);
+
+                        for (Path tableDirectory : tableDirectories)
+                        {
+                            FileUtils.moveRecursively(tableDirectory,
+                                                      target.resolve(dataFileLocation.relativize(tableDirectory)));
+                        }
+
+                        if (!SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspaceDirectory.getFileName().toString()))
+                        {
+                            FileUtils.deleteDirectoryIfEmpty(keyspaceDirectory);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     public void setupVirtualKeyspaces()
     {
         VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index d72b59a..d5e3e53 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -26,9 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DisallowedDirectories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.FSError;
-import org.apache.cassandra.io.FSErrorHandler;
-import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -67,6 +65,18 @@ public class DefaultFSErrorHandler implements FSErrorHandler
                 StorageService.instance.stopTransports();
                 break;
             case best_effort:
+
+                // There are a few scenarios where we know that the node will not be able to operate properly.
+                // For those scenarios we want to stop the transports and let the administrators handle the problem.
+                // Those scenarios are:
+                // * All the disks are full
+                // * All the disks for a given keyspace have been marked as unwritable
+                if (e instanceof FSDiskFullWriteError || e instanceof FSNoDiskAvailableForWriteError)
+                {
+                    logger.error("Stopping transports: " + e.getCause().getMessage());
+                    StorageService.instance.stopTransports();
+                }
+
                 // for both read and write errors mark the path as unwritable.
                 DisallowedDirectories.maybeMarkUnwritable(e.path);
                 if (e instanceof FSReadError)
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index cf3d414..85b5836 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -340,6 +340,7 @@ public class StartupChecks
                                                  Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
                                                                DatabaseDescriptor.getSavedCachesLocation(),
                                                                DatabaseDescriptor.getHintsDirectory().getAbsolutePath()));
+
         for (String dataDir : dirs)
         {
             logger.debug("Checking directory {}", dataDir);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da0c8ea..661b1a0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3460,14 +3460,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return stringify(Gossiper.instance.getUnreachableMembers(), true);
     }
 
+    @Override
     public String[] getAllDataFileLocations()
     {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        for (int i = 0; i < locations.length; i++)
-            locations[i] = FileUtils.getCanonicalPath(locations[i]);
+        return getCanonicalPaths(DatabaseDescriptor.getAllDataFileLocations());
+    }
+
+    private String[] getCanonicalPaths(String[] paths)
+    {
+        String[] locations = new String[paths.length];
+        for (int i = 0; i < paths.length; i++)
+            locations[i] = FileUtils.getCanonicalPath(paths[i]);
         return locations;
     }
 
+    @Override
+    public String[] getLocalSystemKeyspacesDataFileLocations()
+    {
+        return getCanonicalPaths(DatabaseDescriptor.getLocalSystemKeyspacesDataFileLocations());
+    }
+
+    @Override
+    public String[] getNonLocalSystemKeyspacesDataFileLocations()
+    {
+        return getCanonicalPaths(DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations());
+    }
+
     public String getCommitLogLocation()
     {
         return FileUtils.getCanonicalPath(DatabaseDescriptor.getCommitLogLocation());
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 63c2d96..cc69fec 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -121,6 +121,20 @@ public interface StorageServiceMBean extends NotificationEmitter
     public String[] getAllDataFileLocations();
 
     /**
+     * Returns the locations where the local system keyspaces data should be stored.
+     *
+     * @return the locations where the local system keyspaces data should be stored
+     */
+    public String[] getLocalSystemKeyspacesDataFileLocations();
+
+    /**
+     * Returns the locations where the data of the non local system keyspaces should be stored.
+     *
+     * @return the locations where the data of the non local system keyspaces should be stored
+     */
+    public String[] getNonLocalSystemKeyspacesDataFileLocations();
+
+    /**
      * Get location of the commit log
      * @return a string path
      */
diff --git a/test/conf/system_keyspaces_directory.yaml b/test/conf/system_keyspaces_directory.yaml
new file mode 100644
index 0000000..685d54e
--- /dev/null
+++ b/test/conf/system_keyspaces_directory.yaml
@@ -0,0 +1 @@
+local_system_data_file_directory: build/test/cassandra/system_data
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b0a6c3f..0ed7000 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -426,6 +426,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 DatabaseDescriptor.daemonInitialization();
                 FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
                 DatabaseDescriptor.createAllDirectories();
+                CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded();
                 CommitLog.instance.start();
 
                 CassandraDaemon.getInstanceForTesting().runStartupChecks();
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 23138b0..27246aa 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -94,6 +94,9 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
         for (int i = 0; i < config.data_file_directories.length; i++)
             config.data_file_directories[i] += sep + offset;
 
+        if (config.local_system_data_file_directory != null)
+            config.local_system_data_file_directory += sep + offset;
+
         return config;
     }
 }
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index eb2016f..507827e 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -22,7 +22,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -36,10 +35,14 @@ import org.junit.Test;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.auth.AuthKeyspace;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.schema.IndexTarget;
+import org.apache.cassandra.db.Directories.DataDirectories;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.index.internal.CassandraIndex;
@@ -50,7 +53,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.junit.Assert.assertEquals;
@@ -88,7 +90,6 @@ public class DirectoriesTest
         tempDataDir.delete(); // hack to create a temp dir
         tempDataDir.mkdir();
 
-        Directories.overrideDataDirectoriesForTest(tempDataDir.getPath());
         // Create two fake data dir for tests, one using CF directories, one that do not.
         createTestFiles();
     }
@@ -96,10 +97,14 @@ public class DirectoriesTest
     @AfterClass
     public static void afterClass()
     {
-        Directories.resetDataDirectoriesAfterTest();
         FileUtils.deleteRecursive(tempDataDir);
     }
 
+    private static DataDirectory[] toDataDirectories(File location) throws IOException
+    {
+        return new DataDirectory[] { new DataDirectory(location) };
+    }
+
     private static void createTestFiles() throws IOException
     {
         for (TableMetadata cfm : CFM)
@@ -156,7 +161,7 @@ public class DirectoriesTest
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, toDataDirectories(tempDataDir));
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
 
             Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.name, 1, SSTableFormat.Type.BIG);
@@ -169,7 +174,7 @@ public class DirectoriesTest
     }
 
     @Test
-    public void testSecondaryIndexDirectories()
+    public void testSecondaryIndexDirectories() throws IOException
     {
         TableMetadata.Builder builder =
             TableMetadata.builder(KS, "cf")
@@ -187,8 +192,8 @@ public class DirectoriesTest
 
         TableMetadata PARENT_CFM = builder.build();
         TableMetadata INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);
-        Directories parentDirectories = new Directories(PARENT_CFM);
-        Directories indexDirectories = new Directories(INDEX_CFM);
+        Directories parentDirectories = new Directories(PARENT_CFM, toDataDirectories(tempDataDir));
+        Directories indexDirectories = new Directories(INDEX_CFM, toDataDirectories(tempDataDir));
         // secondary index has its own directory
         for (File dir : indexDirectories.getCFDirectories())
         {
@@ -248,11 +253,11 @@ public class DirectoriesTest
     }
 
     @Test
-    public void testSSTableLister()
+    public void testSSTableLister() throws IOException
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, toDataDirectories(tempDataDir));
             checkFiles(cfm, directories);
         }
     }
@@ -301,7 +306,7 @@ public class DirectoriesTest
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, toDataDirectories(tempDataDir));
 
             File tempDir = directories.getTemporaryWriteableDirectoryAsFile(10);
             tempDir.mkdir();
@@ -332,19 +337,21 @@ public class DirectoriesTest
         try
         {
             DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.best_effort);
+
+            Set<DataDirectory> directories = Directories.dataDirectories.getAllDirectories();
+            DataDirectory first = directories.iterator().next();
+
             // Fake a Directory creation failure
-            if (Directories.dataDirectories.length > 0)
+            if (!directories.isEmpty())
             {
                 String[] path = new String[] {KS, "bad"};
-                File dir = new File(Directories.dataDirectories[0].location, StringUtils.join(path, File.separator));
+                File dir = new File(first.location, StringUtils.join(path, File.separator));
                 JVMStabilityInspector.inspectThrowable(new FSWriteError(new IOException("Unable to create directory " + dir), dir));
             }
 
-            for (DataDirectory dd : Directories.dataDirectories)
-            {
-                File file = new File(dd.location, new File(KS, "bad").getPath());
-                assertTrue(DisallowedDirectories.isUnwritable(file));
-            }
+            File file = new File(first.location, new File(KS, "bad").getPath());
+            assertTrue(DisallowedDirectories.isUnwritable(file));
+
         } 
         finally 
         {
@@ -357,7 +364,7 @@ public class DirectoriesTest
     {
         for (final TableMetadata cfm : CFM)
         {
-            final Directories directories = new Directories(cfm);
+            final Directories directories = new Directories(cfm, toDataDirectories(tempDataDir));
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
@@ -519,9 +526,9 @@ public class DirectoriesTest
     public void getDataDirectoryForFile()
     {
         Collection<DataDirectory> paths = new ArrayList<>();
-        paths.add(new DataDirectory(new File("/tmp/a")));
-        paths.add(new DataDirectory(new File("/tmp/aa")));
-        paths.add(new DataDirectory(new File("/tmp/aaa")));
+        paths.add(new DataDirectory("/tmp/a"));
+        paths.add(new DataDirectory("/tmp/aa"));
+        paths.add(new DataDirectory("/tmp/aaa"));
 
         for (TableMetadata cfm : CFM)
         {
@@ -614,6 +621,52 @@ public class DirectoriesTest
         }
     }
 
+    @Test
+    public void testIsStoredInLocalSystemKeyspacesDataLocation() throws IOException
+    {
+        for (String table : SystemKeyspace.TABLES_SPLIT_ACROSS_MULTIPLE_DISKS)
+        {
+            assertFalse(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.SYSTEM_KEYSPACE_NAME, table));
+        }
+        assertTrue(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2));
+        assertTrue(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2));
+        assertTrue(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES));
+        assertTrue(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
+        assertFalse(Directories.isStoredInLocalSystemKeyspacesDataLocation(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES));
+        assertFalse(Directories.isStoredInLocalSystemKeyspacesDataLocation(KS, TABLES[0]));
+    }
+
+    @Test
+    public void testDataDirectoriesIterator() throws IOException
+    {
+        Path tmpDir = Files.createTempDirectory(this.getClass().getSimpleName());
+        Path subDir_1 = Files.createDirectory(tmpDir.resolve("a"));
+        Path subDir_2 = Files.createDirectory(tmpDir.resolve("b"));
+        Path subDir_3 = Files.createDirectory(tmpDir.resolve("c"));
+
+        DataDirectories directories = new DataDirectories(new String[]{subDir_1.toString(), subDir_2.toString()},
+                                                          new String[]{subDir_3.toString()});
+
+        Iterator<DataDirectory> iter = directories.iterator();
+        assertTrue(iter.hasNext());
+        assertEquals(new DataDirectory(subDir_1.toFile()), iter.next());
+        assertTrue(iter.hasNext());
+        assertEquals(new DataDirectory(subDir_2.toFile()), iter.next());
+        assertTrue(iter.hasNext());
+        assertEquals(new DataDirectory(subDir_3.toFile()), iter.next());
+        assertFalse(iter.hasNext());
+
+        directories = new DataDirectories(new String[]{subDir_1.toString(), subDir_2.toString()},
+                                                          new String[]{subDir_1.toString()});
+
+        iter = directories.iterator();
+        assertTrue(iter.hasNext());
+        assertEquals(new DataDirectory(subDir_1.toFile()), iter.next());
+        assertTrue(iter.hasNext());
+        assertEquals(new DataDirectory(subDir_2.toFile()), iter.next());
+        assertFalse(iter.hasNext());
+    }
+
     private String getNewFilename(TableMetadata tm, boolean oldStyle)
     {
         return tm.keyspace + File.separator + tm.name + (oldStyle ? "" : Component.separator + tm.id.toHexString()) + "/na-1-big-Data.db";
diff --git a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
index 373232d..7d19f51 100644
--- a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
@@ -31,7 +31,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.assertj.core.api.Assertions;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -128,6 +130,96 @@ public class FileUtilsTest
         assertFalse(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abcc")));
     }
 
+    @Test
+    public void testMoveFiles() throws IOException
+    {
+        Path tmpDir = Files.createTempDirectory(this.getClass().getSimpleName());
+        Path sourceDir = Files.createDirectory(tmpDir.resolve("source"));
+        Path subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_1.txt").toFile().createNewFile();
+        subDir_1.resolve("file_2.txt").toFile().createNewFile();
+        Path subDir_11 = Files.createDirectory(subDir_1.resolve("ab"));
+        subDir_11.resolve("file_1.txt").toFile().createNewFile();
+        subDir_11.resolve("file_2.txt").toFile().createNewFile();
+        subDir_11.resolve("file_3.txt").toFile().createNewFile();
+        Path subDir_12 = Files.createDirectory(subDir_1.resolve("ac"));
+        Path subDir_2 = Files.createDirectory(sourceDir.resolve("b"));
+        subDir_2.resolve("file_1.txt").toFile().createNewFile();
+        subDir_2.resolve("file_2.txt").toFile().createNewFile();
+
+        Path targetDir = Files.createDirectory(tmpDir.resolve("target"));
+
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertThat(sourceDir).doesNotExist();
+        assertThat(targetDir.resolve("a/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_3.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/ac/")).exists();
+        assertThat(targetDir.resolve("b/file_1.txt")).exists();
+        assertThat(targetDir.resolve("b/file_2.txt")).exists();
+
+        // Tests that files can be moved into existing directories
+
+        sourceDir = Files.createDirectory(tmpDir.resolve("source2"));
+        subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_3.txt").toFile().createNewFile();
+        subDir_11 = Files.createDirectory(subDir_1.resolve("ab"));
+        subDir_11.resolve("file_4.txt").toFile().createNewFile();
+
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertThat(sourceDir).doesNotExist();
+        assertThat(targetDir.resolve("a/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/file_3.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_3.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_4.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_1.txt")).exists();
+        assertThat(targetDir.resolve("a/ab/file_2.txt")).exists();
+        assertThat(targetDir.resolve("a/ac/")).exists();
+        assertThat(targetDir.resolve("b/file_1.txt")).exists();
+        assertThat(targetDir.resolve("b/file_2.txt")).exists();
+
+        // Tests that existing files are not replaced and that the corresponding source files are left in place.
+
+        sourceDir = Files.createDirectory(tmpDir.resolve("source3"));
+        subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_3.txt").toFile().createNewFile();
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertThat(sourceDir).exists();
+        assertThat(sourceDir.resolve("a/file_3.txt")).exists();
+        assertThat(targetDir.resolve("a/file_3.txt")).exists();
+    }
+
+    @Test
+    public void testDeleteDirectoryIfEmpty() throws IOException
+    {
+        Path tmpDir = Files.createTempDirectory(this.getClass().getSimpleName());
+        Path subDir_1 = Files.createDirectory(tmpDir.resolve("a"));
+        Path subDir_2 = Files.createDirectory(tmpDir.resolve("b"));
+        Path file_1 = subDir_2.resolve("file_1.txt");
+        file_1.toFile().createNewFile();
+
+        FileUtils.deleteDirectoryIfEmpty(subDir_1);
+        assertThat(subDir_1).doesNotExist();
+
+        FileUtils.deleteDirectoryIfEmpty(subDir_2);
+        assertThat(subDir_2).exists();
+
+        Assertions.assertThatThrownBy(() -> FileUtils.deleteDirectoryIfEmpty(file_1))
+                  .isInstanceOf(IllegalArgumentException.class)
+                  .hasMessageContaining("is not a directory");
+    }
+
+
     private File createFolder(Path path)
     {
         File folder = path.toFile();
diff --git a/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java b/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
index b631822..975e45b 100644
--- a/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
+++ b/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
@@ -100,7 +100,7 @@ public class ClearSnapshotTest extends CQLTester
         tool = ToolRunner.invokeNodetool("snapshot","-t","some-other-name");
         tool.assertOnCleanExit();
             assertTrue(!tool.getStdout().isEmpty());
-        
+
         Map<String, TabularData> snapshots_before = probe.getSnapshotDetails();
         Assert.assertTrue(snapshots_before.size() == 2);
 
diff --git a/test/unit/org/apache/cassandra/tools/NodeToolTPStatsTest.java b/test/unit/org/apache/cassandra/tools/NodeToolTPStatsTest.java
index e437fc1..31423a4 100644
--- a/test/unit/org/apache/cassandra/tools/NodeToolTPStatsTest.java
+++ b/test/unit/org/apache/cassandra/tools/NodeToolTPStatsTest.java
@@ -112,7 +112,7 @@ public class NodeToolTPStatsTest extends CQLTester
     public void testTPStats() throws Throwable
     {
         ToolResult tool = ToolRunner.invokeNodetool("tpstats");
-        Assertions.assertThat(tool.getStdout()).containsIgnoringCase("Pool Name                    Active Pending Completed Blocked All time blocked");
+        Assertions.assertThat(tool.getStdout()).containsPattern("Pool Name \\s* Active Pending Completed Blocked All time blocked");
         Assertions.assertThat(tool.getStdout()).containsIgnoringCase("Latencies waiting in queue (micros) per dropped message types");
         assertTrue(tool.getCleanedStderr().isEmpty());
         assertEquals(0, tool.getExitCode());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org