You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/18 05:31:39 UTC
svn commit: r1082820 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/
Author: jbellis
Date: Fri Mar 18 04:31:38 2011
New Revision: 1082820
URL: http://svn.apache.org/viewvc?rev=1082820&view=rev
Log:
ensure size calculation and write phase of large-row compaction use the same threshold for TTL expiration
patch by slebresne; reviewed by jbellis for CASSANDRA-2349
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar 18 04:31:38 2011
@@ -10,6 +10,8 @@
* reduce contention on Table.flusherLock (CASSANDRA-1954)
* fix comparator used for non-indexed secondary expressions in index scan
(CASSANDRA-2347)
+ * ensure size calculation and write phase of large-row compaction use
+ the same threshold for TTL expiration (CASSANDRA-2349)
0.7.4
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java Fri Mar 18 04:31:38 2011
@@ -35,7 +35,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.FBUtilities;
@@ -70,7 +70,7 @@ public class ColumnFamily implements ICo
private final Integer cfid;
private final ColumnFamilyType type;
- private transient ICompactSerializer2<IColumn> columnSerializer;
+ private transient IColumnSerializer columnSerializer;
final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
@@ -135,7 +135,7 @@ public class ColumnFamily implements ICo
/**
* FIXME: Gross.
*/
- public ICompactSerializer2<IColumn> getColumnSerializer()
+ public IColumnSerializer getColumnSerializer()
{
return columnSerializer;
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Mar 18 04:31:38 2011
@@ -29,10 +29,10 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class ColumnSerializer implements ICompactSerializer2<IColumn>
+public class ColumnSerializer implements IColumnSerializer
{
private static final Logger logger = LoggerFactory.getLogger(ColumnSerializer.class);
@@ -63,6 +63,11 @@ public class ColumnSerializer implements
public Column deserialize(DataInput dis) throws IOException
{
+ return deserialize(dis, (int) (System.currentTimeMillis() / 1000));
+ }
+
+ public Column deserialize(DataInput dis, int expireBefore) throws IOException
+ {
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
if (name.remaining() <= 0)
throw new CorruptColumnException("invalid column name length " + name.remaining());
@@ -74,7 +79,7 @@ public class ColumnSerializer implements
int expiration = dis.readInt();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- if ((int) (System.currentTimeMillis() / 1000 ) > expiration)
+ if (expiration < expireBefore)
{
// the column is now expired, we can safely return a simple
// tombstone
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java Fri Mar 18 04:31:38 2011
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.ColumnSortedMap;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -308,7 +308,7 @@ public class SuperColumn implements ICol
}
}
-class SuperColumnSerializer implements ICompactSerializer2<IColumn>
+class SuperColumnSerializer implements IColumnSerializer
{
private static Logger logger = LoggerFactory.getLogger(SuperColumnSerializer.class);
@@ -348,6 +348,11 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis) throws IOException
{
+ return deserialize(dis, (int)(System.currentTimeMillis() / 1000));
+ }
+
+ public IColumn deserialize(DataInput dis, int expireBefore) throws IOException
+ {
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
int localDeleteTime = dis.readInt();
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
@@ -359,7 +364,7 @@ class SuperColumnSerializer implements I
/* read the number of columns */
int size = dis.readInt();
ColumnSerializer serializer = Column.serializer();
- ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size);
+ ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, expireBefore);
SuperColumn superColumn = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer,IColumn>(preSortedMap));
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
{
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1082820&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java Fri Mar 18 04:31:38 2011
@@ -0,0 +1,32 @@
+package org.apache.cassandra.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IColumn;
+
+public interface IColumnSerializer extends ICompactSerializer2<IColumn>
+{
+ public IColumn deserialize(DataInput in, int expireBefore) throws IOException;
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Mar 18 04:31:38 2011
@@ -52,6 +52,9 @@ public class SSTableIdentityIterator imp
public final int columnCount;
private final long columnPosition;
+ // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
+ private final int expireBefore;
+
/**
* Used to iterate through the columns of a row.
* @param sstable SSTable we are reading ffrom.
@@ -75,6 +78,7 @@ public class SSTableIdentityIterator imp
this.key = key;
this.dataStart = dataStart;
this.dataSize = dataSize;
+ this.expireBefore = (int)(System.currentTimeMillis() / 1000);
finishedAt = dataStart + dataSize;
try
@@ -137,7 +141,7 @@ public class SSTableIdentityIterator imp
{
try
{
- return sstable.getColumnSerializer().deserialize(file);
+ return sstable.getColumnSerializer().deserialize(file, expireBefore);
}
catch (IOException e)
{
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Mar 18 04:31:38 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
@@ -583,7 +583,7 @@ public class SSTableReader extends SSTab
return ColumnFamily.create(metadata);
}
- public ICompactSerializer2<IColumn> getColumnSerializer()
+ public IColumnSerializer getColumnSerializer()
{
return metadata.cfType == ColumnFamilyType.Standard
? Column.serializer()
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Fri Mar 18 04:31:38 2011
@@ -42,13 +42,15 @@ public class ColumnSortedMap implements
private DataInput dis;
private Comparator<ByteBuffer> comparator;
private int length;
+ private final int expireBefore;
- public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length)
+ public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
{
this.comparator = comparator;
this.serializer = serializer;
this.dis = dis;
this.length = length;
+ this.expireBefore = expireBefore;
}
public int size()
@@ -138,7 +140,7 @@ public class ColumnSortedMap implements
public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
{
- return new ColumnSet(serializer, dis, length);
+ return new ColumnSet(serializer, dis, length, expireBefore);
}
}
@@ -147,12 +149,14 @@ class ColumnSet implements Set<Map.Entry
private ColumnSerializer serializer;
private DataInput dis;
private int length;
+ private final int expireBefore;
- public ColumnSet(ColumnSerializer serializer, DataInput dis, int length)
+ public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
{
this.serializer = serializer;
this.dis = dis;
this.length = length;
+ this.expireBefore = expireBefore;
}
public int size()
@@ -172,7 +176,7 @@ class ColumnSet implements Set<Map.Entry
public Iterator<Entry<ByteBuffer, IColumn>> iterator()
{
- return new ColumnIterator(serializer, dis, length);
+ return new ColumnIterator(serializer, dis, length, expireBefore);
}
public Object[] toArray()
@@ -226,12 +230,14 @@ class ColumnIterator implements Iterator
private DataInput dis;
private int length;
private int count = 0;
+ private final int expireBefore;
- public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length)
+ public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
{
this.dis = dis;
this.serializer = serializer;
this.length = length;
+ this.expireBefore = expireBefore;
}
private IColumn deserializeNext()
@@ -239,7 +245,7 @@ class ColumnIterator implements Iterator
try
{
count++;
- return serializer.deserialize(dis);
+ return serializer.deserialize(dis, expireBefore);
}
catch (IOException e)
{