Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/12 19:47:07 UTC

[2/8] git commit: Merge branch 'cassandra-2.0' into trunk

Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d53c838c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d53c838c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d53c838c

Branch: refs/heads/trunk
Commit: d53c838c9d2f89ac6c88c8306f2302f7fbc6b33d
Parents: 448e4d4 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:17:45 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:18:14 2013 +0600

----------------------------------------------------------------------
 .../db/AbstractThreadUnsafeSortedColumns.java   |  6 +-
 .../cassandra/db/AtomicSortedColumns.java       |  4 +-
 .../org/apache/cassandra/db/ColumnFamily.java   | 11 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 26 ++++++-
 .../org/apache/cassandra/db/ColumnIndex.java    |  2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   | 76 ++++++++++++++-----
 .../org/apache/cassandra/db/DeletionTime.java   | 16 ++++
 .../apache/cassandra/db/RangeTombstoneList.java |  2 +-
 .../db/compaction/CompactionController.java     |  5 +-
 .../db/compaction/LazilyCompactedRow.java       | 77 +++++++++++---------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  2 +-
 .../db/compaction/CompactionsPurgeTest.java     | 77 ++++++++++++++++++--
 .../streaming/StreamingTransferTest.java        |  4 +-
 14 files changed, 236 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 396bbd3,d585407..4e54af0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -885,6 -898,17 +897,16 @@@ public class ColumnFamilyStore implemen
              return null;
          }
  
 -        removeDeletedColumnsOnly(cf, gcBefore, indexer);
 -        return removeDeletedCF(cf, gcBefore);
++        return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
+     }
+ 
+     /**
+      * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+      * columns that have been dropped from the schema (for CQL3 tables only).
+      * @return the updated ColumnFamily
+      */
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
++    public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+     {
          Iterator<Column> iter = cf.iterator();
          DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
          boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
@@@ -899,10 -924,15 +921,10 @@@
              {
                  iter.remove();
                  indexer.remove(c);
 -                removedBytes += c.dataSize();
              }
          }
 -        return removedBytes;
 -    }
  
-         return removeDeletedCF(cf, gcBefore);
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
 -    {
 -        return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
++        return cf;
      }
  
      // returns true if
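
For readers tracing the refactor in this hunk: removeDeletedColumnsOnly now returns the
ColumnFamily instead of a removed-bytes count, which is what lets removeDeleted collapse into
the chained call above. Below is a minimal standalone model of that two-step shape, using toy
Cell/CF classes rather than Cassandra's real ones (every name in it is illustrative):

    import java.util.*;

    class PurgeSketch
    {
        // Toy cell: name, write timestamp, and a local deletion time if it is a tombstone.
        static class Cell
        {
            final String name; final long timestamp; final Integer localDeletionTime;
            Cell(String n, long ts, Integer ldt) { name = n; timestamp = ts; localDeletionTime = ldt; }
            boolean isTombstone() { return localDeletionTime != null; }
        }

        // Toy column family: cells plus an optional row-level tombstone.
        static class CF
        {
            final List<Cell> cells = new ArrayList<>();
            long rowTombstoneTimestamp = Long.MIN_VALUE;       // MIN_VALUE means "no row tombstone"
            int rowTombstoneLocalDeletionTime = Integer.MIN_VALUE;
        }

        // Step 1: drop gcable cell tombstones and cells shadowed by the row tombstone;
        // return the same CF so the call can be chained, as in the merged code.
        static CF removeDeletedColumnsOnly(CF cf, int gcBefore)
        {
            cf.cells.removeIf(c -> (c.isTombstone() && c.localDeletionTime < gcBefore)
                                   || c.timestamp <= cf.rowTombstoneTimestamp);
            return cf;
        }

        // Step 2: purge the row tombstone itself if it is old enough, then return null
        // when nothing relevant remains (the whole row can be dropped).
        static CF removeDeletedCF(CF cf, int gcBefore)
        {
            if (cf.rowTombstoneLocalDeletionTime < gcBefore)
                cf.rowTombstoneTimestamp = Long.MIN_VALUE;
            boolean empty = cf.cells.isEmpty() && cf.rowTombstoneTimestamp == Long.MIN_VALUE;
            return empty ? null : cf;
        }

        public static void main(String[] args)
        {
            CF cf = new CF();
            cf.cells.add(new Cell("a", 5, null));
            cf.rowTombstoneTimestamp = 10;                     // shadows the cell written at ts 5
            cf.rowTombstoneLocalDeletionTime = 2000;
            CF result = removeDeletedCF(removeDeletedColumnsOnly(cf, 1000), 1000);
            System.out.println(result == null ? "row dropped"
                                              : result.cells.size() + " cells, tombstone kept");
        }
    }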

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index dc7730c,7edc60e..c4ce2e8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -148,24 -153,25 +148,25 @@@ public class CompactionControlle
      }
  
      /**
 -     * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
 -     * older than @param maxDeletionTimestamp are included in the compaction set
 +     * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
-      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be supressed
-      * in other sstables.
++     * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
++     * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
++     * participating in this compaction, or Long.MAX_VALUE if no such SSTable exists.
       */
 -    public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
 +    public long maxPurgeableTimestamp(DecoratedKey key)
      {
          List<SSTableReader> filteredSSTables = overlappingTree.search(key);
 +        long min = Long.MAX_VALUE;
          for (SSTableReader sstable : filteredSSTables)
          {
 -            if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
 -            {
 -                // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
 -                // we check index file instead.
 -                if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 -                    return false;
 -                else if (sstable.getBloomFilter().isPresent(key.key))
 -                    return false;
 -            }
 +            // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
 +            // we check index file instead.
 +            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 +                min = Math.min(min, sstable.getMinTimestamp());
 +            else if (sstable.getBloomFilter().isPresent(key.key))
 +                min = Math.min(min, sstable.getMinTimestamp());
          }
 -        return true;
 +        return min;
      }
  
      public void invalidateCachedRow(DecoratedKey key)
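
The hunk above is the heart of the merge: shouldPurge's yes/no answer becomes a timestamp
threshold. Here is a self-contained sketch of the rule, with a toy SSTable class standing in
for SSTableReader and its bloom-filter/index-file checks (names are illustrative, not
Cassandra's API):

    import java.util.*;

    class MaxPurgeableSketch
    {
        static class SSTable
        {
            final long minTimestamp;
            final Set<String> keys;   // stands in for the bloom-filter / index-file check
            SSTable(long minTimestamp, String... keys)
            {
                this.minTimestamp = minTimestamp;
                this.keys = new HashSet<>(Arrays.asList(keys));
            }
            boolean mayContain(String key) { return keys.contains(key); }
        }

        // Tombstones with a timestamp strictly below the returned value are safe to drop:
        // every sstable outside the compaction that might hold this key has only newer data.
        static long maxPurgeableTimestamp(String key, List<SSTable> overlapping)
        {
            long min = Long.MAX_VALUE;
            for (SSTable sstable : overlapping)
                if (sstable.mayContain(key))
                    min = Math.min(min, sstable.minTimestamp);
            return min;
        }

        public static void main(String[] args)
        {
            List<SSTable> outside = Arrays.asList(new SSTable(7, "k1"), new SSTable(3, "k2"));
            System.out.println(maxPurgeableTimestamp("k1", outside)); // 7: only purge tombstones older than 7
            System.out.println(maxPurgeableTimestamp("k3", outside)); // Long.MAX_VALUE: no overlap, purge freely
        }
    }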

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2cb014a,3b7a3d4..8237ff5
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,8 -58,8 +56,9 @@@ public class LazilyCompactedRow extend
      private boolean closed;
      private ColumnIndex.Builder indexBuilder;
      private final SecondaryIndexManager.Updater indexer;
 -    private long maxTombstoneTimestamp;
 +    private final Reducer reducer;
 +    private final Iterator<OnDiskAtom> merger;
+     private DeletionInfo deletionInfo;
  
      public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
      {
@@@ -66,39 -68,27 +67,38 @@@
          this.controller = controller;
          indexer = controller.cfs.indexManager.updaterFor(key);
  
-         ColumnFamily rawCf = null;
+         // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
+         // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+         deletionInfo = DeletionInfo.live();
 -        maxTombstoneTimestamp = Long.MIN_VALUE;
          for (OnDiskAtomIterator row : rows)
--        {
-             ColumnFamily cf = row.getColumnFamily();
 -            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
 -            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
 -            deletionInfo = deletionInfo.add(delInfo);
 -        }
++            deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
  
-             if (rawCf == null)
-                 rawCf = cf;
-             else
-                 rawCf.delete(cf);
-         }
 -        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
 -        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
 -        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
 -        // no other versions of this row present.
 -        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
++        // tombstones with a localDeletionTime before this can be purged.  This is the minimum timestamp for any sstable
++        // containing `key` outside of the set of sstables involved in this compaction.
 +        maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
-         // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
-         // to get rid of redundant row-level and range tombstones
-         assert rawCf != null;
-         int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
-         ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
-         emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+ 
+         emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+         emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
 -        if (shouldPurge)
++        if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+             emptyColumnFamily.purgeTombstones(controller.gcBefore);
 +
 +        reducer = new Reducer();
 +        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
 +    }
 +
-     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
++    private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
 +    {
-         // We should only gc tombstone if shouldPurge == true. But otherwise,
-         // it is still ok to collect column that shadowed by their (deleted)
-         // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
-         ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
-                                                                  shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                  controller.cfs.indexManager.updaterFor(key));
-         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-             CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-         return compacted;
++        // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
++        // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
++        // without purging tombstones.
++        int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
++        ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
++
++        // if we have counters, remove old shards
++        if (cf.metadata().getDefaultValidator().isCommutative())
++            CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
++
++        return cf;
      }
  
      public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -109,31 -99,31 +109,29 @@@
          try
          {
              indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
 -            columnsIndex = indexBuilder.buildForCompaction(iterator());
 +            columnsIndex = indexBuilder.buildForCompaction(merger);
-             if (columnsIndex.columnsIndex.isEmpty())
-             {
-                 boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
-                                      ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                      : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-                 if (cfIrrelevant)
-                     return null;
-             }
+ 
+             // if there aren't any columns or tombstones, return null
+             if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+                 return null;
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
          // reach into the reducer (created during iteration) to get column count, size, max column timestamp
 -        // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
 -        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
 -                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
 -                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
 -                                      reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
 -                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
 -                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
 -                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
 +        columnStats = new ColumnStats(reducer.columns,
 +                                      reducer.minTimestampSeen,
 +                                      Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
 +                                      reducer.maxLocalDeletionTimeSeen,
 +                                      reducer.tombstones,
 +                                      reducer.minColumnNameSeen,
 +                                      reducer.maxColumnNameSeen
          );
 -        reducer = null;
  
+         // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
          indexBuilder.maybeWriteEmptyRowHeader();
+ 
          out.writeShort(SSTableWriter.END_OF_ROW);
  
          close();
@@@ -245,8 -257,9 +252,10 @@@
              }
              else
              {
 +                boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+                 container.setDeletionInfo(deletionInfo);
 -                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
 +                ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
                  if (purged == null || !purged.iterator().hasNext())
                  {
                      container.clear();
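
Putting the pieces together, the purge decision the merged LazilyCompactedRow applies reduces
to two independent conditions. The following hypothetical canPurge helper (not code from the
patch) makes that explicit:

    class PurgeDecisionSketch
    {
        // A tombstone may be dropped only if it is old enough AND every version of the data it
        // might shadow is inside this compaction (its timestamp is below maxPurgeableTimestamp).
        static boolean canPurge(long tombstoneTimestamp, int tombstoneLocalDeletionTime,
                                int gcBefore, long maxPurgeableTimestamp)
        {
            boolean oldEnough = tombstoneLocalDeletionTime < gcBefore;
            boolean shadowsNothingOutside = tombstoneTimestamp < maxPurgeableTimestamp;
            return oldEnough && shadowsNothingOutside;
        }

        public static void main(String[] args)
        {
            // gcBefore = 1000; an outside sstable containing this key has minTimestamp 50.
            System.out.println(canPurge(40, 900, 1000, 50)); // true: nothing outside is older than the tombstone
            System.out.println(canPurge(60, 900, 1000, 50)); // false: it may still shadow data in that sstable
        }
    }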

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 18e637b,8461023..bd814d0
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -38,8 -38,10 +38,11 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.Util;
  
  import static org.junit.Assert.assertEquals;
 +import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
  import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
 -import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++
+ import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
  
  import org.apache.cassandra.utils.ByteBufferUtil;
  
@@@ -286,8 -283,10 +289,9 @@@ public class CompactionsPurgeTest exten
          String cfName = "Standard1";
          Keyspace keyspace = Keyspace.open(keyspaceName);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 -
          DecoratedKey key = Util.dk("key3");
          RowMutation rm;
+         QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
  
          // inserts
          rm = new RowMutation(keyspaceName, key.key);
@@@ -316,4 -322,60 +323,60 @@@
          for (Column c : cf)
              assert !c.isMarkedForDelete(System.currentTimeMillis());
      }
- }
+ 
+     @Test
+     public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+     {
+         String keyspace = "cql_keyspace";
+         String table = "table1";
+         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+         cfs.disableAutoCompaction();
+ 
+         // write a row out to one sstable
+         processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                       keyspace, table, 1, "foo", 1));
+         cfs.forceBlockingFlush();
+ 
+         UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(1, result.size());
+ 
+         // write a row tombstone out to a second sstable
+         processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         cfs.forceBlockingFlush();
+ 
+         // basic check that the row is considered deleted
+         assertEquals(2, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+ 
+         // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+         Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+         future.get();
+ 
+         // the data should be gone, but the tombstone should still exist
+         assertEquals(1, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+ 
+         // write a row out to one sstable
+         processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                       keyspace, table, 1, "foo", 1));
+         cfs.forceBlockingFlush();
+         assertEquals(2, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(1, result.size());
+ 
+         // write a row tombstone out to a different sstable
+         processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         cfs.forceBlockingFlush();
+ 
+         // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+         future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+         future.get();
+ 
+         // both the data and the tombstone should be gone this time
+         assertEquals(0, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+     }
 -}
++}
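
The two submitMaximal calls in the test above drive the same compaction with gcBefore on
either side of "now". A small sketch of how those values behave, assuming only the arithmetic
visible in the test (gcBefore is seconds since the epoch; tombstones deleted before it are
purgeable); GcBeforeSketch and gcBefore are illustrative names:

    class GcBeforeSketch
    {
        // gcBefore is a cutoff in seconds since the epoch; tombstones whose local deletion
        // time falls before it are candidates for purging during compaction.
        static int gcBefore(long nowMillis, int offsetSeconds)
        {
            return (int) (nowMillis / 1000) + offsetSeconds;
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis();
            System.out.println(gcBefore(now, -10000)); // cutoff in the past: a fresh tombstone survives
            System.out.println(gcBefore(now, +10000)); // cutoff in the future: tombstone and shadowed data are purged
        }
    }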

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------