You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/04 16:36:12 UTC
svn commit: r1142690 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/
test/unit/org/apache/cassandra/db/
test/unit/org/apache/cassandra/db/compaction/
test/unit/org/apache/cassandra/service/
Author: slebresne
Date: Mon Jul 4 14:36:11 2011
New Revision: 1142690
URL: http://svn.apache.org/viewvc?rev=1142690&view=rev
Log:
Reset CF and SC deletion time after gc_grace
patch by slebresne; reviewed by jbellis for CASSANDRA-2317
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul 4 14:36:11 2011
@@ -10,6 +10,7 @@
* clean up tmp files after failed compaction (CASSANDRA-2468)
* restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
* don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
+ * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
0.8.2
Added: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1142690&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Mon Jul 4 14:36:11 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractColumnContainer implements IColumnContainer, IIterableColumns
+{
+ private static Logger logger = LoggerFactory.getLogger(AbstractColumnContainer.class);
+
+ protected final ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
+ protected final AtomicReference<DeletionInfo> deletionInfo = new AtomicReference<DeletionInfo>(new DeletionInfo());
+
+ protected AbstractColumnContainer(ConcurrentSkipListMap<ByteBuffer, IColumn> columns)
+ {
+ this.columns = columns;
+ }
+
+ @Deprecated // TODO this is a hack to set initial value outside constructor
+ public void delete(int localtime, long timestamp)
+ {
+ deletionInfo.set(new DeletionInfo(timestamp, localtime));
+ }
+
+ public void delete(AbstractColumnContainer cc2)
+ {
+ // Keeping deletion info for max markedForDeleteAt value
+ DeletionInfo current;
+ DeletionInfo cc2Info = cc2.deletionInfo.get();
+ while (true)
+ {
+ current = deletionInfo.get();
+ if (current.markedForDeleteAt >= cc2Info.markedForDeleteAt || deletionInfo.compareAndSet(current, cc2Info))
+ break;
+ }
+ }
+
+ public boolean isMarkedForDelete()
+ {
+ return getMarkedForDeleteAt() > Long.MIN_VALUE;
+ }
+
+ public long getMarkedForDeleteAt()
+ {
+ return deletionInfo.get().markedForDeleteAt;
+ }
+
+ public int getLocalDeletionTime()
+ {
+ return deletionInfo.get().localDeletionTime;
+ }
+
+ public AbstractType getComparator()
+ {
+ return (AbstractType)columns.comparator();
+ }
+
+ public void maybeResetDeletionTimes(int gcBefore)
+ {
+ while (true)
+ {
+ DeletionInfo current = deletionInfo.get();
+ // Stop if either we don't need to change the deletion info (it's
+ // still MIN_VALUE or not expired yet) or we've succesfully changed it
+ if (current.localDeletionTime == Integer.MIN_VALUE
+ || current.localDeletionTime > gcBefore
+ || deletionInfo.compareAndSet(current, new DeletionInfo()))
+ break;
+ }
+ }
+
+ /**
+ * We need to go through each column in the column container and resolve it before adding
+ */
+ public void addAll(AbstractColumnContainer cc)
+ {
+ for (IColumn column : cc.getSortedColumns())
+ addColumn(column);
+ delete(cc);
+ }
+
+ /*
+ * If we find an old column that has the same name
+ * the ask it to resolve itself else add the new column
+ */
+ public void addColumn(IColumn column)
+ {
+ ByteBuffer name = column.name();
+ IColumn oldColumn;
+ while ((oldColumn = columns.putIfAbsent(name, column)) != null)
+ {
+ if (oldColumn instanceof SuperColumn)
+ {
+ assert column instanceof SuperColumn;
+ ((SuperColumn) oldColumn).putColumn((SuperColumn)column);
+ break; // Delegated to SuperColumn
+ }
+ else
+ {
+ // calculate reconciled col from old (existing) col and new col
+ IColumn reconciledColumn = column.reconcile(oldColumn);
+ if (columns.replace(name, oldColumn, reconciledColumn))
+ break;
+
+ // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+ // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+ }
+ }
+ }
+
+ abstract protected void putColumn(SuperColumn sc);
+
+ public IColumn getColumn(ByteBuffer name)
+ {
+ return columns.get(name);
+ }
+
+ public SortedSet<ByteBuffer> getColumnNames()
+ {
+ return columns.keySet();
+ }
+
+ public Collection<IColumn> getSortedColumns()
+ {
+ return columns.values();
+ }
+
+ public Collection<IColumn> getReverseSortedColumns()
+ {
+ return columns.descendingMap().values();
+ }
+
+ public Map<ByteBuffer, IColumn> getColumnsMap()
+ {
+ return columns;
+ }
+
+ public void remove(ByteBuffer columnName)
+ {
+ columns.remove(columnName);
+ }
+
+ public int getColumnCount()
+ {
+ return columns.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return columns.isEmpty();
+ }
+
+ public int getEstimatedColumnCount()
+ {
+ return getColumnCount();
+ }
+
+ public Iterator<IColumn> iterator()
+ {
+ return columns.values().iterator();
+ }
+
+ private static class DeletionInfo
+ {
+ public final long markedForDeleteAt;
+ public final int localDeletionTime;
+
+ public DeletionInfo()
+ {
+ this(Long.MIN_VALUE, Integer.MIN_VALUE);
+ }
+
+ public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
+ {
+ this.markedForDeleteAt = markedForDeleteAt;
+ this.localDeletionTime = localDeletionTime;
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Mon Jul 4 14:36:11 2011
@@ -22,16 +22,9 @@ import static org.apache.cassandra.db.DB
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,10 +35,8 @@ import org.apache.cassandra.io.IColumnSe
import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.FBUtilities;
-public class ColumnFamily implements IColumnContainer, IIterableColumns
+public class ColumnFamily extends AbstractColumnContainer
{
- private static Logger logger = LoggerFactory.getLogger(ColumnFamily.class);
-
/* The column serializer for this Column Family. Create based on config. */
private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
private final CFMetaData cfm;
@@ -71,23 +62,25 @@ public class ColumnFamily implements ICo
}
private transient IColumnSerializer columnSerializer;
- final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
- final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
- private ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
public ColumnFamily(CFMetaData cfm)
{
+ this(cfm, new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator));
+ }
+
+ private ColumnFamily(CFMetaData cfm, ConcurrentSkipListMap<ByteBuffer, IColumn> map)
+ {
+ super(map);
assert cfm != null;
this.cfm = cfm;
columnSerializer = cfm.cfType == ColumnFamilyType.Standard ? Column.serializer() : SuperColumn.serializer(cfm.subcolumnComparator);
- columns = new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator);
}
public ColumnFamily cloneMeShallow()
{
ColumnFamily cf = new ColumnFamily(cfm);
- cf.markedForDeleteAt.set(markedForDeleteAt.get());
- cf.localDeletionTime.set(localDeletionTime.get());
+ // since deletion info is immutable, aliasing it is fine
+ cf.deletionInfo.set(deletionInfo.get());
return cf;
}
@@ -103,8 +96,9 @@ public class ColumnFamily implements ICo
public ColumnFamily cloneMe()
{
- ColumnFamily cf = cloneMeShallow();
- cf.columns = columns.clone();
+ ColumnFamily cf = new ColumnFamily(cfm, columns.clone());
+ // since deletion info is immutable, aliasing it is fine
+ cf.deletionInfo.set(deletionInfo.get());
return cf;
}
@@ -121,32 +115,11 @@ public class ColumnFamily implements ICo
return cfm;
}
- /*
- * We need to go through each column
- * in the column family and resolve it before adding
- */
- public void addAll(ColumnFamily cf)
- {
- for (IColumn column : cf.getSortedColumns())
- addColumn(column);
- delete(cf);
- }
-
public IColumnSerializer getColumnSerializer()
{
return columnSerializer;
}
- public int getColumnCount()
- {
- return columns.size();
- }
-
- public boolean isEmpty()
- {
- return columns.isEmpty();
- }
-
public boolean isSuper()
{
return getType() == ColumnFamilyType.Super;
@@ -214,82 +187,6 @@ public class ColumnFamily implements ICo
}
/*
- * If we find an old column that has the same name
- * the ask it to resolve itself else add the new column .
- */
- public void addColumn(IColumn column)
- {
- ByteBuffer name = column.name();
- IColumn oldColumn;
- while ((oldColumn = columns.putIfAbsent(name, column)) != null)
- {
- if (oldColumn instanceof SuperColumn)
- {
- ((SuperColumn) oldColumn).putColumn(column);
- break; // Delegated to SuperColumn
- }
- else
- {
- // calculate reconciled col from old (existing) col and new col
- IColumn reconciledColumn = column.reconcile(oldColumn);
- if (columns.replace(name, oldColumn, reconciledColumn))
- break;
-
- // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
- // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
- }
- }
- }
-
- public IColumn getColumn(ByteBuffer name)
- {
- return columns.get(name);
- }
-
- public SortedSet<ByteBuffer> getColumnNames()
- {
- return columns.keySet();
- }
-
- public Collection<IColumn> getSortedColumns()
- {
- return columns.values();
- }
-
- public Collection<IColumn> getReverseSortedColumns()
- {
- return columns.descendingMap().values();
- }
-
- public Map<ByteBuffer, IColumn> getColumnsMap()
- {
- return columns;
- }
-
- public void remove(ByteBuffer columnName)
- {
- columns.remove(columnName);
- }
-
- @Deprecated // TODO this is a hack to set initial value outside constructor
- public void delete(int localtime, long timestamp)
- {
- localDeletionTime.set(localtime);
- markedForDeleteAt.set(timestamp);
- }
-
- public void delete(ColumnFamily cf2)
- {
- FBUtilities.atomicSetMax(localDeletionTime, cf2.getLocalDeletionTime()); // do this first so we won't have a column that's "deleted" but has no local deletion time
- FBUtilities.atomicSetMax(markedForDeleteAt, cf2.getMarkedForDeleteAt());
- }
-
- public boolean isMarkedForDelete()
- {
- return markedForDeleteAt.get() > Long.MIN_VALUE;
- }
-
- /*
* This function will calculate the difference between 2 column families.
* The external input is assumed to be a superset of internal.
*/
@@ -330,11 +227,6 @@ public class ColumnFamily implements ICo
return null;
}
- public AbstractType getComparator()
- {
- return (AbstractType)columns.comparator();
- }
-
int size()
{
int size = 0;
@@ -382,16 +274,6 @@ public class ColumnFamily implements ICo
column.updateDigest(digest);
}
- public long getMarkedForDeleteAt()
- {
- return markedForDeleteAt.get();
- }
-
- public int getLocalDeletionTime()
- {
- return localDeletionTime.get();
- }
-
public static AbstractType getComparatorFor(String table, String columnFamilyName, ByteBuffer superColumnName)
{
return superColumnName == null
@@ -414,16 +296,6 @@ public class ColumnFamily implements ICo
addAll(cf);
}
- public int getEstimatedColumnCount()
- {
- return getColumnCount();
- }
-
- public Iterator<IColumn> iterator()
- {
- return columns.values().iterator();
- }
-
public long serializedSize()
{
int size = boolSize_ // bool
@@ -449,4 +321,9 @@ public class ColumnFamily implements ICo
column.validateFields(metadata);
}
}
+
+ protected void putColumn(SuperColumn sc)
+ {
+ throw new UnsupportedOperationException("Unsupported operation for a column family");
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul 4 14:36:11 2011
@@ -97,8 +97,8 @@ public class ColumnFamilySerializer impl
public void serializeCFInfo(ColumnFamily columnFamily, DataOutput dos) throws IOException
{
- dos.writeInt(columnFamily.localDeletionTime.get());
- dos.writeLong(columnFamily.markedForDeleteAt.get());
+ dos.writeInt(columnFamily.getLocalDeletionTime());
+ dos.writeLong(columnFamily.getMarkedForDeleteAt());
}
public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul 4 14:36:11 2011
@@ -775,6 +775,10 @@ public class ColumnFamilyStore implement
// (we want this to be deterministic to avoid confusion.)
if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <= gcBefore)
return null;
+
+ // If there is non deleted columns, we still need to reset the column family
+ // deletion times since gc_grace seconds had elapsed
+ cf.maybeResetDeletionTimes(gcBefore);
return cf;
}
@@ -844,6 +848,12 @@ public class ColumnFamilyStore implement
{
cf.remove(c.name());
}
+ else
+ {
+ // If there is non deleted columns, we still need to reset the column family
+ // deletion times since gc_grace seconds had elapsed
+ c.maybeResetDeletionTimes(gcBefore);
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Mon Jul 4 14:36:11 2011
@@ -32,6 +32,7 @@ public interface IColumnContainer
public boolean isMarkedForDelete();
public long getMarkedForDeleteAt();
+ public int getLocalDeletionTime();
public AbstractType getComparator();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jul 4 14:36:11 2011
@@ -190,7 +190,7 @@ public class RowMutation implements IMut
else if (path.columnName == null)
{
SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator());
- sc.markForDeleteAt(localDeleteTime, timestamp);
+ sc.delete(localDeleteTime, timestamp);
columnFamily.addColumn(sc);
}
else
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Mon Jul 4 14:36:11 2011
@@ -28,8 +28,6 @@ import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -42,7 +40,7 @@ import org.apache.cassandra.utils.FBUtil
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-public class SuperColumn implements IColumn, IColumnContainer
+public class SuperColumn extends AbstractColumnContainer implements IColumn
{
private static NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
public static SuperColumnSerializer serializer(AbstractType comparator)
@@ -56,10 +54,7 @@ public class SuperColumn implements ICol
return serializer;
}
- private ByteBuffer name_;
- private ConcurrentSkipListMap<ByteBuffer, IColumn> columns_;
- private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
- private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
+ private ByteBuffer name;
public SuperColumn(ByteBuffer name, AbstractType comparator)
{
@@ -68,54 +63,41 @@ public class SuperColumn implements ICol
SuperColumn(ByteBuffer name, ConcurrentSkipListMap<ByteBuffer, IColumn> columns)
{
+ super(columns);
assert name != null;
assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
- name_ = name;
- columns_ = columns;
- }
-
- public AbstractType getComparator()
- {
- return (AbstractType)columns_.comparator();
+ this.name = name;
}
public SuperColumn cloneMeShallow()
{
- SuperColumn sc = new SuperColumn(name_, getComparator());
- sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+ SuperColumn sc = new SuperColumn(name, getComparator());
+ // since deletion info is immutable, aliasing it is fine
+ sc.deletionInfo.set(deletionInfo.get());
return sc;
}
public IColumn cloneMe()
{
- SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns_));
- sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+ SuperColumn sc = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns));
+ // since deletion info is immutable, aliasing it is fine
+ sc.deletionInfo.set(deletionInfo.get());
return sc;
}
- public boolean isMarkedForDelete()
- {
- return markedForDeleteAt.get() > Long.MIN_VALUE;
- }
-
public ByteBuffer name()
{
- return name_;
+ return name;
}
public Collection<IColumn> getSubColumns()
{
- return columns_.values();
- }
-
- public Collection<IColumn> getSortedColumns()
- {
- return getSubColumns();
+ return getSortedColumns();
}
public IColumn getSubColumn(ByteBuffer columnName)
{
- IColumn column = columns_.get(columnName);
+ IColumn column = columns.get(columnName);
assert column == null || column instanceof Column;
return column;
}
@@ -143,12 +125,7 @@ public class SuperColumn implements ICol
* We need to keep the way we are calculating the column size in sync with the
* way we are calculating the size for the column family serializer.
*/
- return DBConstants.shortSize_ + name_.remaining() + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
- }
-
- public void remove(ByteBuffer columnName)
- {
- columns_.remove(columnName);
+ return DBConstants.shortSize_ + name.remaining() + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
}
public long timestamp()
@@ -159,7 +136,7 @@ public class SuperColumn implements ICol
public long mostRecentLiveChangeAt()
{
long max = Long.MIN_VALUE;
- for (IColumn column : columns_.values())
+ for (IColumn column : getSubColumns())
{
if (!column.isMarkedForDelete() && column.timestamp() > max)
{
@@ -174,42 +151,24 @@ public class SuperColumn implements ICol
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
+ @Override
public void addColumn(IColumn column)
{
assert column instanceof Column : "A super column can only contain simple columns";
-
- ByteBuffer name = column.name();
- IColumn oldColumn;
- while ((oldColumn = columns_.putIfAbsent(name, column)) != null)
- {
- IColumn reconciledColumn = column.reconcile(oldColumn);
- if (columns_.replace(name, oldColumn, reconciledColumn))
- break;
-
- // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
- // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
- }
+ super.addColumn((Column)column);
}
/*
* Go through each sub column if it exists then as it to resolve itself
* if the column does not exist then create it.
*/
- public void putColumn(IColumn column)
+ protected void putColumn(SuperColumn column)
{
- assert column instanceof SuperColumn;
-
for (IColumn subColumn : column.getSubColumns())
{
addColumn(subColumn);
}
- FBUtilities.atomicSetMax(localDeletionTime, column.getLocalDeletionTime()); // do this first so we won't have a column that's "deleted" but has no local deletion time
- FBUtilities.atomicSetMax(markedForDeleteAt, column.getMarkedForDeleteAt());
- }
-
- public long getMarkedForDeleteAt()
- {
- return markedForDeleteAt.get();
+ delete(column);
}
public IColumn diff(IColumn columnNew)
@@ -217,7 +176,7 @@ public class SuperColumn implements ICol
IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator());
if (columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt())
{
- ((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt());
+ ((SuperColumn)columnDiff).delete(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt());
}
// (don't need to worry about columnNew containing subColumns that are shadowed by
@@ -225,7 +184,7 @@ public class SuperColumn implements ICol
// takes care of those for us.)
for (IColumn subColumn : columnNew.getSubColumns())
{
- IColumn columnInternal = columns_.get(subColumn.name());
+ IColumn columnInternal = columns.get(subColumn.name());
if(columnInternal == null )
{
columnDiff.addColumn(subColumn);
@@ -248,19 +207,19 @@ public class SuperColumn implements ICol
public void updateDigest(MessageDigest digest)
{
- assert name_ != null;
- digest.update(name_.duplicate());
+ assert name != null;
+ digest.update(name.duplicate());
DataOutputBuffer buffer = new DataOutputBuffer();
try
{
- buffer.writeLong(markedForDeleteAt.get());
+ buffer.writeLong(getMarkedForDeleteAt());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
digest.update(buffer.getData(), 0, buffer.getLength());
- for (IColumn column : columns_.values())
+ for (IColumn column : getSubColumns())
{
column.updateDigest(digest);
}
@@ -270,14 +229,14 @@ public class SuperColumn implements ICol
{
StringBuilder sb = new StringBuilder();
sb.append("SuperColumn(");
- sb.append(comparator.getString(name_));
+ sb.append(comparator.getString(name));
if (isMarkedForDelete()) {
sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
}
sb.append(" [");
- sb.append(getComparator().getColumnsString(columns_.values()));
+ sb.append(getComparator().getColumnsString(getSubColumns()));
sb.append("])");
return sb.toString();
@@ -285,26 +244,14 @@ public class SuperColumn implements ICol
public boolean isLive()
{
- return mostRecentLiveChangeAt() > markedForDeleteAt.get();
- }
-
- public int getLocalDeletionTime()
- {
- return localDeletionTime.get();
- }
-
- @Deprecated // TODO this is a hack to set initial value outside constructor
- public void markForDeleteAt(int localDeleteTime, long timestamp)
- {
- this.localDeletionTime.set(localDeleteTime);
- this.markedForDeleteAt.set(timestamp);
+ return mostRecentLiveChangeAt() > getMarkedForDeleteAt();
}
public IColumn shallowCopy()
{
- SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
- sc.localDeletionTime = localDeletionTime;
- sc.markedForDeleteAt = markedForDeleteAt;
+ SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name()), this.getComparator());
+ // since deletion info is immutable, aliasing it is fine
+ sc.deletionInfo.set(deletionInfo.get());
return sc;
}
@@ -312,11 +259,9 @@ public class SuperColumn implements ICol
{
// we don't try to intern supercolumn names, because if we're using Cassandra correctly it's almost
// certainly just going to pollute our interning map with unique, dynamic values
- SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
- sc.localDeletionTime = localDeletionTime;
- sc.markedForDeleteAt = markedForDeleteAt;
-
- for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet())
+ SuperColumn sc = (SuperColumn)shallowCopy();
+
+ for(Map.Entry<ByteBuffer, IColumn> c : columns.entrySet())
{
sc.addColumn(c.getValue().localCopy(cfs));
}
@@ -414,7 +359,7 @@ class SuperColumnSerializer implements I
{
throw new IOException("Invalid localDeleteTime read: " + localDeleteTime);
}
- superColumn.markForDeleteAt(localDeleteTime, markedForDeleteAt);
+ superColumn.delete(localDeleteTime, markedForDeleteAt);
return superColumn;
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Mon Jul 4 14:36:11 2011
@@ -129,10 +129,10 @@ public class QueryFilter
// time of the cf, if that is greater.
long deletedAt = c.getMarkedForDeleteAt();
if (returnCF.getMarkedForDeleteAt() > deletedAt)
- ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
+ ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
- ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
+ ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
}
curCF.clear();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Mon Jul 4 14:36:11 2011
@@ -54,7 +54,7 @@ public class RowTest extends SchemaLoade
sc1.addColumn(column("subcolumn", "A", 0));
SuperColumn sc2 = new SuperColumn(ByteBufferUtil.bytes("one"), AsciiType.instance);
- sc2.markForDeleteAt(0, 0);
+ sc2.delete(0, 0);
SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);
assertEquals(scDiff.getSubColumns().size(), 0);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Mon Jul 4 14:36:11 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -184,7 +186,7 @@ public class CompactionsPurgeTest extend
}
@Test
- public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException
+ public void testCompactionPurgeCachedRow() throws IOException, ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
@@ -230,4 +232,96 @@ public class CompactionsPurgeTest extend
for (IColumn c : cf)
assert !c.isMarkedForDelete();
}
+
+ @Test
+ public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ String tableName = "Keyspace1";
+ String cfName = "Standard1";
+ Table table = Table.open(tableName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ DecoratedKey key = Util.dk("key3");
+ RowMutation rm;
+
+ // inserts
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ }
+ rm.apply();
+
+ // deletes row with timestamp such that not all columns are deleted
+ rm = new RowMutation(tableName, key.key);
+ rm.delete(new QueryPath(cfName, null, null), 4);
+ rm.apply();
+
+ // flush and major compact (with tombstone purging)
+ cfs.forceBlockingFlush();
+ Util.compactAll(cfs).get();
+
+ // re-inserts with timestamp lower than delete
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 5; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ }
+ rm.apply();
+
+ // Check that the second insert did went in
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+ assertEquals(10, cf.getColumnCount());
+ for (IColumn c : cf)
+ assert !c.isMarkedForDelete();
+ }
+
+ @Test
+ public void testCompactionPurgeTombstonedSuperColumn() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ String tableName = "Keyspace1";
+ String cfName = "Super5";
+ Table table = Table.open(tableName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ DecoratedKey key = Util.dk("key5");
+ RowMutation rm;
+
+ ByteBuffer scName = ByteBufferUtil.bytes("sc");
+
+ // inserts
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ }
+ rm.apply();
+
+ // deletes supercolumn with timestamp such that not all columns go
+ rm = new RowMutation(tableName, key.key);
+ rm.delete(new QueryPath(cfName, scName, null), 4);
+ rm.apply();
+
+ // flush and major compact
+ cfs.forceBlockingFlush();
+ Util.compactAll(cfs).get();
+
+ // re-inserts with timestamp lower than delete
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 5; i++)
+ {
+ rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ }
+ rm.apply();
+
+ // Check that the second insert did went in
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+ SuperColumn sc = (SuperColumn)cf.getColumn(scName);
+ assert sc != null;
+ assertEquals(10, sc.getColumnCount());
+ }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Mon Jul 4 14:36:11 2011
@@ -131,7 +131,7 @@ public class RowResolverTest extends Sch
// subcolumn is newer than a tombstone on its parent, but not newer than the row deletion
ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1");
SuperColumn sc = superColumn(scf1, "super-foo", column("one", "A", 1));
- sc.markForDeleteAt((int) (System.currentTimeMillis() / 1000), 0);
+ sc.delete((int) (System.currentTimeMillis() / 1000), 0);
scf1.addColumn(sc);
ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");