You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/11 18:53:03 UTC

svn commit: r1080702 - in /cassandra/trunk: ./ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/marshal/ src/java/or...

Author: jbellis
Date: Fri Mar 11 17:53:02 2011
New Revision: 1080702

URL: http://svn.apache.org/viewvc?rev=1080702&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1080312
+/cassandra/branches/cassandra-0.7:1026516-1080684
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Mar 11 17:53:02 2011
@@ -30,6 +30,8 @@
  * fix commitlog replay when flush position refers to data that didn't
    get synced before server died (CASSANDRA-2285)
  * fix fd leak in sstable2json with non-mmap'd i/o (CASSANDRA-2304)
+ * reduce memory use during streaming of multiple sstables (CASSANDRA-2301)
+ * purge tombstoned rows from cache after GCGraceSeconds (CASSANDRA-2305)
 
 
 0.7.3

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1080312
+/cassandra/branches/cassandra-0.7/contrib:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Mar 11 17:53:02 2011
@@ -171,15 +171,15 @@ public class CassandraStorage extends Lo
     {
         if (System.getenv(PIG_RPC_PORT) != null)
             ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
-        else
+        else if (ConfigHelper.getRpcPort(conf) == 0) 
             throw new IOException("PIG_RPC_PORT environment variable not set");
         if (System.getenv(PIG_INITIAL_ADDRESS) != null)
             ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-        else
+        else if (ConfigHelper.getInitialAddress(conf) == null) 
             throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
         if (System.getenv(PIG_PARTITIONER) != null)
             ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        else
+        else if (ConfigHelper.getPartitioner(conf) == null) 
             throw new IOException("PIG_PARTITIONER environment variable not set");
     }
 

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1080312
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1080312
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1080312
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1080312
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 11 17:53:02 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1080312
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1080684
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 11 17:53:02 2011
@@ -1795,7 +1795,7 @@ public class ColumnFamilyStore implement
         return ssTables.getRowCache().getCapacity() == 0 ? null : ssTables.getRowCache().getInternal(key);
     }
 
-    void invalidateCachedRow(DecoratedKey key)
+    public void invalidateCachedRow(DecoratedKey key)
     {
         ssTables.getRowCache().remove(key);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Mar 11 17:53:02 2011
@@ -935,7 +935,7 @@ public class CompactionManager implement
             return executor.submit(runnable);
     }
 
-    public Future<SSTableReader> submitSSTableBuild(Descriptor desc, OperationType type)
+    public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
     {
         // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
         final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
@@ -946,7 +946,7 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
-                    executor.beginCompaction(builder.cfs.columnFamily, builder);
+                    executor.beginCompaction(desc.cfname, builder);
                     return builder.build();
                 }
                 finally

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Fri Mar 11 17:53:02 2011
@@ -65,9 +65,9 @@ public class AsciiType extends AbstractT
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         // 0-127
-        for (int i = 0; i < bytes.remaining(); i++)
+        for (int i = bytes.position(); i < bytes.limit(); i++)
         {
-            byte b = bytes.get(bytes.position() + i);
+            byte b = bytes.get(i);
             if (b < 0 || b > 127)
                 throw new MarshalException("Invalid byte for ascii: " + Byte.toString(b));
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Fri Mar 11 17:53:02 2011
@@ -106,7 +106,15 @@ implements Closeable, ICompactionInfo
         try
         {
             AbstractCompactedRow compactedRow = getCompactedRow();
-            return compactedRow.isEmpty() ? null : compactedRow;
+            if (compactedRow.isEmpty())
+            {
+                cfs.invalidateCachedRow(compactedRow.key);
+                return null;
+            }
+            else
+            {
+                return compactedRow;
+            }
         }
         finally
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Mar 11 17:53:02 2011
@@ -246,13 +246,23 @@ public class SSTableWriter extends SSTab
     public static class Builder implements ICompactionInfo
     {
         private final Descriptor desc;
-        public final ColumnFamilyStore cfs;
-        private final RowIndexer indexer;
+        private final OperationType type;
+        private final ColumnFamilyStore cfs;
+        private RowIndexer indexer;
 
         public Builder(Descriptor desc, OperationType type)
         {
             this.desc = desc;
+            this.type = type;
             cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+        }
+
+        // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
+        // since the 8MB buffers can use up heap quickly
+        private void maybeOpenIndexer()
+        {
+            if (indexer != null)
+                return;
             try
             {
                 if (cfs.metadata.getDefaultValidator().isCommutative())
@@ -270,6 +280,8 @@ public class SSTableWriter extends SSTab
         {
             if (cfs.isInvalid())
                 return null;
+            maybeOpenIndexer();
+
             File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
             File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
             assert !ifile.exists();
@@ -286,8 +298,10 @@ public class SSTableWriter extends SSTab
 
         public long getTotalBytes()
         {
+            maybeOpenIndexer();
             try
             {
+                // (length is still valid post-close)
                 return indexer.dfile.length();
             }
             catch (IOException e)
@@ -298,6 +312,8 @@ public class SSTableWriter extends SSTab
 
         public long getBytesComplete()
         {
+            maybeOpenIndexer();
+            // (getFilePointer is still valid post-close)
             return indexer.dfile.getFilePointer();
         }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=1080702&r1=1080701&r2=1080702&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java Fri Mar 11 17:53:02 2011
@@ -178,4 +178,53 @@ public class CompactionsPurgeTest extend
         ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
         assert cf == null : cf;
     }
+
+    @Test
+    public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        String tableName = "RowCacheSpace";
+        String cfName = "CachedCF";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key3");
+        RowMutation rm;
+
+        // inserts
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(new QueryPath(cfName, null, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        }
+        rm.apply();
+
+        // move the key up in row cache
+        cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+
+        // deletes row
+        rm = new RowMutation(tableName, key.key);
+        rm.delete(new QueryPath(cfName, null, null), 1);
+        rm.apply();
+
+        // flush and major compact
+        cfs.forceBlockingFlush();
+        CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get();
+        //cfs.invalidateCachedRow(key);
+
+        // re-inserts with timestamp lower than delete
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(new QueryPath(cfName, null, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        }
+        rm.apply();
+
+        // Check that the second insert did went in
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        assert cf.getColumnCount() == 10;
+        for (IColumn c : cf)
+            assert !c.isMarkedForDelete();
+    }
 }