You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/14 20:24:54 UTC
svn commit: r1081525 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/columniterator/
src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Mon Mar 14 19:24:54 2011
New Revision: 1081525
URL: http://svn.apache.org/viewvc?rev=1081525&view=rev
Log:
fix tombstone handling in repair and sstable2json
patch by slebresne; reviewed by jbellis for CASSANDRA-2279
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Mar 14 19:24:54 2011
@@ -1,5 +1,6 @@
0.7.5
* Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318)
+ * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
0.7.4
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java Mon Mar 14 19:24:54 2011
@@ -99,7 +99,6 @@ public class RowIteratorFactory
}
Iterator<IColumnIterator> collated = IteratorUtils.collatedIterator(COMPARE_BY_KEY, iterators);
- final Memtable firstMemtable = memtables.iterator().next();
// reduce rows from all sources into a single row
ReducingIterator<IColumnIterator, Row> reduced = new ReducingIterator<IColumnIterator, Row>(collated)
@@ -107,11 +106,26 @@ public class RowIteratorFactory
private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
private final List<IColumnIterator> colIters = new ArrayList<IColumnIterator>();
private DecoratedKey key;
+ private ColumnFamily returnCF;
+
+ @Override
+ protected void onKeyChange()
+ {
+ this.returnCF = ColumnFamily.create(cfs.metadata);
+ }
public void reduce(IColumnIterator current)
{
this.colIters.add(current);
this.key = current.getKey();
+ try
+ {
+ this.returnCF.delete(current.getColumnFamily());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
@Override
@@ -125,7 +139,6 @@ public class RowIteratorFactory
Comparator<IColumn> colComparator = filter.filter.getColumnComparator(comparator);
Iterator<IColumn> colCollated = IteratorUtils.collatedIterator(colComparator, colIters);
- ColumnFamily returnCF;
// First check if this row is in the rowCache. If it is we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
if (cached != null)
@@ -135,33 +148,8 @@ public class RowIteratorFactory
}
else if (colCollated.hasNext())
{
- returnCF = firstMemtable.getColumnFamily(key);
- // TODO this is a little subtle: the Memtable ColumnIterator has to be a shallow clone of the source CF,
- // with deletion times set correctly, so we can use it as the "base" CF to add query results to.
- // (for sstable ColumnIterators we do not care if it is a shallow clone or not.)
- returnCF = returnCF == null ? ColumnFamily.create(firstMemtable.getTableName(), filter.getColumnFamilyName())
- : returnCF.cloneMeShallow();
- long lastDeletedAt = Long.MIN_VALUE;
- for (IColumnIterator columns : colIters)
- {
- columns.hasNext(); // force cf initializtion
- try
- {
- if (columns.getColumnFamily().isMarkedForDelete())
- lastDeletedAt = Math.max(lastDeletedAt, columns.getColumnFamily().getMarkedForDeleteAt());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
- returnCF.markedForDeleteAt.set(lastDeletedAt);
filter.collectCollatedColumns(returnCF, colCollated, gcBefore);
}
- else
- {
- returnCF = null;
- }
Row rv = new Row(key, returnCF);
colIters.clear();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Mon Mar 14 19:24:54 2011
@@ -31,9 +31,11 @@ import org.apache.cassandra.db.IColumn;
public interface IColumnIterator extends Iterator<IColumn>
{
/**
- * returns the CF of the column being iterated. Do not modify the returned CF; clone first.
- * The CF is only guaranteed to be available after a call to next() or hasNext().
- * Guaranteed to be non-null.
+ * returns the CF of the column being iterated.
+ * Do not modify the returned CF; clone first.
+ * This is guaranteed to be non-null and that the returned CF have the correct metadata
+ * (markedForDeleteAt and localDeletionTime). The full CF is however only guaranteed to
+ * be available after a call to next() or hasNext().
* @throws IOException
*/
public abstract ColumnFamily getColumnFamily() throws IOException;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ReducingIterator.java Mon Mar 14 19:24:54 2011
@@ -55,6 +55,7 @@ public abstract class ReducingIterator<T
if (last == null && !source.hasNext())
return endOfData();
+ onKeyChange();
boolean keyChanged = false;
while (!keyChanged)
{
@@ -73,6 +74,12 @@ public abstract class ReducingIterator<T
return getReduced();
}
+ /**
+ * Called at the begining of each new key, before any reduce is called.
+ * To be overriden by implementing classes.
+ */
+ protected void onKeyChange() {}
+
public Iterator<T2> iterator()
{
return this;
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java?rev=1081525&r1=1081524&r2=1081525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowIterationTest.java Mon Mar 14 19:24:54 2011
@@ -62,4 +62,51 @@ public class RowIterationTest extends Cl
store.forceBlockingFlush();
assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
}
+
+ @Test
+ public void testRowIterationDeletionTime() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open(TABLE1);
+ String CF_NAME = "Standard3";
+ ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME);
+ DecoratedKey key = Util.dk("key");
+
+ // Delete row in first sstable
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.delete(new QueryPath(CF_NAME, null, null), 0);
+ rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 0L);
+ int tstamp1 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+ rm.apply();
+ store.forceBlockingFlush();
+
+ // Delete row in second sstable with higher timestamp
+ rm = new RowMutation(TABLE1, key.key);
+ rm.delete(new QueryPath(CF_NAME, null, null), 1);
+ rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 1L);
+ int tstamp2 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+ rm.apply();
+ store.forceBlockingFlush();
+
+ ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf;
+ assert cf.getMarkedForDeleteAt() == 1L;
+ assert cf.getLocalDeletionTime() == tstamp2;
+ }
+
+ @Test
+ public void testRowIterationDeletion() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open(TABLE1);
+ String CF_NAME = "Standard3";
+ ColumnFamilyStore store = table.getColumnFamilyStore(CF_NAME);
+ DecoratedKey key = Util.dk("key");
+
+ // Delete a row in first sstable
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.delete(new QueryPath(CF_NAME, null, null), 0);
+ rm.apply();
+ store.forceBlockingFlush();
+
+ ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf;
+ assert cf != null;
+ }
}