You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/10/07 22:33:29 UTC
[2/3] git commit: Save compaction history to system keyspace
Save compaction history to system keyspace
patch by lantao yan; reviewed by yukim for CASSANDRA-5078
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2b12784
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2b12784
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2b12784
Branch: refs/heads/trunk
Commit: a2b12784fe3785fe96d9c0e2d7e8c72bfc88ac7c
Parents: 01a57ee
Author: lantao yan <ya...@hotmail.com>
Authored: Mon Oct 7 15:22:11 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Oct 7 15:30:50 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
.../org/apache/cassandra/config/CFMetaData.java | 11 +++
.../org/apache/cassandra/config/KSMetaData.java | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 25 ++++++
.../CompactionHistoryTabularData.java | 84 ++++++++++++++++++++
.../db/compaction/CompactionManager.java | 14 ++++
.../db/compaction/CompactionManagerMBean.java | 4 +
.../cassandra/db/compaction/CompactionTask.java | 52 ++++++------
.../org/apache/cassandra/tools/NodeCmd.java | 26 ++++++
.../org/apache/cassandra/tools/NodeProbe.java | 6 ++
.../apache/cassandra/tools/NodeToolHelp.yaml | 3 +
12 files changed, 204 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddd976e..ee631a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
* Allow alter keyspace on system_traces (CASSANDRA-6016)
* Disallow empty column names in cql (CASSANDRA-6136)
* Use Java7 file-handling APIs and fix file moving on Windows (CASSANDRA-5383)
+ * Save compaction history to system keyspace (CASSANDRA-5078)
Merged from 1.2:
* Limit CQL prepared statement cache by size instead of count (CASSANDRA-6107)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6ed8449..37fbae7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,9 @@ New features
(See blog post at TODO)
- Configurable metrics reporting
(see conf/metrics-reporter-config-sample.yaml)
+ - Compaction history and stats are now saved to system keyspace
+ (system.compaction_history table). You can access history via
+ new 'nodetool compactionhistory' command or CQL.
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 8c4075c..bbea21e 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -268,6 +268,17 @@ public final class CFMetaData
+ "PRIMARY KEY ((keyspace_name, columnfamily_name, generation))"
+ ") WITH COMMENT='historic sstable read rates'");
+ public static final CFMetaData CompactionHistoryCf = compile("CREATE TABLE " + SystemKeyspace.COMPACTION_HISTORY_CF + " ("
+ + "id uuid,"
+ + "keyspace_name text,"
+ + "columnfamily_name text,"
+ + "compacted_at timestamp,"
+ + "bytes_in bigint,"
+ + "bytes_out bigint,"
+ + "rows_merged map<int, bigint>,"
+ + "PRIMARY KEY (id)"
+ + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800");
+
public enum Caching
{
ALL, KEYS_ONLY, ROWS_ONLY, NONE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 20ecda3..0a32f5c 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -90,6 +90,7 @@ public final class KSMetaData
CFMetaData.SchemaColumnFamiliesCf,
CFMetaData.SchemaColumnsCf,
CFMetaData.CompactionLogCf,
+ CFMetaData.CompactionHistoryCf,
CFMetaData.PaxosCf,
CFMetaData.SSTableActivityCF);
return new KSMetaData(Keyspace.SYSTEM_KS, LocalStrategy.class, Collections.<String, String>emptyMap(), true, cfDefs);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1d5927a..50af82d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import javax.management.openmbean.*;
import com.google.common.base.Function;
import com.google.common.collect.HashMultimap;
@@ -29,6 +30,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
+import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.transport.Server;
import org.apache.commons.lang3.StringUtils;
@@ -80,6 +82,7 @@ public class SystemKeyspace
public static final String COMPACTION_LOG = "compactions_in_progress";
public static final String PAXOS_CF = "paxos";
public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
+ public static final String COMPACTION_HISTORY_CF = "compaction_history";
private static final String LOCAL_KEY = "local";
private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
@@ -220,6 +223,28 @@ public class SystemKeyspace
compactionLog.truncateBlocking();
}
+ public static void updateCompactionHistory(String ksname,
+ String cfname,
+ long compactedAt,
+ long bytesIn,
+ long bytesOut,
+ Map<Integer, Long> rowsMerged)
+ {
+ // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
+ if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF))
+ return;
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) "
+ + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})";
+ processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged)));
+ forceBlockingFlush(COMPACTION_HISTORY_CF);
+ }
+
+ public static TabularData getCompactionHistory() throws OpenDataException
+ {
+ UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history");
+ return CompactionHistoryTabularData.from(queryResultSet);
+ }
+
public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java
new file mode 100644
index 0000000..be64d44
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import javax.management.openmbean.*;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.base.Throwables;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CompactionHistoryTabularData
+{
+ private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at",
+ "bytes_in", "bytes_out", "rows_merged" };
+
+ private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name",
+ "column family name", "compaction finished at",
+ "total bytes in", "total bytes out", "total rows merged" };
+
+ private static final String TYPE_NAME = "CompactionHistory";
+
+ private static final String ROW_DESC = "CompactionHistory";
+
+ private static final OpenType<?>[] ITEM_TYPES;
+
+ private static final CompositeType COMPOSITE_TYPE;
+
+ private static final TabularType TABULAR_TYPE;
+
+ static {
+ try
+ {
+ ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG,
+ SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
+
+ COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
+
+ TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static TabularData from(UntypedResultSet resultSet) throws OpenDataException
+ {
+ TabularDataSupport result = new TabularDataSupport(TABULAR_TYPE);
+ for (UntypedResultSet.Row row : resultSet)
+ {
+ UUID id = row.getUUID(ITEM_NAMES[0]);
+ String ksName = row.getString(ITEM_NAMES[1]);
+ String cfName = row.getString(ITEM_NAMES[2]);
+ long compactedAt = row.getLong(ITEM_NAMES[3]);
+ long bytesIn = row.getLong(ITEM_NAMES[4]);
+ long bytesOut = row.getLong(ITEM_NAMES[5]);
+ Map<Integer, Long> rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance);
+
+ result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
+ new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut,
+ "{" + FBUtilities.toString(rowMerged) + "}" }));
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 317014f..bcf9422 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -24,6 +24,8 @@ import java.util.*;
import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
import com.google.common.base.Throwables;
import com.google.common.collect.*;
@@ -1016,6 +1018,18 @@ public class CompactionManager implements CompactionManagerMBean
return out;
}
+ public TabularData getCompactionHistory()
+ {
+ try
+ {
+ return SystemKeyspace.getCompactionHistory();
+ }
+ catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public long getTotalBytesCompacted()
{
return metrics.bytesCompacted.count();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 25f7c32..acf1e52 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.List;
import java.util.Map;
+import javax.management.openmbean.TabularData;
public interface CompactionManagerMBean
{
@@ -28,6 +29,9 @@ public interface CompactionManagerMBean
/** List of running compaction summary strings. */
public List<String> getCompactionSummary();
+ /** compaction history **/
+ public TabularData getCompactionHistory();
+
/**
* @see org.apache.cassandra.metrics.CompactionMetrics#pendingTasks
* @return estimated number of compactions remaining to perform
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c9889cc..0b2cb54 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -244,34 +244,34 @@ public class CompactionTask extends AbstractCompactionTask
for (SSTableReader sstable : sstables)
sstable.preheat(cachedKeyMap.get(sstable.descriptor));
- if (logger.isInfoEnabled())
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTable.getTotalBytes(toCompact);
+ long endsize = SSTable.getTotalBytes(sstables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder builder = new StringBuilder();
+ for (SSTableReader reader : sstables)
+ builder.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
+ for (int i = 0; i < counts.length; i++)
{
- // log a bunch of statistics about the result
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
- double ratio = (double)endsize / (double)startsize;
-
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- for (int i = 0; i < counts.length; i++)
- {
- int rows = i + 1;
- long count = counts[i];
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- }
-
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total rows, %,d unique. Row merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ int rows = i + 1;
+ long count = counts[i];
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
+
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, start, startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total rows, %,d unique. Row merge counts were {%s}",
+ toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
}
private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 657d7f2..d47a4a3 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
+import javax.management.openmbean.TabularData;
import com.google.common.base.Joiner;
import com.google.common.collect.LinkedHashMultimap;
@@ -109,6 +110,7 @@ public class NodeCmd
CLEARSNAPSHOT,
COMPACT,
COMPACTIONSTATS,
+ COMPACTIONHISTORY,
DECOMMISSION,
DESCRIBECLUSTER,
DISABLEBINARY,
@@ -1085,6 +1087,7 @@ public class NodeCmd
case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break;
case VERSION : nodeCmd.printReleaseVersion(System.out); break;
case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break;
+ case COMPACTIONHISTORY:nodeCmd.printCompactionHistory(System.out); break;
case DESCRIBECLUSTER : nodeCmd.printClusterDescription(System.out, host); break;
case DISABLEBINARY : probe.stopNativeTransport(); break;
case ENABLEBINARY : probe.startNativeTransport(); break;
@@ -1304,6 +1307,29 @@ public class NodeCmd
System.exit(probe.isFailed() ? 1 : 0);
}
+ private void printCompactionHistory(PrintStream out)
+ {
+ out.println("Compaction History: ");
+
+ TabularData tabularData = this.probe.compactionHistory();
+ if (tabularData.isEmpty())
+ {
+ out.printf("There is no compaction history");
+ return;
+ }
+
+ String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
+ List<String> indexNames = tabularData.getTabularType().getIndexNames();
+ out.printf(format, indexNames.toArray(new String[indexNames.size()]));
+
+ Set<?> values = tabularData.keySet();
+ for (Object eachValue : values)
+ {
+ List<?> value = (List<?>) eachValue;
+ out.printf(format, value.toArray(new Object[value.size()]));
+ }
+ }
+
private static void printHistory(String[] args, ToolCommandLine cmd)
{
//don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 15f837f..f05cfb7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -36,6 +36,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
+import javax.management.openmbean.TabularData;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -864,6 +865,11 @@ public class NodeProbe
{
return spProxy.getReadRepairRepairedBackground();
}
+
+ public TabularData compactionHistory()
+ {
+ return compactionProxy.getCompactionHistory();
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2b12784/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 4e8a8a0..165a174 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -49,6 +49,9 @@ commands:
- name: compactionstats
help: |
Print statistics on compactions
+ - name: compactionhistory
+ help: |
+ Print history of compaction
- name: disablebinary
help: |
Disable native transport (binary protocol)