You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/18 05:31:39 UTC

svn commit: r1082820 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/

Author: jbellis
Date: Fri Mar 18 04:31:38 2011
New Revision: 1082820

URL: http://svn.apache.org/viewvc?rev=1082820&view=rev
Log:
ensure size calculation and write phase of large-row compaction use the same threshold for TTL expiration
patch by slebresne; reviewed by jbellis for CASSANDRA-2349

Added:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Mar 18 04:31:38 2011
@@ -10,6 +10,8 @@
  * reduce contention on Table.flusherLock (CASSANDRA-1954)
  * fix comparator used for non-indexed secondary expressions in index scan
    (CASSANDRA-2347)
+ * ensure size calculation and write phase of large-row compaction use
+   the same threshold for TTL expiration (CASSANDRA-2349)
 
 
 0.7.4

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamily.java Fri Mar 18 04:31:38 2011
@@ -35,7 +35,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -70,7 +70,7 @@ public class ColumnFamily implements ICo
     private final Integer cfid;
     private final ColumnFamilyType type;
 
-    private transient ICompactSerializer2<IColumn> columnSerializer;
+    private transient IColumnSerializer columnSerializer;
     final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
     final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
     private ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
@@ -135,7 +135,7 @@ public class ColumnFamily implements ICo
     /**
      * FIXME: Gross.
      */
-    public ICompactSerializer2<IColumn> getColumnSerializer()
+    public IColumnSerializer getColumnSerializer()
     {
         return columnSerializer;
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Mar 18 04:31:38 2011
@@ -29,10 +29,10 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ColumnSerializer implements ICompactSerializer2<IColumn>
+public class ColumnSerializer implements IColumnSerializer
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnSerializer.class);
 
@@ -63,6 +63,11 @@ public class ColumnSerializer implements
 
     public Column deserialize(DataInput dis) throws IOException
     {
+        return deserialize(dis, (int) (System.currentTimeMillis() / 1000));
+    }
+
+    public Column deserialize(DataInput dis, int expireBefore) throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
             throw new CorruptColumnException("invalid column name length " + name.remaining());
@@ -74,7 +79,7 @@ public class ColumnSerializer implements
             int expiration = dis.readInt();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            if ((int) (System.currentTimeMillis() / 1000 ) > expiration)
+            if (expiration < expireBefore)
             {
                 // the column is now expired, we can safely return a simple
                 // tombstone

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java Fri Mar 18 04:31:38 2011
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.ColumnSortedMap;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -308,7 +308,7 @@ public class SuperColumn implements ICol
     }
 }
 
-class SuperColumnSerializer implements ICompactSerializer2<IColumn>
+class SuperColumnSerializer implements IColumnSerializer
 {
     private static Logger logger = LoggerFactory.getLogger(SuperColumnSerializer.class);
 
@@ -348,6 +348,11 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis) throws IOException
     {
+        return deserialize(dis, (int)(System.currentTimeMillis() / 1000));
+    }
+
+    public IColumn deserialize(DataInput dis, int expireBefore) throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
@@ -359,7 +364,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, expireBefore);
         SuperColumn superColumn = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer,IColumn>(preSortedMap));
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1082820&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/IColumnSerializer.java Fri Mar 18 04:31:38 2011
@@ -0,0 +1,32 @@
+package org.apache.cassandra.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IColumn;
+
+public interface IColumnSerializer extends ICompactSerializer2<IColumn>
+{
+    public IColumn deserialize(DataInput in, int expireBefore) throws IOException;
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Mar 18 04:31:38 2011
@@ -52,6 +52,9 @@ public class SSTableIdentityIterator imp
     public final int columnCount;
     private final long columnPosition;
 
+    // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
+    private final int expireBefore;
+
     /**
      * Used to iterate through the columns of a row.
      * @param sstable SSTable we are reading ffrom.
@@ -75,6 +78,7 @@ public class SSTableIdentityIterator imp
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
+        this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         finishedAt = dataStart + dataSize;
 
         try
@@ -137,7 +141,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            return sstable.getColumnSerializer().deserialize(file);
+            return sstable.getColumnSerializer().deserialize(file, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Mar 18 04:31:38 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -583,7 +583,7 @@ public class SSTableReader extends SSTab
         return ColumnFamily.create(metadata);
     }
 
-    public ICompactSerializer2<IColumn> getColumnSerializer()
+    public IColumnSerializer getColumnSerializer()
     {
         return metadata.cfType == ColumnFamilyType.Standard
                ? Column.serializer()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1082820&r1=1082819&r2=1082820&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Fri Mar 18 04:31:38 2011
@@ -42,13 +42,15 @@ public class ColumnSortedMap implements 
     private DataInput dis;
     private Comparator<ByteBuffer> comparator;
     private int length;
+    private final int expireBefore;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
     {
         this.comparator = comparator;
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
+        this.expireBefore = expireBefore;
     }
 
     public int size()
@@ -138,7 +140,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, length);
+        return new ColumnSet(serializer, dis, length, expireBefore);
     }
 }
 
@@ -147,12 +149,14 @@ class ColumnSet implements Set<Map.Entry
     private ColumnSerializer serializer;
     private DataInput dis;
     private int length;
+    private final int expireBefore;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
     {
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
+        this.expireBefore = expireBefore;
     }
 
     public int size()
@@ -172,7 +176,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, length);
+        return new ColumnIterator(serializer, dis, length, expireBefore);
     }
 
     public Object[] toArray()
@@ -226,12 +230,14 @@ class ColumnIterator implements Iterator
     private DataInput dis;
     private int length;
     private int count = 0;
+    private final int expireBefore;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, int expireBefore)
     {
         this.dis = dis;
         this.serializer = serializer;
         this.length = length;
+        this.expireBefore = expireBefore;
     }
 
     private IColumn deserializeNext()
@@ -239,7 +245,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis);
+            return serializer.deserialize(dis, expireBefore);
         }
         catch (IOException e)
         {