You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/23 17:08:34 UTC
[2/4] git commit: merge from 1.2
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f620b348
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f620b348
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f620b348
Branch: refs/heads/trunk
Commit: f620b348a064a55749ffcbc4d25e08c3fe71f1be
Parents: 0680372 b706391
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 23 10:08:06 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 23 10:08:06 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 9 ++++-
.../apache/cassandra/db/HintedHandOffManager.java | 25 +++++++++++
src/java/org/apache/cassandra/db/RowMutation.java | 9 ++++
src/java/org/apache/cassandra/db/SystemTable.java | 32 ++++++++++-----
.../cassandra/db/commitlog/CommitLogReplayer.java | 10 ++---
6 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b765896,66c5f04..d283b72
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,5 +1,59 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+ (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5443)
+ * Leveled compaction performs size-tiered compactions in L0
+ (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+ need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+ (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+ has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+ (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+ out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+
+
1.2.6
+ * Ignore pre-truncate hints (CASSANDRA-4655)
* Move System.exit on OOM into a separate thread (CASSANDRA-5273)
* Write row markers when serializing schema (CASSANDRA-5572)
* Check only SSTables for the requested range when streaming (CASSANDRA-5569)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 78825cf,429859e..dcd7814
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1830,116 -1793,11 +1831,116 @@@ public class ColumnFamilyStore implemen
}
}
- long truncatedAt = System.currentTimeMillis();
- if (DatabaseDescriptor.isAutoSnapshot())
- snapshot(Table.getTimestampedSnapshotName(columnFamily));
+ Runnable truncateRunnable = new Runnable()
+ {
+ public void run()
+ {
+ logger.debug("Discarding sstable data for truncated CF + indexes");
+
+ final long truncatedAt = System.currentTimeMillis();
+ if (DatabaseDescriptor.isAutoSnapshot())
+ snapshot(Table.getTimestampedSnapshotName(name));
+
+ ReplayPosition replayAfter = discardSSTables(truncatedAt);
+
+ for (SecondaryIndex index : indexManager.getIndexes())
+ index.truncateBlocking(truncatedAt);
+
- SystemTable.saveTruncationPosition(ColumnFamilyStore.this, replayAfter);
++ SystemTable.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+
+ logger.debug("cleaning out row cache");
+ for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+ {
+ if (key.cfId == metadata.cfId)
+ CacheService.instance.rowCache.remove(key);
+ }
+ }
+ };
+
+ runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+ logger.debug("truncate complete");
+ }
+
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+ {
+ // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
+ // and so we only run one major compaction at a time
+ synchronized (this)
+ {
+ logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
+
+ Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
+ for (ColumnFamilyStore cfs : selfWithIndexes)
+ cfs.getCompactionStrategy().pause();
+ try
+ {
+ // interrupt in-progress compactions
+ Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>()
+ {
+ public CFMetaData apply(ColumnFamilyStore cfs)
+ {
+ return cfs.metadata;
+ }
+ };
+ Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f);
+ CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation);
+
+ // wait for the interruption to be recognized
+ long start = System.nanoTime();
+ long delay = TimeUnit.MINUTES.toNanos(1);
+ while (System.nanoTime() - start < delay)
+ {
+ if (CompactionManager.instance.isCompacting(selfWithIndexes))
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ else
+ break;
+ }
+
+ // doublecheck that we finished, instead of timing out
+ for (ColumnFamilyStore cfs : selfWithIndexes)
+ {
+ if (!cfs.getDataTracker().getCompacting().isEmpty())
+ {
+ logger.warn("Unable to cancel in-progress compactions for {}. Probably there is an unusually large row in progress somewhere. It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName);
+ }
+ }
+ logger.debug("Compactions successfully cancelled");
+
+ // run our task
+ try
+ {
+ return callable.call();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ finally
+ {
+ for (ColumnFamilyStore cfs : selfWithIndexes)
+ cfs.getCompactionStrategy().resume();
+ }
+ }
+ }
+
+ public Iterable<SSTableReader> markAllCompacting()
+ {
+ Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>()
+ {
+ public Iterable<SSTableReader> call() throws Exception
+ {
+ assert data.getCompacting().isEmpty() : data.getCompacting();
+ Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+ if (Iterables.isEmpty(sstables))
+ return null;
+ boolean success = data.markCompacting(sstables);
+ assert success : "something marked things compacting while compactions are disabled";
+ return sstables;
+ }
+ };
- return CompactionManager.instance.submitTruncate(this, truncatedAt);
+ return runWithCompactionsDisabled(callable, false);
}
public long getBloomFilterFalsePositives()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowMutation.java
index b08055f,b85cfcd..d78247b
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@@ -238,14 -311,65 +238,23 @@@ public class RowMutation implements IMu
return buff.append("])").toString();
}
- public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
- {
- if (cosc.super_column != null)
- {
- for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
- {
- add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
- }
- }
- else if (cosc.column != null)
- {
- add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
- }
- else if (cosc.counter_super_column != null)
- {
- for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
- {
- addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name), column.value);
- }
- }
- else // cosc.counter_column != null
- {
- addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
- }
- }
-
- public void deleteColumnOrSuperColumn(String cfName, Deletion del)
- {
- if (del.predicate != null && del.predicate.column_names != null)
- {
- for(ByteBuffer c : del.predicate.column_names)
- {
- if (del.super_column == null && Schema.instance.getColumnFamilyType(table, cfName) == ColumnFamilyType.Super)
- delete(new QueryPath(cfName, c), del.timestamp);
- else
- delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
- }
- }
- else
- {
- delete(new QueryPath(cfName, del.super_column), del.timestamp);
- }
- }
-
+ public RowMutation without(UUID cfId)
+ {
+ RowMutation rm = new RowMutation(table, key);
+ for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+ if (!entry.getKey().equals(cfId))
+ rm.add(entry.getValue());
+ return rm;
+ }
+
public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
{
- public void serialize(RowMutation rm, DataOutput dos, int version) throws IOException
+ public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
{
- dos.writeUTF(rm.getTable());
- ByteBufferUtil.writeWithShortLength(rm.key(), dos);
+ if (version < MessagingService.VERSION_20)
+ out.writeUTF(rm.getTable());
+
+ ByteBufferUtil.writeWithShortLength(rm.key(), out);
/* serialize the modifications in the mutation */
int size = rm.modifications.size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemTable.java
index dd818b2,327f01b..9662661
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@@ -45,14 -43,14 +45,14 @@@ import org.apache.cassandra.db.marshal.
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Constants;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CounterId;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.utils.*;
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@@ -126,71 -140,61 +126,81 @@@ public class SystemTabl
DatabaseDescriptor.getPartitioner().getClass().getName()));
}
- /** if system data becomes incompatible across versions of cassandra, that logic (and associated purging) is managed here */
- private static void upgradeSystemData() throws ExecutionException, InterruptedException
+ /**
+ * Write compaction log, except for columnfamilies under the system keyspace.
+ *
+ * @param cfs
+ * @param toCompact sstables to compact
+ * @return compaction task id or null if cfs is under system keyspace
+ */
+ public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
{
- Table table = Table.open(Table.SYSTEM_KS);
- ColumnFamilyStore oldStatusCfs = table.getColumnFamilyStore(OLD_STATUS_CF);
- if (oldStatusCfs.getSSTables().size() > 0)
+ if (Table.SYSTEM_KS.equals(cfs.table.getName()))
+ return null;
+
+ UUID compactionId = UUIDGen.getTimeUUID();
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+ Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
{
- SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
- cols.add(ByteBufferUtil.bytes("ClusterName"));
- cols.add(ByteBufferUtil.bytes("Token"));
- QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols);
- ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter);
- Iterator<IColumn> oldColumns = oldCf.columns.iterator();
-
- String clusterName = null;
- try
- {
- clusterName = ByteBufferUtil.string(oldColumns.next().value());
- }
- catch (CharacterCodingException e)
+ public Integer apply(SSTableReader sstable)
{
- throw new RuntimeException(e);
+ return sstable.descriptor.generation;
}
- // serialize the old token as a collection of (one )tokens.
- Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
- String tokenBytes = tokensAsSet(Collections.singleton(token));
- // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
- String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', %s, '%s')";
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
-
- oldStatusCfs.truncate();
- }
+ });
+ processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+ forceBlockingFlush(COMPACTION_LOG);
+ return compactionId;
+ }
+
+ public static void finishCompaction(UUID taskId)
+ {
+ assert taskId != null;
- ColumnFamilyStore oldHintsCfs = table.getColumnFamilyStore(OLD_HINTS_CF);
- if (oldHintsCfs.getSSTables().size() > 0)
+ String req = "DELETE FROM system.%s WHERE id = %s";
+ processInternal(String.format(req, COMPACTION_LOG, taskId));
+ forceBlockingFlush(COMPACTION_LOG);
+ }
+
+ /**
+ * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+ */
+ public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+ {
+ String req = "SELECT * FROM system.%s";
+ UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+
+ SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+ for (UntypedResultSet.Row row : resultSet)
{
- logger.info("Possible old-format hints found. Truncating");
- oldHintsCfs.truncate();
+ String keyspace = row.getString("keyspace_name");
+ String columnfamily = row.getString("columnfamily_name");
+ Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+
+ unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
}
+ return unfinishedCompactions;
+ }
+
+ public static void discardCompactionsInProgress()
+ {
+ ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+ compactionLog.truncateBlocking();
}
- public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+ public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
- processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+ processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ /**
+ * This method is used to remove information about truncation time for the specified column family.
+ */
+ public static void removeTruncationRecord(UUID cfId)
+ {
+ String req = "DELETE truncation_time['%s'] from system.%s WHERE key = '%s'";
+ processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
forceBlockingFlush(LOCAL_CF);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------