You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:34 UTC

[06/10] git commit: Fixes for compacting larger-than-memory rows patch by jbellis; reviewed by marcuse for CASSANDRA-6274

Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db

Branch: refs/heads/cassandra-2.0
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  22 ++++----
 .../db/compaction/AbstractCompactedRow.java     |   4 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/LazilyCompactedRow.java       |  34 ++++++++----
 .../cassandra/db/compaction/Scrubber.java       |  52 ++++++++++++-------
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
  * Compact hottest sstables first and optionally omit coldest from
    compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
             }
             ColumnIndex index = build();
 
-            finish();
+            maybeWriteEmptyRowHeader();
 
             return index;
         }
 
-        public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+        /**
+         * The important distinction wrt build() is that we may be building for a row that ends up
+         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+         * columns shadowed by expired tombstone).  Thus, it is the caller's responsibility
+         * to decide whether to write the header for an empty row.
+         */
+        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
         {
-            for (OnDiskAtom c : columns)
-                add(c);
-            ColumnIndex index = build();
-
-            finish();
-
-            return index;
+            while (columns.hasNext())
+                add(columns.next());
+            return build();
         }
 
         public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
             return result;
         }
 
-        public void finish() throws IOException
+        public void maybeWriteEmptyRowHeader() throws IOException
         {
             if (!deletionInfo.isLive())
                 maybeWriteRowHeader();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
 
     /**
      * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
-     * are included in the compaction set
+     * older than @param maxDeletionTimestamp are included in the compaction set
      */
     public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
+    private final ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private long maxTombstoneTimestamp;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
+        maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
+        // no other versions of this row present.
+        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(iterator());
             if (columnsIndex.columnsIndex.isEmpty())
             {
                 boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         public int compare(Row r1, Row r2)
          {
              return r1.key.compareTo(r2.key);
          }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                     if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
         if (!outOfOrderRows.isEmpty())
         {
             SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-            for (AbstractCompactedRow row : outOfOrderRows)
-                inOrderWriter.append(row);
+            for (Row row : outOfOrderRows)
+                inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
             outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
         }
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+        // TODO bitch if the row is too large?  if it is there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+        // and there's no sense in failing on mis-sorted cells when a TreeMap could safe us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This write the atom on disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ