You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/12 22:06:29 UTC
[2/4] git commit: avoid flushing everyone on truncate;
save truncation position in system table instead patch by jbellis;
reviewed by yukim for CASSANDRA-4906
avoid flushing everyone on truncate; save truncation position in system table instead
patch by jbellis; reviewed by yukim for CASSANDRA-4906
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38bfc6dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38bfc6dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38bfc6dc
Branch: refs/heads/trunk
Commit: 38bfc6dca06bd0192167ae5e9bd51d593542f03e
Parents: 93f8fec
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Oct 27 11:18:31 2012 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Nov 12 15:05:36 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 3 +-
.../apache/cassandra/cql3/UntypedResultSet.java | 9 ++-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 97 ++++-----------
src/java/org/apache/cassandra/db/SystemTable.java | 64 ++++++++++
.../cassandra/db/commitlog/CommitLogReplayer.java | 21 ++--
.../cassandra/db/compaction/CompactionManager.java | 5 +-
7 files changed, 118 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff79b9a..9ac6227 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-rc1
+ * save truncation position in system table (CASSANDRA-4906)
* Move CompressionMetadata off-heap (CASSANDRA-4937)
* allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
* Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 921242a..5f0e93a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -181,7 +181,8 @@ public final class CFMetaData
+ "thrift_version text,"
+ "cql_version text,"
+ "data_center text,"
- + "rack text"
+ + "rack text,"
+ + "truncated_at map<uuid, blob>"
+ ") WITH COMMENT='information about the local node'");
public static final CFMetaData TraceSessionsCf = compile(14, "CREATE TABLE " + Tracing.SESSIONS_CF + " ("
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index ca3acf5..b6fcb55 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -133,7 +133,14 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
public <T> Set<T> getSet(String column, AbstractType<T> type)
{
- return SetType.getInstance(type).compose(data.get(column));
+ ByteBuffer raw = data.get(column);
+ return raw == null ? null : SetType.getInstance(type).compose(raw);
+ }
+
+ public <K, V> Map<K, V> getMap(String column, AbstractType<K> keyType, AbstractType<V> valueType)
+ {
+ ByteBuffer raw = data.get(column);
+ return raw == null ? null : MapType.getInstance(keyType, valueType).compose(raw);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a91af8c..439ef5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1720,38 +1720,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish.
- * Does NOT guarantee that no flush is active when it returns.
- */
- private void waitForActiveFlushes()
- {
- Future<?> future;
- Table.switchLock.writeLock().lock();
- try
- {
- future = postFlushExecutor.submit(new Runnable() { public void run() { } });
- }
- finally
- {
- Table.switchLock.writeLock().unlock();
- }
-
- try
- {
- future.get();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- throw new AssertionError(e);
- }
- }
-
- /**
- * Truncate practically deletes the entire column family's data
+ * Truncate deletes the entire column family's data with no expensive tombstone creation
* @return a Future to the delete operation. Call the future's get() to make
* sure the column family has been deleted
*/
@@ -1767,21 +1736,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// Bonus complication: since we store replay position in sstable metadata,
// truncating those sstables means we will replay any CL segments from the
// beginning if we restart before they are discarded for normal reasons
- // post-truncate. So we need to (a) force a new segment so the currently
- // active one can be discarded, and (b) flush *all* CFs so that unflushed
- // data in others don't keep any pre-truncate CL segments alive.
- //
- // Bonus bonus: simply forceFlush of all the CF is not enough, because if
- // for a given column family the memtable is clean, forceFlush will return
- // immediately, even though there could be a memtable being flushed at the same
- // time. So to guarantee that all segments can be cleaned out, we need to
- // "waitForActiveFlushes" after the new segment has been created.
+ // post-truncate. So we need to create a "dummy" sstable containing
+ // only the replay position. This is done by CompactionManager.submitTruncate.
logger.debug("truncating {}", columnFamily);
if (DatabaseDescriptor.isAutoSnapshot())
{
// flush the CF being truncated before forcing the new segment
forceBlockingFlush();
+
+ // sleep a little to make sure that our truncatedAt comes after any sstable
+ // that was part of the flush we forced; otherwise on a tie, it won't get deleted.
+ try
+ {
+ long starttime = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - starttime) < 1)
+ {
+ Thread.sleep(1);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
else
{
@@ -1804,33 +1781,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- KSMetaData ksm = Schema.instance.getKSMetaData(this.table.name);
- if (ksm.durableWrites)
- {
- CommitLog.instance.forceNewSegment();
- Future<ReplayPosition> position = CommitLog.instance.getContext();
- // now flush everyone else. re-flushing ourselves is not necessary, but harmless
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- cfs.forceFlush();
- waitForActiveFlushes();
- // if everything was clean, flush won't have called discard
- CommitLog.instance.discardCompletedSegments(metadata.cfId, position.get());
- }
-
- // sleep a little to make sure that our truncatedAt comes after any sstable
- // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
- try
- {
- long starttime = System.currentTimeMillis();
- while ((System.currentTimeMillis() - starttime) < 1)
- {
- Thread.sleep(1);
- }
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
long truncatedAt = System.currentTimeMillis();
if (DatabaseDescriptor.isAutoSnapshot())
snapshot(Table.getTimestampedSnapshotName(columnFamily));
@@ -2093,8 +2043,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*
* @param truncatedAt The timestamp of the truncation
* (all SSTables before that timestamp are going be marked as compacted)
+ *
+ * @return the most recent replay position of the truncated data
*/
- public void discardSSTables(long truncatedAt)
+ public ReplayPosition discardSSTables(long truncatedAt)
{
List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
@@ -2104,7 +2056,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
truncatedSSTables.add(sstable);
}
- if (!truncatedSSTables.isEmpty())
- markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+ if (truncatedSSTables.isEmpty())
+ return ReplayPosition.NONE;
+
+ markCompacted(truncatedSSTables, OperationType.UNKNOWN);
+ return ReplayPosition.getReplayPosition(truncatedSSTables);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index a14ba58..34f7096 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.db;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
@@ -29,8 +32,11 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.avro.ipc.ByteBufferInputStream;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -43,12 +49,15 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.Constants;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@ -178,6 +187,61 @@ public class SystemTable
}
}
+ public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+ {
+ String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
+ processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ private static String positionAsMapEntry(ColumnFamilyStore cfs, ReplayPosition position)
+ {
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
+ {
+ ReplayPosition.serializer.serialize(position, out);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return String.format("{'%s': '%s'}",
+ cfs.metadata.cfId,
+ ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+ }
+
+ public static Map<UUID, ReplayPosition> getTruncationPositions()
+ {
+ String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
+ UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ if (rows.isEmpty())
+ return Collections.emptyMap();
+
+ UntypedResultSet.Row row = rows.one();
+ Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
+ if (rawMap == null)
+ return Collections.emptyMap();
+
+ Map<UUID, ReplayPosition> positions = new HashMap<UUID, ReplayPosition>();
+ for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
+ {
+ positions.put(entry.getKey(), positionFromBlob(entry.getValue()));
+ }
+ return positions;
+ }
+
+ private static ReplayPosition positionFromBlob(ByteBuffer bytes)
+ {
+ try
+ {
+ return ReplayPosition.serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Record tokens being used by another node
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 9bc0179..9f949d0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -35,11 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
@@ -72,18 +68,27 @@ public class CommitLogReplayer
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
+ this.checksum = new PureJavaCrc32();
+
// compute per-CF and global replay positions
- this.cfPositions = new HashMap<UUID, ReplayPosition>();
+ cfPositions = new HashMap<UUID, ReplayPosition>();
+ Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+ Map<UUID, ReplayPosition> truncationPositions = SystemTable.getTruncationPositions();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
// below: gRP will return NONE if there are no flushed sstables, which is important to have in the
// list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+
+ // but, if we've truncated the cf in question, then we need to start replay after the truncation
+ ReplayPosition truncatedAt = truncationPositions.get(cfs.metadata.cfId);
+ if (truncatedAt != null)
+ rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+
cfPositions.put(cfs.metadata.cfId, rp);
}
- this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
- this.checksum = new PureJavaCrc32();
+ globalPosition = replayPositionOrdering.min(cfPositions.values());
}
public void recover(File[] clogs) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38bfc6dc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2d06036..4399948 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexBuilder;
@@ -861,11 +862,13 @@ public class CompactionManager implements CompactionManagerMBean
try
{
- main.discardSSTables(truncatedAt);
+ ReplayPosition replayAfter = main.discardSSTables(truncatedAt);
for (SecondaryIndex index : main.indexManager.getIndexes())
index.truncate(truncatedAt);
+ SystemTable.saveTruncationPosition(main, replayAfter);
+
for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
{
if (key.cfId == main.metadata.cfId)