You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/14 20:24:54 UTC

svn commit: r1081525 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/columniterator/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Mon Mar 14 19:24:54 2011
New Revision: 1081525

URL: http://svn.apache.org/viewvc?rev=1081525&view=rev
Log:
fix tombstone handling in repair and sstable2json
patch by slebresne; reviewed by jbellis for CASSANDRA-2279

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Mar 14 19:24:54 2011
@@ -1,5 +1,6 @@
 0.7.5
  * Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318)
+ * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
 
 
 0.7.4

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java Mon Mar 14 19:24:54 2011
@@ -99,7 +99,6 @@ public class RowIteratorFactory
         }
 
         Iterator<IColumnIterator> collated = IteratorUtils.collatedIterator(COMPARE_BY_KEY, iterators);
-        final Memtable firstMemtable = memtables.iterator().next();
 
         // reduce rows from all sources into a single row
         ReducingIterator<IColumnIterator, Row> reduced = new ReducingIterator<IColumnIterator, Row>(collated)
@@ -107,11 +106,26 @@ public class RowIteratorFactory
             private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
             private final List<IColumnIterator> colIters = new ArrayList<IColumnIterator>();
             private DecoratedKey key;
+            private ColumnFamily returnCF;
+
+            @Override
+            protected void onKeyChange()
+            {
+                this.returnCF = ColumnFamily.create(cfs.metadata);
+            }
 
             public void reduce(IColumnIterator current)
             {
                 this.colIters.add(current);
                 this.key = current.getKey();
+                try
+                {
+                    this.returnCF.delete(current.getColumnFamily());
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
             }
 
             @Override
@@ -125,7 +139,6 @@ public class RowIteratorFactory
                 Comparator<IColumn> colComparator = filter.filter.getColumnComparator(comparator);
                 Iterator<IColumn> colCollated = IteratorUtils.collatedIterator(colComparator, colIters);
 
-                ColumnFamily returnCF;
                 // First check if this row is in the rowCache. If it is we can skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 if (cached != null)
@@ -135,33 +148,8 @@ public class RowIteratorFactory
                 }
                 else if (colCollated.hasNext())
                 {
-                    returnCF = firstMemtable.getColumnFamily(key);
-                    // TODO this is a little subtle: the Memtable ColumnIterator has to be a shallow clone of the source CF,
-                    // with deletion times set correctly, so we can use it as the "base" CF to add query results to.
-                    // (for sstable ColumnIterators we do not care if it is a shallow clone or not.)
-                    returnCF = returnCF == null ? ColumnFamily.create(firstMemtable.getTableName(), filter.getColumnFamilyName())
-                                                : returnCF.cloneMeShallow();
-                    long lastDeletedAt = Long.MIN_VALUE;
-                    for (IColumnIterator columns : colIters)
-                    {
-                        columns.hasNext(); // force cf initializtion
-                        try
-                        {
-                            if (columns.getColumnFamily().isMarkedForDelete())
-                                lastDeletedAt = Math.max(lastDeletedAt, columns.getColumnFamily().getMarkedForDeleteAt());
-                        }
-                        catch (IOException e)
-                        {
-                            throw new IOError(e);
-                        }
-                    }
-                    returnCF.markedForDeleteAt.set(lastDeletedAt);
                     filter.collectCollatedColumns(returnCF, colCollated, gcBefore);
                 }
-                else
-                {
-                    returnCF = null;
-                }
 
                 Row rv = new Row(key, returnCF);
                 colIters.clear();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Mon Mar 14 19:24:54 2011
@@ -31,9 +31,11 @@ import org.apache.cassandra.db.IColumn;
 public interface IColumnIterator extends Iterator<IColumn>
 {
     /**
-     * returns the CF of the column being iterated.  Do not modify the returned CF; clone first.
-     * The CF is only guaranteed to be available after a call to next() or hasNext().
-     * Guaranteed to be non-null.
+     * returns the CF of the column being iterated.
+     * Do not modify the returned CF; clone first.
+     * The returned CF is guaranteed to be non-null and to have the correct metadata
+     * (markedForDeleteAt and localDeletionTime). The full CF is however only guaranteed to 
+     * be available after a call to next() or hasNext().
      * @throws IOException 
      */
     public abstract ColumnFamily getColumnFamily() throws IOException;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java Mon Mar 14 19:24:54 2011
@@ -55,6 +55,7 @@ public abstract class ReducingIterator<T
         if (last == null && !source.hasNext())
             return endOfData();
 
+        onKeyChange();
         boolean keyChanged = false;
         while (!keyChanged)
         {
@@ -73,6 +74,12 @@ public abstract class ReducingIterator<T
         return getReduced();
     }
 
+    /**
+     * Called at the beginning of each new key, before any reduce is called.
+     * To be overridden by implementing classes.
+     */
+    protected void onKeyChange() {}
+
     public Iterator<T2> iterator()
     {
         return this;

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java Mon Mar 14 19:24:54 2011
@@ -62,4 +62,51 @@ public class RowIterationTest extends Cl
         store.forceBlockingFlush();
         assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
     }
+
+    @Test
+    public void testRowIterationDeletionTime() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+        String CF_NAME = "Standard3";
+        ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME);
+        DecoratedKey key = Util.dk("key");
+
+        // Delete row in first sstable
+        RowMutation rm = new RowMutation(TABLE1, key.key);
+        rm.delete(new QueryPath(CF_NAME, null, null), 0);
+        rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 0L);
+        int tstamp1 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+        rm.apply();
+        store.forceBlockingFlush();
+
+        // Delete row in second sstable with higher timestamp
+        rm = new RowMutation(TABLE1, key.key);
+        rm.delete(new QueryPath(CF_NAME, null, null), 1);
+        rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 1L);
+        int tstamp2 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+        rm.apply();
+        store.forceBlockingFlush();
+
+        ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf;
+        assert cf.getMarkedForDeleteAt() == 1L;
+        assert cf.getLocalDeletionTime() == tstamp2;
+    }
+
+    @Test
+    public void testRowIterationDeletion() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+        String CF_NAME = "Standard3";
+        ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME);
+        DecoratedKey key = Util.dk("key");
+
+        // Delete a row in first sstable
+        RowMutation rm = new RowMutation(TABLE1, key.key);
+        rm.delete(new QueryPath(CF_NAME, null, null), 0);
+        rm.apply();
+        store.forceBlockingFlush();
+
+        ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf;
+        assert cf != null;
+    }
 }