You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/12/07 17:10:18 UTC

[GitHub] [cassandra] blerer commented on a change in pull request #675: CASSANDRA-14793: Improve system table handling when losing a disk when using JBOD

blerer commented on a change in pull request #675:
URL: https://github.com/apache/cassandra/pull/675#discussion_r537676403



##########
File path: src/java/org/apache/cassandra/db/ColumnFamilyStore.java
##########
@@ -2740,4 +2730,81 @@ public boolean getNeverPurgeTombstones()
     {
         return neverPurgeTombstones;
     }
+
+    /**
+     * The thread pools used to flush memtables.
+     *
+     * <p>Each disk has its own set of thread pools to perform memtable flushes.</p>
+     * <p>Based on the configuration. System keyspaces can have their own disk
+     * to allow for special redundency mechanism. If it is the case the executor services returned for
+     * system keyspace will be differents from the ones for the other keyspaces.</p>
+     */
+    private static final class PerDiskFlushExecutors
+    {
+        /**
+         * The flush executors for non system keyspaces.
+         */
+        private final ExecutorService[] nonSystemflushExecutors;
+
+        /**
+         * The flush executors for system keyspaces.
+         */
+        private final ExecutorService[] systemDiskFlushExecutors;
+
+        public PerDiskFlushExecutors(int flushWriters,
+                                     String[] locationsForNonSystemKeyspaces,
+                                     boolean useSpecificLocationForSystemKeyspaces)
+        {
+            ExecutorService[] flushExecutors = createPerDiskFlushWriters(locationsForNonSystemKeyspaces.length, flushWriters);
+            nonSystemflushExecutors = flushExecutors;
+            systemDiskFlushExecutors = useSpecificLocationForSystemKeyspaces ? new ExecutorService[] {newThreadPool("SystemKeyspacesDiskMemtableFlushWriter", flushWriters)}
+                                                                             : new ExecutorService[] {flushExecutors[0]};
+        }
+
+        private ExecutorService[] createPerDiskFlushWriters(int numberOfExecutors, int flushWriters)
+        {
+            ExecutorService[] flushExecutors = new ExecutorService[numberOfExecutors];
+
+            for (int i = 0; i < numberOfExecutors; i++)
+            {
+                flushExecutors[i] = newThreadPool("PerDiskMemtableFlushWriter_" + i, flushWriters);
+            }
+            return flushExecutors;
+        }
+
+        private static JMXEnabledThreadPoolExecutor newThreadPool(String poolName, int size)
+        {
+            return new JMXEnabledThreadPoolExecutor(size,
+                                                    Stage.KEEP_ALIVE_SECONDS,
+                                                    TimeUnit.SECONDS,
+                                                    new LinkedBlockingQueue<Runnable>(),
+                                                    new NamedThreadFactory(poolName),
+                                                    "internal");
+        }
+
+        /**
+         * Returns the flush executors for the specified keyspace.
+         *
+         * @param keyspaceName the keyspace name
+         * @param tableName the table name
+         * @return the flush executors that should be used for flushing the memtables of the specified keyspace.
+         */
+        public ExecutorService[] getExecutorsFor(String keyspaceName, String tableName)
+        {
+            return Directories.isStoredInSystemKeyspacesDataLocation(keyspaceName, tableName) ? systemDiskFlushExecutors
+                                                                  : nonSystemflushExecutors;
+        }
+
+        /**
+         * Appends all the {@code ExecutorService} used for flushes to the colection.
+         *
+         * @param collection the colection to append to.
+         */
+        public void appendAllExecutors(Collection<ExecutorService> collection)
+        {
+            Collections.addAll(collection, nonSystemflushExecutors);
+            if (nonSystemflushExecutors != systemDiskFlushExecutors)

Review comment:
       After looking into it, I believe that this code is some left over from a previous version and is actually wrong. I will fix it. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org