You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/05/02 21:04:13 UTC
cassandra git commit: Enhanced Compaction Logging
Repository: cassandra
Updated Branches:
refs/heads/trunk 307890363 -> e16d8a7a6
Enhanced Compaction Logging
patch by Carl Yeksigian; reviewed by Marcus Eriksson for CASSANDRA-10805
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e16d8a7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e16d8a7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e16d8a7a
Branch: refs/heads/trunk
Commit: e16d8a7a667d50271a183a95be894126cb2a5414
Parents: 3078903
Author: Carl Yeksigian <ca...@apache.org>
Authored: Mon May 2 15:01:39 2016 -0400
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Mon May 2 15:03:38 2016 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +-
.../compaction/AbstractCompactionStrategy.java | 23 +-
.../db/compaction/CompactionLogger.java | 342 +++++++++++++++++++
.../compaction/CompactionStrategyManager.java | 39 ++-
.../cassandra/db/compaction/CompactionTask.java | 2 +
.../DateTieredCompactionStrategy.java | 31 ++
.../DateTieredCompactionStrategyOptions.java | 8 +-
.../compaction/LeveledCompactionStrategy.java | 27 +-
.../SizeTieredCompactionStrategy.java | 1 +
10 files changed, 475 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c802031..1a3069c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.6
+ * Enhanced Compaction Logging (CASSANDRA-10805)
* Make prepared statement cache size configurable (CASSANDRA-11555)
* Integrated JMX authentication and authorization (CASSANDRA-10091)
* Add units to stress ouput (CASSANDRA-11352)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b47cf85..6b841c2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1163,12 +1163,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
- logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
- sstables,
- sstables.size(),
- FBUtilities.prettyPrintMemory(totalBytesOnDisk),
- FBUtilities.prettyPrintMemory(maxBytesOnDisk),
- FBUtilities.prettyPrintMemory(minBytesOnDisk));
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 40f0ce2..668bc51 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -63,11 +63,13 @@ public abstract class AbstractCompactionStrategy
// minimum interval needed to perform tombstone removal compaction in seconds, default 86400 or 1 day.
protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
protected static final boolean DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION = false;
+ protected static final boolean DEFAULT_LOG_ALL_OPTION = false;
protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
// disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
+ protected static final String LOG_ALL_OPTION = "log_all";
protected static final String COMPACTION_ENABLED = "enabled";
public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
@@ -78,6 +80,7 @@ public abstract class AbstractCompactionStrategy
protected long tombstoneCompactionInterval;
protected boolean uncheckedTombstoneCompaction;
protected boolean disableTombstoneCompactions = false;
+ protected boolean logAll = true;
private final Directories directories;
@@ -110,6 +113,8 @@ public abstract class AbstractCompactionStrategy
tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
optionValue = options.get(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
uncheckedTombstoneCompaction = optionValue == null ? DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION : Boolean.parseBoolean(optionValue);
+ optionValue = options.get(LOG_ALL_OPTION);
+ logAll = optionValue == null ? DEFAULT_LOG_ALL_OPTION : Boolean.parseBoolean(optionValue);
if (!shouldBeEnabled())
this.disable();
}
@@ -463,7 +468,16 @@ public abstract class AbstractCompactionStrategy
if (unchecked != null)
{
if (!unchecked.equalsIgnoreCase("true") && !unchecked.equalsIgnoreCase("false"))
- throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'",UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked));
+ throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'", UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked));
+ }
+
+ String logAll = options.get(LOG_ALL_OPTION);
+ if (logAll != null)
+ {
+ if (!logAll.equalsIgnoreCase("true") && !logAll.equalsIgnoreCase("false"))
+ {
+ throw new ConfigurationException(String.format("'%s' should either be 'true' or 'false', not %s", LOG_ALL_OPTION, logAll));
+ }
}
String compactionEnabled = options.get(COMPACTION_ENABLED);
@@ -474,10 +488,12 @@ public abstract class AbstractCompactionStrategy
throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled));
}
}
+
Map<String, String> uncheckedOptions = new HashMap<String, String>(options);
uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION);
uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
+ uncheckedOptions.remove(LOG_ALL_OPTION);
uncheckedOptions.remove(COMPACTION_ENABLED);
uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
return uncheckedOptions;
@@ -521,6 +537,11 @@ public abstract class AbstractCompactionStrategy
return groupedSSTables;
}
+ /**
+ * Supplies the hook the compaction logger uses to obtain strategy-specific details.
+ * This base implementation returns {@link CompactionLogger.Strategy#none}; strategies
+ * that want to contribute extra information override this method.
+ *
+ * @return the logging hook for this strategy
+ */
+ public CompactionLogger.Strategy strategyLogger()
+ {
+ return CompactionLogger.Strategy.none;
+ }
+
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
new file mode 100644
index 0000000..16a7f2a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.file.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+public class CompactionLogger
+{
+ /**
+ * Hook through which a compaction strategy contributes strategy-specific JSON to the log.
+ * Either method may return null; the logger then leaves the corresponding field out.
+ */
+ public interface Strategy
+ {
+ // Extra details about one sstable; emitted under "details" when non-null.
+ JsonNode sstable(SSTableReader sstable);
+
+ // The strategy's options; emitted under "options" when non-null.
+ JsonNode options();
+
+ // Hook that contributes nothing: both methods return null.
+ static Strategy none = new Strategy()
+ {
+ public JsonNode sstable(SSTableReader sstable)
+ {
+ return null;
+ }
+
+ public JsonNode options()
+ {
+ return null;
+ }
+ };
+ }
+
+ /**
+ * This will produce the compaction strategy's starting information.
+ */
+ public interface StrategySummary
+ {
+ JsonNode getSummary();
+ }
+
+ /**
+ * This is an interface to allow writing the log statements to a different medium.
+ */
+ public interface Writer
+ {
+ /**
+ * This is used when the statement being written is itself the start of logging for the tag,
+ * so no separate summary needs to be supplied.
+ * @param statement This should be written out to the medium capturing the logs
+ * @param tag This is an identifier for a strategy; each strategy should have a distinct Object
+ */
+ void writeStart(JsonNode statement, Object tag);
+
+ /**
+ * @param statement This should be written out to the medium capturing the logs
+ * @param summary This can be used when a tag is not recognized by this writer; this can be because the file
+ * has been rolled, or otherwise the writer had to start over
+ * @param tag This is an identifier for a strategy; each strategy should have a distinct Object
+ */
+ void write(JsonNode statement, StrategySummary summary, Object tag);
+ }
+
+ /**
+ * Maps an sstable, together with the strategy that manages it, to a JSON node.
+ */
+ private interface CompactionStrategyAndTableFunction
+ {
+ JsonNode apply(AbstractCompactionStrategy strategy, SSTableReader sstable);
+ }
+
+ private static final JsonNodeFactory json = JsonNodeFactory.instance;
+ private static final Logger logger = LoggerFactory.getLogger(CompactionLogger.class);
+ private static final Writer serializer = new CompactionLogSerializer();
+ private final ColumnFamilyStore cfs;
+ private final CompactionStrategyManager csm;
+ private final AtomicInteger identifier = new AtomicInteger(0);
+ private final Map<AbstractCompactionStrategy, String> compactionStrategyMapping = new ConcurrentHashMap<>();
+ private final AtomicBoolean enabled = new AtomicBoolean(false);
+
+ public CompactionLogger(ColumnFamilyStore cfs, CompactionStrategyManager csm)
+ {
+ this.csm = csm;
+ this.cfs = cfs;
+ }
+
+ /**
+ * Applies the consumer to every strategy in every group returned by csm.getStrategies().
+ */
+ private void forEach(Consumer<AbstractCompactionStrategy> consumer)
+ {
+ csm.getStrategies()
+ .forEach(l -> l.forEach(consumer));
+ }
+
+ /**
+ * Builds a JSON array holding the result of applying select to each strategy.
+ */
+ private ArrayNode compactionStrategyMap(Function<AbstractCompactionStrategy, JsonNode> select)
+ {
+ ArrayNode node = json.arrayNode();
+ forEach(acs -> node.add(select.apply(acs)));
+ return node;
+ }
+
+ /**
+ * Builds a JSON array with one entry per sstable, produced by csatf from the sstable
+ * and the strategy the manager reports for it.
+ */
+ private ArrayNode sstableMap(Collection<SSTableReader> sstables, CompactionStrategyAndTableFunction csatf)
+ {
+ ArrayNode node = json.arrayNode();
+ sstables.forEach(t -> node.add(csatf.apply(csm.getCompactionStrategyFor(t), t)));
+ return node;
+ }
+
+ /**
+ * Returns the identifier used for the strategy in log statements. The first time a
+ * strategy is seen it is assigned the next value of the counter; later calls return
+ * the same identifier.
+ */
+ private String getId(AbstractCompactionStrategy strategy)
+ {
+ return compactionStrategyMapping.computeIfAbsent(strategy, s -> String.valueOf(identifier.getAndIncrement()));
+ }
+
+ /**
+ * Describes every live sstable of the table that is managed by the given strategy.
+ */
+ private JsonNode formatSSTables(AbstractCompactionStrategy strategy)
+ {
+ ArrayNode owned = json.arrayNode();
+ for (SSTableReader candidate : cfs.getLiveSSTables())
+ {
+ // sstables handled by another strategy are not part of this listing
+ if (csm.getCompactionStrategyFor(candidate) != strategy)
+ continue;
+ owned.add(formatSSTable(strategy, candidate));
+ }
+ return owned;
+ }
+
+ /**
+ * Describes a single sstable: generation, format version and on-disk size, plus the
+ * strategy-specific "details" when the strategy's logger supplies any.
+ */
+ private JsonNode formatSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable)
+ {
+ ObjectNode node = json.objectNode();
+ node.put("generation", sstable.descriptor.generation);
+ node.put("version", sstable.descriptor.version.getVersion());
+ node.put("size", sstable.onDiskLength());
+ JsonNode logResult = strategy.strategyLogger().sstable(sstable);
+ // a null result means the strategy has nothing to add for this sstable
+ if (logResult != null)
+ node.put("details", logResult);
+ return node;
+ }
+
+ /**
+ * Produces the full description of a strategy: its id, type, sstables, repaired flag,
+ * folders and, when the strategy supplies them, its options.
+ */
+ private JsonNode startStrategy(AbstractCompactionStrategy strategy)
+ {
+ ObjectNode description = json.objectNode();
+ description.put("strategyId", getId(strategy));
+ description.put("type", strategy.getName());
+ description.put("tables", formatSSTables(strategy));
+ description.put("repaired", csm.isRepaired(strategy));
+
+ ArrayNode folderNode = json.arrayNode();
+ for (String folder : csm.getStrategyFolders(strategy))
+ folderNode.add(folder);
+ description.put("folders", folderNode);
+
+ JsonNode strategyOptions = strategy.strategyLogger().options();
+ if (strategyOptions != null)
+ description.put("options", strategyOptions);
+ return description;
+ }
+
+ /**
+ * Produces the entry written for a strategy when logging is disabled; it carries only
+ * the strategy's id.
+ */
+ private JsonNode shutdownStrategy(AbstractCompactionStrategy strategy)
+ {
+ ObjectNode node = json.objectNode();
+ node.put("strategyId", getId(strategy));
+ return node;
+ }
+
+ /**
+ * Pairs the description of an sstable with the id of the strategy managing it.
+ */
+ private JsonNode describeSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable)
+ {
+ ObjectNode node = json.objectNode();
+ node.put("strategyId", getId(strategy));
+ node.put("table", formatSSTable(strategy, sstable));
+ return node;
+ }
+
+ /**
+ * Adds the fields common to every statement to the node: keyspace name, table name and
+ * the current wall-clock time in milliseconds.
+ */
+ private void describeStrategy(ObjectNode node)
+ {
+ node.put("keyspace", cfs.keyspace.getName());
+ node.put("table", cfs.getTableName());
+ node.put("time", System.currentTimeMillis());
+ }
+
+ /**
+ * Builds the "enable" statement, which describes every strategy in full. It is also
+ * handed to the writer as the summary for the other statements.
+ */
+ private JsonNode startStrategies()
+ {
+ ObjectNode node = json.objectNode();
+ node.put("type", "enable");
+ describeStrategy(node);
+ node.put("strategies", compactionStrategyMap(this::startStrategy));
+ return node;
+ }
+
+ /**
+ * Turns logging on. The "enable" statement is written only when this call is the one
+ * that flips the flag from disabled to enabled.
+ */
+ public void enable()
+ {
+ if (enabled.compareAndSet(false, true))
+ {
+ serializer.writeStart(startStrategies(), this);
+ }
+ }
+
+ /**
+ * Turns logging off. The "disable" statement, listing the id of every strategy, is
+ * written only when this call is the one that flips the flag from enabled to disabled.
+ */
+ public void disable()
+ {
+ if (enabled.compareAndSet(true, false))
+ {
+ ObjectNode node = json.objectNode();
+ node.put("type", "disable");
+ describeStrategy(node);
+ node.put("strategies", compactionStrategyMap(this::shutdownStrategy));
+ serializer.write(node, this::startStrategies, this);
+ }
+ }
+
+ /**
+ * Records a "flush" statement listing the given sstables. Does nothing while logging
+ * is disabled.
+ *
+ * @param sstables the sstables to list in the statement
+ */
+ public void flush(Collection<SSTableReader> sstables)
+ {
+ if (!enabled.get())
+ return;
+
+ ObjectNode event = json.objectNode();
+ event.put("type", "flush");
+ describeStrategy(event);
+ event.put("tables", sstableMap(sstables, this::describeSSTable));
+ serializer.write(event, this::startStrategies, this);
+ }
+
+ /**
+ * Records a "compaction" statement. Does nothing while logging is disabled.
+ *
+ * @param startTime written, as a string, under "start"
+ * @param input sstables listed under "input"
+ * @param endTime written, as a string, under "end"
+ * @param output sstables listed under "output"
+ */
+ public void compaction(long startTime, Collection<SSTableReader> input, long endTime, Collection<SSTableReader> output)
+ {
+ if (!enabled.get())
+ return;
+
+ ObjectNode event = json.objectNode();
+ event.put("type", "compaction");
+ describeStrategy(event);
+ event.put("start", String.valueOf(startTime));
+ event.put("end", String.valueOf(endTime));
+ event.put("input", sstableMap(input, this::describeSSTable));
+ event.put("output", sstableMap(output, this::describeSSTable));
+ serializer.write(event, this::startStrategies, this);
+ }
+
+ /**
+ * Records a "pending" statement with the number of remaining tasks of a strategy.
+ * Nothing is written when that number is zero or while logging is disabled.
+ *
+ * @param strategy the strategy whose id is written
+ * @param remaining the value written under "pending"
+ */
+ public void pending(AbstractCompactionStrategy strategy, int remaining)
+ {
+ if (remaining == 0 || !enabled.get())
+ return;
+
+ ObjectNode event = json.objectNode();
+ event.put("type", "pending");
+ describeStrategy(event);
+ event.put("strategyId", getId(strategy));
+ event.put("pending", remaining);
+ serializer.write(event, this::startStrategies, this);
+ }
+
+ /**
+ * Default {@link Writer}. Each statement is appended as one line to compaction.log in the
+ * directory named by the cassandra.logdir system property (the current directory when unset).
+ * All file access happens on the single thread of loggerService.
+ */
+ private static class CompactionLogSerializer implements Writer
+ {
+ private static final String logDirectory = System.getProperty("cassandra.logdir", ".");
+ private final ExecutorService loggerService = Executors.newFixedThreadPool(1);
+ // This is only accessed on the logger service thread, so it does not need to be thread safe
+ private final Set<Object> rolled = new HashSet<>();
+ private OutputStreamWriter stream;
+
+ /**
+ * Opens a new compaction.log. If one already exists it is first moved aside to the
+ * first compaction-N.log name (N counting up from 0) that is not taken.
+ *
+ * @throws IOException if the old file cannot be moved or the new file cannot be created
+ */
+ private static OutputStreamWriter createStream() throws IOException
+ {
+ int count = 0;
+ Path compactionLog = Paths.get(logDirectory, "compaction.log");
+ if (Files.exists(compactionLog))
+ {
+ Path tryPath = compactionLog;
+ while (Files.exists(tryPath))
+ {
+ tryPath = Paths.get(logDirectory, String.format("compaction-%d.log", count++));
+ }
+ Files.move(compactionLog, tryPath);
+ }
+
+ // The statements are JSON, so encode them as UTF-8 instead of relying on the platform default charset.
+ return new OutputStreamWriter(Files.newOutputStream(compactionLog, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE),
+ java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Writes and flushes the text, opening the log file on first use. Must only be called
+ * from the logger service thread. An I/O failure drops the text and is reported through
+ * the regular logger.
+ */
+ private void writeLocal(String toWrite)
+ {
+ try
+ {
+ if (stream == null)
+ stream = createStream();
+ stream.write(toWrite);
+ stream.flush();
+ }
+ catch (IOException ioe)
+ {
+ // We'll drop the change and log the error to the logger.
+ NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 1, TimeUnit.MINUTES,
+ "Could not write to the log file: {}", ioe);
+ }
+
+ }
+
+ /**
+ * Queues the start statement for the tag and remembers the tag, so that later
+ * statements for it are not preceded by a summary.
+ */
+ public void writeStart(JsonNode statement, Object tag)
+ {
+ // serialize on the calling thread; only the file write is deferred
+ final String toWrite = statement.toString() + System.lineSeparator();
+ loggerService.execute(() -> {
+ rolled.add(tag);
+ writeLocal(toWrite);
+ });
+ }
+
+ /**
+ * Queues the statement. If no start has been written for the tag yet, the summary is
+ * written first and the tag is remembered.
+ */
+ public void write(JsonNode statement, StrategySummary summary, Object tag)
+ {
+ // serialize on the calling thread; only the file write is deferred
+ final String toWrite = statement.toString() + System.lineSeparator();
+ loggerService.execute(() -> {
+ if (!rolled.contains(tag))
+ {
+ writeLocal(summary.getSummary().toString() + System.lineSeparator());
+ rolled.add(tag);
+ }
+ writeLocal(toWrite);
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index be861e1..4d93294 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -22,6 +22,7 @@ import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
@@ -61,6 +62,7 @@ import org.apache.cassandra.service.StorageService;
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+ public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
@@ -86,6 +88,7 @@ public class CompactionStrategyManager implements INotificationConsumer
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
+ this.compactionLogger = new CompactionLogger(cfs, this);
reload(cfs.metadata);
params = cfs.metadata.params.compaction;
locations = getDirectories().getWriteableLocations();
@@ -162,6 +165,10 @@ public class CompactionStrategyManager implements INotificationConsumer
{
writeLock.unlock();
}
+ repaired.forEach(AbstractCompactionStrategy::startup);
+ unrepaired.forEach(AbstractCompactionStrategy::startup);
+ if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
+ compactionLogger.enable();
}
/**
@@ -171,7 +178,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* @param sstable
* @return
*/
- private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
readLock.lock();
@@ -234,6 +241,7 @@ public class CompactionStrategyManager implements INotificationConsumer
isActive = false;
repaired.forEach(AbstractCompactionStrategy::shutdown);
unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ compactionLogger.disable();
}
finally
{
@@ -847,4 +855,33 @@ public class CompactionStrategyManager implements INotificationConsumer
readLock.unlock();
}
}
+
+ /**
+ * Tells whether the given strategy is one of the strategies in the repaired list.
+ * NOTE(review): the list is read here without taking readLock — confirm callers do not
+ * race with a reload.
+ */
+ public boolean isRepaired(AbstractCompactionStrategy strategy)
+ {
+ return repaired.contains(strategy);
+ }
+
+ /**
+ * Lists the absolute paths of the data directories the given strategy covers.
+ * When the partitioner provides a splitter, the strategy's position in the unrepaired
+ * (or repaired) list selects a single writeable location; otherwise, or when the
+ * strategy is in neither list, every writeable location is returned.
+ *
+ * @param strategy the strategy to look up
+ * @return the folder paths for the strategy
+ */
+ public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
+ {
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ if (cfs.getPartitioner().splitter().isPresent())
+ {
+ // indexOf returns -1 when the strategy is absent, so 0 is a valid position:
+ // it is the strategy for the first data directory.
+ int unrepairedIndex = unrepaired.indexOf(strategy);
+ if (unrepairedIndex >= 0)
+ {
+ return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+ }
+ int repairedIndex = repaired.indexOf(strategy);
+ if (repairedIndex >= 0)
+ {
+ return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+ }
+ }
+ List<String> folders = new ArrayList<>(locations.length);
+ for (Directories.DataDirectory location : locations)
+ {
+ folders.add(location.location.getAbsolutePath());
+ }
+ return folders;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1465ba4..5df91fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -150,6 +150,7 @@ public class CompactionTask extends AbstractCompactionTask
logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
long start = System.nanoTime();
+ long startTime = System.currentTimeMillis();
long totalKeysWritten = 0;
long estimatedKeys = 0;
try (CompactionController controller = getCompactionController(transaction.originals()))
@@ -234,6 +235,7 @@ public class CompactionTask extends AbstractCompactionTask
mergeSummary));
logger.trace(String.format("CF Total Bytes Compacted: %s", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize))));
logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
+ cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, transaction.originals(), System.currentTimeMillis(), newSStables);
// update the metrics
cfs.metric.compactionBytesWritten.inc(endsize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 9a17e06..7c1ff13 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@@ -32,6 +33,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.Pair;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
import static com.google.common.collect.Iterables.filter;
@@ -342,6 +346,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
n += Math.ceil((double)stcsBucket.size() / cfs.getMaximumCompactionThreshold());
}
estimatedRemainingTasks = n;
+ cfs.getCompactionStrategyManager().compactionLogger.pending(this, n);
}
@@ -453,6 +458,32 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
return uncheckedOptions;
}
+ /**
+ * Logging hook for this strategy: reports each sstable's minimum and maximum timestamp,
+ * and the timestamp resolution, base time and max window size options.
+ */
+ public CompactionLogger.Strategy strategyLogger() {
+ return new CompactionLogger.Strategy()
+ {
+ public JsonNode sstable(SSTableReader sstable)
+ {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("min_timestamp", sstable.getMinTimestamp());
+ node.put("max_timestamp", sstable.getMaxTimestamp());
+ return node;
+ }
+
+ public JsonNode options()
+ {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ TimeUnit resolution = DateTieredCompactionStrategy.this.options.timestampResolution;
+ node.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY,
+ resolution.toString());
+ // baseTime and maxWindowSize are converted from the resolution unit to seconds for the log
+ node.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY,
+ resolution.toSeconds(DateTieredCompactionStrategy.this.options.baseTime));
+ node.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY,
+ resolution.toSeconds(DateTieredCompactionStrategy.this.options.maxWindowSize));
+ return node;
+ }
+ };
+ }
+
public String toString()
{
return String.format("DateTieredCompactionStrategy[%s/%s]",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
index 78a0cab..fee9e34 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
@@ -44,6 +44,7 @@ public final class DateTieredCompactionStrategyOptions
@Deprecated
protected final long maxSSTableAge;
+ protected final TimeUnit timestampResolution;
protected final long baseTime;
protected final long expiredSSTableCheckFrequency;
protected final long maxWindowSize;
@@ -51,7 +52,7 @@ public final class DateTieredCompactionStrategyOptions
public DateTieredCompactionStrategyOptions(Map<String, String> options)
{
String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
- TimeUnit timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+ timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
optionValue = options.get(MAX_SSTABLE_AGE_KEY);
@@ -68,9 +69,10 @@ public final class DateTieredCompactionStrategyOptions
public DateTieredCompactionStrategyOptions()
{
maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert((long) DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS));
- baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
+ timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+ baseTime = timestampResolution.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
- maxWindowSize = DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS);
+ maxWindowSize = timestampResolution.convert(1, TimeUnit.DAYS);
}
public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 068d283..b6ad64c 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -38,6 +38,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
public class LeveledCompactionStrategy extends AbstractCompactionStrategy
{
@@ -208,7 +211,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
public int getEstimatedRemainingTasks()
{
- return manifest.getEstimatedTasks();
+ int n = manifest.getEstimatedTasks();
+ cfs.getCompactionStrategyManager().compactionLogger.pending(this, n);
+ return n;
}
public long getMaxSSTableBytes()
@@ -444,6 +449,26 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
return null;
}
+ /**
+ * Logging hook for this strategy: reports each sstable's level and the tokens of its
+ * first and last keys. No options are reported.
+ */
+ public CompactionLogger.Strategy strategyLogger()
+ {
+ return new CompactionLogger.Strategy()
+ {
+ public JsonNode sstable(SSTableReader sstable)
+ {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("level", sstable.getSSTableLevel());
+ node.put("min_token", sstable.first.getToken().toString());
+ node.put("max_token", sstable.last.getToken().toString());
+ return node;
+ }
+
+ public JsonNode options()
+ {
+ // nothing strategy-specific to report
+ return null;
+ }
+ };
+ }
+
public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
{
Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 28bdf5c..8ef2ac7 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -86,6 +86,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
logger.trace("Compaction buckets are {}", buckets);
estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
+ cfs.getCompactionStrategyManager().compactionLogger.pending(this, estimatedRemainingTasks);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
if (!mostInteresting.isEmpty())
return mostInteresting;