You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/14 13:59:08 UTC
[cassandra] branch trunk updated: Improve logging around incremental repairs
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78c7853 Improve logging around incremental repairs
78c7853 is described below
commit 78c785367f2836960ae26e5e5ef258162ef30ec3
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Feb 25 14:10:22 2020 +0100
Improve logging around incremental repairs
Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-15599
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 28 ++++++++++++----------
.../db/compaction/PendingRepairManager.java | 2 +-
.../db/repair/CassandraValidationIterator.java | 15 +++++++++---
.../org/apache/cassandra/repair/RepairJob.java | 19 +++++++++------
.../cassandra/repair/RepairMessageVerbHandler.java | 7 +++---
.../apache/cassandra/repair/RepairRunnable.java | 4 ++--
.../org/apache/cassandra/repair/RepairSession.java | 3 ++-
.../consistent/CoordinatorMessagingTest.java | 1 -
9 files changed, 50 insertions(+), 30 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 38fef18..d125138 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Improve logging around incremental repair (CASSANDRA-15599)
* Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
* Replace array iterators with get by index (CASSANDRA-15394)
* Minimize BTree iterator allocations (CASSANDRA-15389)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7924a1f..2b5ffe6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1486,7 +1486,7 @@ public class CompactionManager implements CompactionManagerMBean
BooleanSupplier isCancelled)
{
int originalCount = txn.originals().size();
- logger.info("Performing anticompaction on {} sstables", originalCount);
+ logger.info("Performing anticompaction on {} sstables for {}", originalCount, pendingRepair);
//Group SSTables
Set<SSTableReader> sstables = txn.originals();
@@ -1509,9 +1509,8 @@ public class CompactionManager implements CompactionManagerMBean
antiCompactedSSTableCount += antiCompacted;
}
}
-
- String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format, originalCount, antiCompactedSSTableCount);
+ String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s) for {}.";
+ logger.info(format, originalCount, antiCompactedSSTableCount, pendingRepair);
}
@VisibleForTesting
@@ -1537,7 +1536,7 @@ public class CompactionManager implements CompactionManagerMBean
return 0;
}
- logger.info("Anticompacting {}", txn);
+ logger.info("Anticompacting {} in {}.{} for {}", txn.originals(), cfs.keyspace.getName(), cfs.getTableName(), pendingRepair);
Set<SSTableReader> sstableAsSet = txn.originals();
File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
@@ -1627,8 +1626,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-
fullWriter.prepareToCommit();
transWriter.prepareToCommit();
unrepairedWriter.prepareToCommit();
@@ -1636,16 +1633,23 @@ public class CompactionManager implements CompactionManagerMBean
txn.obsoleteOriginals();
txn.prepareToCommit();
- anticompactedSSTables.addAll(fullWriter.finished());
- anticompactedSSTables.addAll(transWriter.finished());
- anticompactedSSTables.addAll(unrepairedWriter.finished());
+ List<SSTableReader> fullSSTables = new ArrayList<>(fullWriter.finished());
+ List<SSTableReader> transSSTables = new ArrayList<>(transWriter.finished());
+ List<SSTableReader> unrepairedSSTables = new ArrayList<>(unrepairedWriter.finished());
fullWriter.commit();
transWriter.commit();
unrepairedWriter.commit();
txn.commit();
-
- return anticompactedSSTables.size();
+ logger.info("Anticompacted {} in {}.{} to full = {}, transient = {}, unrepaired = {} for {}",
+ sstableAsSet,
+ cfs.keyspace.getName(),
+ cfs.getTableName(),
+ fullSSTables,
+ transSSTables,
+ unrepairedSSTables,
+ pendingRepair);
+ return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
}
catch (Throwable e)
{
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 78d4483..764a4dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -443,7 +443,7 @@ class PendingRepairManager
{
if (obsoleteSSTables)
{
- logger.info("Obsoleting transient repaired ssatbles");
+ logger.info("Obsoleting transient repaired sstables for {}", sessionID);
Preconditions.checkState(Iterables.all(transaction.originals(), SSTableReader::isTransient));
transaction.obsoleteOriginals();
}
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 6c6f084..9bddd86 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -152,7 +152,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
sstables = Refs.tryRef(sstablesToValidate);
if (sstables == null)
{
- logger.error("Could not reference sstables");
+ logger.error("Could not reference sstables for {}", parentId);
throw new RuntimeException("Could not reference sstables");
}
}
@@ -204,8 +204,17 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
}
Preconditions.checkArgument(sstables != null);
- logger.info("Performing validation compaction on {} sstables", sstables.size());
- logger.debug("Performing validation compaction on {}", sstables);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentId);
+ if (prs != null)
+ {
+ logger.info("{}, parentSessionId={}: Performing validation compaction on {} sstables in {}.{}",
+ prs.previewKind.logPrefix(sessionID),
+ parentId,
+ sstables.size(),
+ cfs.keyspace.getName(),
+ cfs.getTableName());
+ }
+
controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec));
scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.active);
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index e609f0d..16eb325 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -151,7 +151,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
if (!session.previewKind.isPreview())
{
- logger.info("{} {} is fully synced", session.previewKind.logPrefix(session.getId()), desc.columnFamily);
+ logger.info("{} {}.{} is fully synced", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily);
}
cfs.metric.repairsCompleted.inc();
@@ -165,7 +165,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
if (!session.previewKind.isPreview())
{
- logger.warn("{} {} sync failed", session.previewKind.logPrefix(session.getId()), desc.columnFamily);
+ logger.warn("{} {}.{} sync failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t);
}
cfs.metric.repairsCompleted.inc();
@@ -199,6 +199,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
boolean pullRepair,
PreviewKind previewKind)
{
+ long startedAt = System.currentTimeMillis();
List<SyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
for (int i = 0; i < trees.size() - 1; ++i)
@@ -252,7 +253,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
trees.get(i).trees.release();
}
trees.get(trees.size() - 1).trees.release();
-
+ logger.info("Created {} sync tasks based on {} merkle tree responses for {} (took: {}ms)",
+ syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
return syncTasks;
}
@@ -290,6 +292,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
boolean isIncremental,
PreviewKind previewKind)
{
+ long startedAt = System.currentTimeMillis();
List<SyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
DifferenceHolder diffHolder = new DifferenceHolder(trees);
@@ -339,6 +342,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
logger.debug("Node {} has nothing to stream", address);
}
}
+ logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)",
+ syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
return syncTasks;
}
@@ -384,7 +389,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints);
InetAddressAndPort address = requests.poll();
ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind);
- logger.info("Validating {}", address);
+ logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), address);
session.trackValidationCompletion(Pair.create(desc, address), firstTask);
tasks.add(firstTask);
ValidationTask currentTask = firstTask;
@@ -397,7 +402,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
public void onSuccess(TreeResponse result)
{
- logger.info("Validating {}", nextAddress);
+ logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), nextAddress);
session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask);
taskExecutor.execute(nextTask);
}
@@ -441,7 +446,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
Queue<InetAddressAndPort> requests = entry.getValue();
InetAddressAndPort address = requests.poll();
ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind);
- logger.info("Validating {}", address);
+ logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), address);
session.trackValidationCompletion(Pair.create(desc, address), firstTask);
tasks.add(firstTask);
ValidationTask currentTask = firstTask;
@@ -454,7 +459,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
public void onSuccess(TreeResponse result)
{
- logger.info("Validating {}", nextAddress);
+ logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), nextAddress);
session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask);
taskExecutor.execute(nextTask);
}
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 27ffd05..47da8bb 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -95,8 +95,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
if (cfs == null)
{
- logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair",
- desc.keyspace, desc.columnFamily), message);
+ logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair %s",
+ desc.keyspace, desc.columnFamily, desc.parentSessionId), message);
return;
}
@@ -121,7 +121,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
if (store == null)
{
- logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily);
+ logger.error("Table {}.{} was dropped during snapshot phase of repair {}",
+ desc.keyspace, desc.columnFamily, desc.parentSessionId);
MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from());
return;
}
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 0ac34a3..c7a6d71 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -205,7 +205,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
}
fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCounter.get(), totalProgress, msg));
- logger.info(msg);
+ logger.info(options.getPreviewKind().logPrefix(parentSession) + msg);
ActiveRepairService.instance.removeParentRepairSession(parentSession);
TraceState localState = traceState;
@@ -270,7 +270,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
progressCounter.incrementAndGet();
if (Iterables.isEmpty(validColumnFamilies))
- throw new SkipRepairException(String.format("Empty keyspace, skipping repair: %s", keyspace));
+ throw new SkipRepairException(String.format("%s Empty keyspace, skipping repair: %s", parentSession, keyspace));
return Lists.newArrayList(validColumnFamilies);
}
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 95a6e57..2468857 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -285,7 +285,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
if (terminated)
return;
- logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
+ logger.info("{} parentSessionId = {}: new session: will sync {} on range {} for {}.{}",
+ previewKind.logPrefix(getId()), parentRepairSession, repairedNodes(), commonRange, keyspace, Arrays.toString(cfnames));
Tracing.traceRepair("Syncing range {}", commonRange);
if (!previewKind.isPreview())
{
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index 6f0d846..420cd54 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -208,7 +208,6 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
// execute repair and start prepare phase
ListenableFuture<Boolean> sessionResult = coordinator.execute(sessionSupplier, proposeFailed);
- Assert.assertFalse(proposeFailed.get());
prepareLatch.countDown();
// prepare completed
try
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org