You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/14 13:59:08 UTC

[cassandra] branch trunk updated: Improve logging around incremental repairs

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78c7853  Improve logging around incremental repairs
78c7853 is described below

commit 78c785367f2836960ae26e5e5ef258162ef30ec3
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Feb 25 14:10:22 2020 +0100

    Improve logging around incremental repairs
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-15599
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/compaction/CompactionManager.java | 28 ++++++++++++----------
 .../db/compaction/PendingRepairManager.java        |  2 +-
 .../db/repair/CassandraValidationIterator.java     | 15 +++++++++---
 .../org/apache/cassandra/repair/RepairJob.java     | 19 +++++++++------
 .../cassandra/repair/RepairMessageVerbHandler.java |  7 +++---
 .../apache/cassandra/repair/RepairRunnable.java    |  4 ++--
 .../org/apache/cassandra/repair/RepairSession.java |  3 ++-
 .../consistent/CoordinatorMessagingTest.java       |  1 -
 9 files changed, 50 insertions(+), 30 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 38fef18..d125138 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Improve logging around incremental repair (CASSANDRA-15599)
  * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
  * Replace array iterators with get by index (CASSANDRA-15394)
  * Minimize BTree iterator allocations (CASSANDRA-15389)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7924a1f..2b5ffe6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1486,7 +1486,7 @@ public class CompactionManager implements CompactionManagerMBean
                                   BooleanSupplier isCancelled)
     {
         int originalCount = txn.originals().size();
-        logger.info("Performing anticompaction on {} sstables", originalCount);
+        logger.info("Performing anticompaction on {} sstables for {}", originalCount, pendingRepair);
 
         //Group SSTables
         Set<SSTableReader> sstables = txn.originals();
@@ -1509,9 +1509,8 @@ public class CompactionManager implements CompactionManagerMBean
                 antiCompactedSSTableCount += antiCompacted;
             }
         }
-
-        String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, originalCount, antiCompactedSSTableCount);
+        String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s) for {}.";
+        logger.info(format, originalCount, antiCompactedSSTableCount, pendingRepair);
     }
 
     @VisibleForTesting
@@ -1537,7 +1536,7 @@ public class CompactionManager implements CompactionManagerMBean
             return 0;
         }
 
-        logger.info("Anticompacting {}", txn);
+        logger.info("Anticompacting {} in {}.{} for {}", txn.originals(), cfs.keyspace.getName(), cfs.getTableName(), pendingRepair);
         Set<SSTableReader> sstableAsSet = txn.originals();
 
         File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
@@ -1627,8 +1626,6 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
 
-            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-
             fullWriter.prepareToCommit();
             transWriter.prepareToCommit();
             unrepairedWriter.prepareToCommit();
@@ -1636,16 +1633,23 @@ public class CompactionManager implements CompactionManagerMBean
             txn.obsoleteOriginals();
             txn.prepareToCommit();
 
-            anticompactedSSTables.addAll(fullWriter.finished());
-            anticompactedSSTables.addAll(transWriter.finished());
-            anticompactedSSTables.addAll(unrepairedWriter.finished());
+            List<SSTableReader> fullSSTables = new ArrayList<>(fullWriter.finished());
+            List<SSTableReader> transSSTables = new ArrayList<>(transWriter.finished());
+            List<SSTableReader> unrepairedSSTables = new ArrayList<>(unrepairedWriter.finished());
 
             fullWriter.commit();
             transWriter.commit();
             unrepairedWriter.commit();
             txn.commit();
-
-            return anticompactedSSTables.size();
+            logger.info("Anticompacted {} in {}.{} to full = {}, transient = {}, unrepaired = {} for {}",
+                        sstableAsSet,
+                        cfs.keyspace.getName(),
+                        cfs.getTableName(),
+                        fullSSTables,
+                        transSSTables,
+                        unrepairedSSTables,
+                        pendingRepair);
+            return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
         }
         catch (Throwable e)
         {
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 78d4483..764a4dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -443,7 +443,7 @@ class PendingRepairManager
             {
                 if (obsoleteSSTables)
                 {
-                    logger.info("Obsoleting transient repaired ssatbles");
+                    logger.info("Obsoleting transient repaired sstables for {}", sessionID);
                     Preconditions.checkState(Iterables.all(transaction.originals(), SSTableReader::isTransient));
                     transaction.obsoleteOriginals();
                 }
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 6c6f084..9bddd86 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -152,7 +152,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
             sstables = Refs.tryRef(sstablesToValidate);
             if (sstables == null)
             {
-                logger.error("Could not reference sstables");
+                logger.error("Could not reference sstables for {}", parentId);
                 throw new RuntimeException("Could not reference sstables");
             }
         }
@@ -204,8 +204,17 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
         }
 
         Preconditions.checkArgument(sstables != null);
-        logger.info("Performing validation compaction on {} sstables", sstables.size());
-        logger.debug("Performing validation compaction on {}", sstables);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentId);
+        if (prs != null)
+        {
+            logger.info("{}, parentSessionId={}: Performing validation compaction on {} sstables in {}.{}",
+                        prs.previewKind.logPrefix(sessionID),
+                        parentId,
+                        sstables.size(),
+                        cfs.keyspace.getName(),
+                        cfs.getTableName());
+        }
+
         controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec));
         scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
         ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.active);
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index e609f0d..16eb325 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -151,7 +151,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             {
                 if (!session.previewKind.isPreview())
                 {
-                    logger.info("{} {} is fully synced", session.previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    logger.info("{} {}.{} is fully synced", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                     SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily);
                 }
                 cfs.metric.repairsCompleted.inc();
@@ -165,7 +165,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             {
                 if (!session.previewKind.isPreview())
                 {
-                    logger.warn("{} {} sync failed", session.previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    logger.warn("{} {}.{} sync failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                     SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t);
                 }
                 cfs.metric.repairsCompleted.inc();
@@ -199,6 +199,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                                                   boolean pullRepair,
                                                   PreviewKind previewKind)
     {
+        long startedAt = System.currentTimeMillis();
         List<SyncTask> syncTasks = new ArrayList<>();
         // We need to difference all trees one against another
         for (int i = 0; i < trees.size() - 1; ++i)
@@ -252,7 +253,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             trees.get(i).trees.release();
         }
         trees.get(trees.size() - 1).trees.release();
-
+        logger.info("Created {} sync tasks based on {} merkle tree responses for {} (took: {}ms)",
+                    syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
         return syncTasks;
     }
 
@@ -290,6 +292,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                                                           boolean isIncremental,
                                                           PreviewKind previewKind)
     {
+        long startedAt = System.currentTimeMillis();
         List<SyncTask> syncTasks = new ArrayList<>();
         // We need to difference all trees one against another
         DifferenceHolder diffHolder = new DifferenceHolder(trees);
@@ -339,6 +342,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                 logger.debug("Node {} has nothing to stream", address);
             }
         }
+        logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)",
+                    syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
         return syncTasks;
     }
 
@@ -384,7 +389,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints);
         InetAddressAndPort address = requests.poll();
         ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind);
-        logger.info("Validating {}", address);
+        logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), address);
         session.trackValidationCompletion(Pair.create(desc, address), firstTask);
         tasks.add(firstTask);
         ValidationTask currentTask = firstTask;
@@ -397,7 +402,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             {
                 public void onSuccess(TreeResponse result)
                 {
-                    logger.info("Validating {}", nextAddress);
+                    logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), nextAddress);
                     session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask);
                     taskExecutor.execute(nextTask);
                 }
@@ -441,7 +446,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             Queue<InetAddressAndPort> requests = entry.getValue();
             InetAddressAndPort address = requests.poll();
             ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind);
-            logger.info("Validating {}", address);
+            logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), address);
             session.trackValidationCompletion(Pair.create(desc, address), firstTask);
             tasks.add(firstTask);
             ValidationTask currentTask = firstTask;
@@ -454,7 +459,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                 {
                     public void onSuccess(TreeResponse result)
                     {
-                        logger.info("Validating {}", nextAddress);
+                        logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), nextAddress);
                         session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask);
                         taskExecutor.execute(nextTask);
                     }
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 27ffd05..47da8bb 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -95,8 +95,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                     if (cfs == null)
                     {
-                        logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair",
-                                                                     desc.keyspace, desc.columnFamily), message);
+                        logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair %s",
+                                                                     desc.keyspace, desc.columnFamily, desc.parentSessionId), message);
                         return;
                     }
 
@@ -121,7 +121,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                     if (store == null)
                     {
-                        logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily);
+                        logger.error("Table {}.{} was dropped during snapshot phase of repair {}",
+                                     desc.keyspace, desc.columnFamily, desc.parentSessionId);
                         MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from());
                         return;
                     }
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 0ac34a3..c7a6d71 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -205,7 +205,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         }
 
         fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCounter.get(), totalProgress, msg));
-        logger.info(msg);
+        logger.info(options.getPreviewKind().logPrefix(parentSession) + msg);
 
         ActiveRepairService.instance.removeParentRepairSession(parentSession);
         TraceState localState = traceState;
@@ -270,7 +270,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         progressCounter.incrementAndGet();
 
         if (Iterables.isEmpty(validColumnFamilies))
-            throw new SkipRepairException(String.format("Empty keyspace, skipping repair: %s", keyspace));
+            throw new SkipRepairException(String.format("%s Empty keyspace, skipping repair: %s", parentSession, keyspace));
         return Lists.newArrayList(validColumnFamilies);
     }
 
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 95a6e57..2468857 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -285,7 +285,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         if (terminated)
             return;
 
-        logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
+        logger.info("{} parentSessionId = {}: new session: will sync {} on range {} for {}.{}",
+                    previewKind.logPrefix(getId()), parentRepairSession, repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
         Tracing.traceRepair("Syncing range {}", commonRange);
         if (!previewKind.isPreview())
         {
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index 6f0d846..420cd54 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -208,7 +208,6 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
 
         // execute repair and start prepare phase
         ListenableFuture<Boolean> sessionResult = coordinator.execute(sessionSupplier, proposeFailed);
-        Assert.assertFalse(proposeFailed.get());
         prepareLatch.countDown();
         // prepare completed
         try


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org