You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/22 22:50:27 UTC

svn commit: r1174383 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/filter/ src/java/org/apache/c...

Author: jbellis
Date: Thu Sep 22 20:50:26 2011
New Revision: 1174383

URL: http://svn.apache.org/viewvc?rev=1174383&view=rev
Log:
merge #3234 from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/MergeIterator.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0:1167085-1174382
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 22 20:50:26 2011
@@ -15,8 +15,9 @@
  * Base choice of random or "balanced" token on bootstrap on whether
    schema definitions were found (CASSANDRA-3219)
  * Fixes for LeveledCompactionStrategy score computation, prioritization,
-   and scheduling (CASSANDRA-3224)
+   scheduling, and performance (CASSANDRA-3224, 3234)
  * parallelize sstable open at server startup (CASSANDRA-2988)
+ * fix handling of exceptions writing to OutboundTcpConnection (CASSANDRA-3235)
 
 
 1.0.0-beta1

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/contrib:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 22 20:50:26 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1173657,1173664
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1173617,1173663
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1174382
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1174379
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Thu Sep 22 20:50:26 2011
@@ -244,4 +244,16 @@ public abstract class AbstractColumnCont
             this.localDeletionTime = localDeletionTime;
         }
     }
+
+    public boolean hasExpiredTombstones(int gcBefore)
+    {
+        if (isMarkedForDelete() && getLocalDeletionTime() < gcBefore)
+            return true;
+
+        for (IColumn column : columns)
+            if (column.hasExpiredTombstones(gcBefore))
+                return true;
+
+        return false;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Sep 22 20:50:26 2011
@@ -280,5 +280,10 @@ public class Column implements IColumn
         if (valueValidator != null)
             valueValidator.validate(value());
     }
+
+    public boolean hasExpiredTombstones(int gcBefore)
+    {
+        return isMarkedForDelete() && getLocalDeletionTime() < gcBefore;
+    }
 }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Sep 22 20:50:26 2011
@@ -812,8 +812,10 @@ public class ColumnFamilyStore implement
 
     private static void removeDeletedStandard(ColumnFamily cf, int gcBefore)
     {
-        for (IColumn c : cf)
+        Iterator<IColumn> iter = cf.iterator();
+        while (iter.hasNext())
         {
+            IColumn c = iter.next();
             ByteBuffer cname = c.name();
             // remove columns if
             // (a) the column itself is tombstoned or
@@ -821,7 +823,7 @@ public class ColumnFamilyStore implement
             if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)
                 || c.timestamp() <= cf.getMarkedForDeleteAt())
             {
-                cf.remove(cname);
+                iter.remove();
             }
         }
     }
@@ -836,15 +838,17 @@ public class ColumnFamilyStore implement
         {
             SuperColumn c = (SuperColumn)iter.next();
             long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
-            for (IColumn subColumn : c.getSubColumns())
+            Iterator<IColumn> subIter = c.getSubColumns().iterator();
+            while (subIter.hasNext())
             {
+                IColumn subColumn = subIter.next();
                 // remove subcolumns if
                 // (a) the subcolumn itself is tombstoned or
                 // (b) the supercolumn is tombstoned and the subcolumn is not newer than it
                 if (subColumn.timestamp() <= minTimestamp
                     || (subColumn.isMarkedForDelete() && subColumn.getLocalDeletionTime() <= gcBefore))
                 {
-                    c.remove(subColumn.name());
+                    subIter.remove();
                 }
             }
             if (c.getSubColumns().isEmpty() && c.getLocalDeletionTime() <= gcBefore)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Sep 22 20:50:26 2011
@@ -72,6 +72,11 @@ public interface IColumn
     boolean isLive();
 
     /**
+     * @return true if the column or any its subcolumns expired before @param gcBefore
+     */
+    public boolean hasExpiredTombstones(int gcBefore);
+
+    /**
      * For a standard column, this is the same as timestamp().
      * For a super column, this is the max column timestamp of the sub columns.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Thu Sep 22 20:50:26 2011
@@ -44,7 +44,7 @@ public interface IColumnContainer
 
     public boolean isMarkedForDelete();
     public long getMarkedForDeleteAt();
-    public int getLocalDeletionTime();
+    public boolean hasExpiredTombstones(int gcBefore);
 
     public AbstractType getComparator();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Thu Sep 22 20:50:26 2011
@@ -100,6 +100,11 @@ public class RowIteratorFactory
             private DecoratedKey key;
             private ColumnFamily returnCF;
 
+            public boolean trivialReduceIsTrivial()
+            {
+                return false;
+            }
+
             @Override
             protected void onKeyChange()
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Thu Sep 22 20:50:26 2011
@@ -92,4 +92,9 @@ public abstract class AbstractCompaction
      * @return the number of background tasks estimated to still be needed for this columnfamilystore
      */
     public abstract int getEstimatedRemainingTasks();
+
+    /**
+     * @return size in bytes of the largest sstables for this strategy
+     */
+    public abstract long getMaxSSTableSize();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java Thu Sep 22 20:50:26 2011
@@ -88,6 +88,11 @@ public class CompactionIterable extends 
     {
         protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
 
+        public boolean trivialReduceIsTrivial()
+        {
+            return false;
+        }
+
         public void reduce(IColumnIterator current)
         {
             rows.add((SSTableIdentityIterator) current);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Sep 22 20:50:26 2011
@@ -120,10 +120,11 @@ public class CompactionTask extends Abst
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
-        // TODO the int cast here is potentially buggy
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact));
+        long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
+        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
+        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         if (logger.isDebugEnabled())
-            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+            logger.debug("Expected bloom filter size : " + keysPerSSTable);
 
         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                       ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
@@ -152,7 +153,7 @@ public class CompactionTask extends Abst
                 return 0;
             }
 
-            SSTableWriter writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
             writers.add(writer);
             while (nni.hasNext())
             {
@@ -179,7 +180,7 @@ public class CompactionTask extends Abst
                     SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                     cachedKeyMap.put(toIndex, cachedKeys);
                     sstables.add(toIndex);
-                    writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+                    writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                     writers.add(writer);
                     cachedKeys = new HashMap<DecoratedKey, Long>();
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Sep 22 20:50:26 2011
@@ -215,6 +215,11 @@ public class LazilyCompactedRow extends 
         int size = 0;
         long maxTimestampSeen = Long.MIN_VALUE;
 
+        public boolean trivialReduceIsTrivial()
+        {
+            return true;
+        }
+
         public void reduce(IColumn current)
         {
             container.addColumn(current);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Thu Sep 22 20:50:26 2011
@@ -28,11 +28,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
@@ -47,7 +45,7 @@ public class LeveledCompactionStrategy e
 
     private LeveledManifest manifest;
     private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
-    private final int maxSSTableSize;
+    private final int maxSSTableSizeInMB;
     private final AtomicReference<LeveledCompactionTask> task = new AtomicReference<LeveledCompactionTask>();
 
     public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
@@ -70,12 +68,12 @@ public class LeveledCompactionStrategy e
                 }
             }
         }
-        maxSSTableSize = configuredMaxSSTableSize;
+        maxSSTableSizeInMB = configuredMaxSSTableSize;
 
         cfs.getDataTracker().subscribe(this);
         logger.info(this + " subscribed to the data tracker.");
 
-        manifest = LeveledManifest.create(cfs, this.maxSSTableSize);
+        manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB);
         logger.debug("Created {}", manifest);
         // override min/max for this strategy
         cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
@@ -119,7 +117,7 @@ public class LeveledCompactionStrategy e
             return Collections.emptyList();
         }
 
-        LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize);
+        LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB);
         return task.compareAndSet(currentTask, newTask)
                ? Collections.<AbstractCompactionTask>singletonList(newTask)
                : Collections.<AbstractCompactionTask>emptyList();
@@ -156,6 +154,11 @@ public class LeveledCompactionStrategy e
         }
     }
 
+    public long getMaxSSTableSize()
+    {
+        return maxSSTableSizeInMB * 1024 * 1024;
+    }
+
     @Override
     public String toString()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Thu Sep 22 20:50:26 2011
@@ -89,10 +89,10 @@ public class ParallelCompactionIterable 
 
     private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
     {
-        private final MergeIterator<RowContainer, CompactedRowContainer> reducer;
+        private final CloseableIterator<CompactedRowContainer> reducer;
         private final CompactionController controller;
 
-        public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller)
+        public Unwrapper(CloseableIterator<CompactedRowContainer> reducer, CompactionController controller)
         {
             this.reducer = reducer;
             this.controller = controller;
@@ -148,6 +148,11 @@ public class ParallelCompactionIterable 
         private final ThreadPoolExecutor executor;
         private int row = 0;
 
+        public boolean trivialReduceIsTrivial()
+        {
+            return false;
+        }
+
         private Reducer()
         {
             super();
@@ -224,6 +229,7 @@ public class ParallelCompactionIterable 
                     }
                     else
                     {
+                        // addAll is ok even if cf is an ArrayBackedSortedColumns
                         cf.addAll(thisCF, HeapAllocator.instance);
                     }
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Thu Sep 22 20:50:26 2011
@@ -66,7 +66,27 @@ public class PrecompactedRow extends Abs
 
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey<?> key, CompactionController controller, ColumnFamily cf)
     {
-        return removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf);
+        // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
+        // gets behind and has hundreds of overlapping L0 sstables.  Essentially, this method is an
+        // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
+        // taking this into account.
+        Boolean shouldPurge = null;
+
+        if (cf.hasExpiredTombstones(controller.gcBefore))
+            shouldPurge = controller.shouldPurge(key);
+        ColumnFamily compacted = shouldPurge != null && shouldPurge
+                               ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore)
+                               : cf;
+
+        if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
+        {
+            if (shouldPurge == null)
+                shouldPurge = controller.shouldPurge(key);
+            if (shouldPurge)
+                CounterColumn.removeOldShards(compacted, controller.gcBefore);
+        }
+
+        return compacted;
     }
 
     public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, CompactionController controller, ColumnFamily cf)
@@ -105,6 +125,7 @@ public class PrecompactedRow extends Abs
             }
             else
             {
+                // addAll is ok even if cf is an ArrayBackedSortedColumns
                 cf.addAll(thisCF, HeapAllocator.instance);
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Thu Sep 22 20:50:26 2011
@@ -176,6 +176,11 @@ public class SizeTieredCompactionStrateg
         return minSSTableSize;
     }
 
+    public long getMaxSSTableSize()
+    {
+        return Long.MAX_VALUE;
+    }
+
     public String toString()
     {
         return String.format("SizeTieredCompactionStrategy[%s/%s]",

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Sep 22 20:50:26 2011
@@ -93,10 +93,15 @@ public class QueryFilter
         Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator);
         // define a 'reduced' iterator that merges columns w/ the same name, which
         // greatly simplifies computing liveColumns in the presence of tombstones.
-        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>()
+        MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>()
         {
             ColumnFamily curCF = returnCF.cloneMeShallow();
 
+            public boolean trivialReduceIsTrivial()
+            {
+                return true;
+            }
+
             protected boolean isEqual(IColumn o1, IColumn o2)
             {
                 return o1.name().equals(o2.name());
@@ -111,7 +116,7 @@ public class QueryFilter
                     // consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the
                     // result which removes column; which shouldn't be done on the original super column).
                     assert current instanceof SuperColumn;
-                    curCF.addColumn(((SuperColumn)current).cloneMe());
+                    curCF.addColumn(((SuperColumn) current).cloneMe());
                 }
                 else
                 {
@@ -129,16 +134,17 @@ public class QueryFilter
                     // time of the cf, if that is greater.
                     long deletedAt = c.getMarkedForDeleteAt();
                     if (returnCF.getMarkedForDeleteAt() > deletedAt)
-                        ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
+                        ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
 
-                    c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
-                    ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
+                    c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
+                    ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
                 }
-                curCF.clear();           
+                curCF.clear();
 
                 return c;
             }
-        });
+        };
+        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer);
 
         topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Sep 22 20:50:26 2011
@@ -24,16 +24,15 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
 public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
 {
-    private final MergeIterator<DecoratedKey,DecoratedKey> mi;
+    private final IMergeIterator<DecoratedKey,DecoratedKey> mi;
 
     public ReducingKeyIterator(Collection<SSTableReader> sstables)
     {
@@ -44,6 +43,11 @@ public class ReducingKeyIterator impleme
         {
             DecoratedKey<?> reduced = null;
 
+            public boolean trivialReduceIsTrivial()
+            {
+                return true;
+            }
+
             public void reduce(DecoratedKey current)
             {
                 reduced = current;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Thu Sep 22 20:50:26 2011
@@ -27,9 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -228,7 +226,7 @@ public class SSTableIdentityIterator imp
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
         assert inputWithTracker.getBytesRead() == headerSize();
-        ColumnFamily cf = columnFamily.cloneMeShallow();
+        ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
         // since we already read column count, just pass that value and continue deserialization
         ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
         if (validateColumns)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Thu Sep 22 20:50:26 2011
@@ -136,7 +136,7 @@ public class OutboundTcpConnection exten
         }
     }
 
-    static void write(Message message, String id, DataOutputStream out)
+    static void write(Message message, String id, DataOutputStream out) throws IOException
     {
         /*
          Setting up the protocol header. This is 4 bytes long
@@ -157,23 +157,16 @@ public class OutboundTcpConnection exten
         // Setting up the version bit
         header |= (message.getVersion() << 8);
 
-        try
-        {
-            out.writeInt(MessagingService.PROTOCOL_MAGIC);
-            out.writeInt(header);
-            // compute total Message length for compatibility w/ 0.8 and earlier
-            byte[] bytes = message.getMessageBody();
-            int total = messageLength(message.header_, id, bytes);
-            out.writeInt(total);
-            out.writeUTF(id);
-            Header.serializer().serialize(message.header_, out, message.getVersion());
-            out.writeInt(bytes.length);
-            out.write(bytes);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
+        out.writeInt(MessagingService.PROTOCOL_MAGIC);
+        out.writeInt(header);
+        // compute total Message length for compatibility w/ 0.8 and earlier
+        byte[] bytes = message.getMessageBody();
+        int total = messageLength(message.header_, id, bytes);
+        out.writeInt(total);
+        out.writeUTF(id);
+        Header.serializer().serialize(message.header_, out, message.getVersion());
+        out.writeInt(bytes.length);
+        out.write(bytes);
     }
 
     public static int messageLength(Header header, String id, byte[] bytes)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Sep 22 20:50:26 2011
@@ -86,7 +86,8 @@ public class RangeSliceResponseResolver 
             iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
         }
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
+        // TODO do we need to call close?
+        CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
 
         List<Row> resolvedRows = new ArrayList<Row>(n);
         while (iter.hasNext())
@@ -140,6 +141,11 @@ public class RangeSliceResponseResolver 
         List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
         DecoratedKey key;
 
+        public boolean trivialReduceIsTrivial()
+        {
+            return false;
+        }
+
         public void reduce(Pair<Row,InetAddress> current)
         {
             key = current.left.key;

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/MergeIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/MergeIterator.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/MergeIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/MergeIterator.java Thu Sep 22 20:50:26 2011
@@ -20,50 +20,32 @@ package org.apache.cassandra.utils;
 
 import java.io.IOException;
 import java.io.IOError;
-import java.util.ArrayDeque;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Ordering;
 
 /** Merges sorted input iterators which individually contain unique items. */
-public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements CloseableIterator<Out>
+public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out>
 {
-    public final Comparator<In> comp;
+    protected final Reducer<In,Out> reducer;
     protected final List<? extends CloseableIterator<In>> iterators;
-    // a queue for return: all candidates must be open and have at least one item
-    protected final PriorityQueue<Candidate<In>> queue;
 
-    protected MergeIterator(List<? extends CloseableIterator<In>> iters, Comparator<In> comp)
+    protected MergeIterator(List<? extends CloseableIterator<In>> iters, Reducer<In, Out> reducer)
     {
         this.iterators = iters;
-        this.comp = comp;
-        this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
-        for (CloseableIterator<In> iter : iters)
-        {
-            Candidate<In> candidate = new Candidate<In>(iter, comp);
-            if (!candidate.advance())
-                // was empty
-                continue;
-            this.queue.add(candidate);
-        }
-    }
-
-    public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters)
-    {
-        return get(iters, (Comparator<E>)Ordering.natural());
-    }
-
-    public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
-    {
-        return new OneToOne<E>(iters, comp);
+        this.reducer = reducer;
     }
 
-    public static <In,Out> MergeIterator<In,Out> get(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
-    {
-        return new ManyToOne<In,Out>(iters, comp, reducer);
+    public static <In, Out> IMergeIterator<In, Out> get(final List<? extends CloseableIterator<In>> sources,
+                                                    Comparator<In> comparator,
+                                                    final Reducer<In, Out> reducer)
+    {
+        assert !sources.isEmpty();
+        if (sources.size() == 1)
+            return reducer.trivialReduceIsTrivial()
+                   ? new TrivialOneToOne<In, Out>(sources, reducer)
+                   : new OneToOne<In, Out>(sources, reducer);
+        return new ManyToOne<In, Out>(sources, comparator, reducer);
     }
 
     public Iterable<? extends CloseableIterator<In>> iterators()
@@ -71,23 +53,6 @@ public abstract class MergeIterator<In,O
         return iterators;
     }
 
-    /**
-     * Consumes sorted items from the queue: should only remove items from the queue,
-     * not add them.
-     */
-    protected abstract Out consume();
-
-    /**
-     * Returns consumed items to the queue.
-     */
-    protected abstract void advance();
-
-    protected final Out computeNext()
-    {
-        advance();
-        return consume();
-    }
-
     public void close()
     {
         for (CloseableIterator<In> iterator : this.iterators)
@@ -103,47 +68,38 @@ public abstract class MergeIterator<In,O
         }
     }
 
-    /** A MergeIterator that returns a single value for each one consumed. */
-    private static final class OneToOne<E> extends MergeIterator<E,E>
-    {
-        // the last returned candidate, so that we can lazily call 'advance()'
-        protected Candidate<E> candidate;
-        public OneToOne(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
-        {
-            super(iters, comp);
-        }
-
-        protected final E consume()
-        {
-            candidate = queue.poll();
-            if (candidate == null)
-                return endOfData();
-            return candidate.item;
-        }
-
-        protected final void advance()
-        {
-            if (candidate != null && candidate.advance())
-                // has more items
-                queue.add(candidate);
-        }
-    }
-
     /** A MergeIterator that consumes multiple input values per output value. */
     private static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
     {
-        protected final Reducer<In,Out> reducer;
+        public final Comparator<In> comp;
+        // a queue for return: all candidates must be open and have at least one item
+        protected final PriorityQueue<Candidate<In>> queue;
         // a stack of the last consumed candidates, so that we can lazily call 'advance()'
         // TODO: if we had our own PriorityQueue implementation we could stash items
         // at the end of its array, so we wouldn't need this storage
         protected final ArrayDeque<Candidate<In>> candidates;
         public ManyToOne(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
         {
-            super(iters, comp);
-            this.reducer = reducer;
+            super(iters, reducer);
+            this.comp = comp;
+            this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
+            for (CloseableIterator<In> iter : iters)
+            {
+                Candidate<In> candidate = new Candidate<In>(iter, comp);
+                if (!candidate.advance())
+                    // was empty
+                    continue;
+                this.queue.add(candidate);
+            }
             this.candidates = new ArrayDeque<Candidate<In>>(queue.size());
         }
 
+        protected final Out computeNext()
+        {
+            advance();
+            return consume();
+        }
+
         /** Consume values by sending them to the reducer while they are equal. */
         protected final Out consume()
         {
@@ -177,17 +133,13 @@ public abstract class MergeIterator<In,O
         private final CloseableIterator<In> iter;
         private final Comparator<In> comp;
         private In item;
+
         public Candidate(CloseableIterator<In> iter, Comparator<In> comp)
         {
             this.iter = iter;
             this.comp = comp;
         }
 
-        public In item()
-        {
-            return item;
-        }
-
         /** @return True if our iterator had an item, and it is now available */
         protected boolean advance()
         {
@@ -207,6 +159,11 @@ public abstract class MergeIterator<In,O
     public static abstract class Reducer<In,Out>
     {
         /**
+         * @return true if Out is the same as In for the case of a single source iterator
+         */
+        public abstract boolean trivialReduceIsTrivial();
+
+        /**
          * combine this object with the previous ones.
          * intermediate state is up to your implementation.
          */
@@ -221,4 +178,42 @@ public abstract class MergeIterator<In,O
          */
         protected void onKeyChange() {}
     }
+
+    private static class OneToOne<In, Out> extends MergeIterator<In, Out>
+    {
+        private final CloseableIterator<In> source;
+
+        public OneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer)
+        {
+            super(sources, reducer);
+            source = sources.get(0);
+        }
+
+        protected Out computeNext()
+        {
+            if (!source.hasNext())
+                return endOfData();
+            reducer.onKeyChange();
+            reducer.reduce(source.next());
+            return reducer.getReduced();
+        }
+    }
+
+    private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out>
+    {
+        private final CloseableIterator<?> source;
+
+        public TrivialOneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer)
+        {
+            super(sources, reducer);
+            source = sources.get(0);
+        }
+
+        protected Out computeNext()
+        {
+            if (!source.hasNext())
+                return endOfData();
+            return (Out) source.next();
+        }
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1174383&r1=1174382&r2=1174383&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Sep 22 20:50:26 2011
@@ -19,9 +19,7 @@
 package org.apache.cassandra.utils;
 
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
@@ -45,16 +43,6 @@ public class MergeIteratorTest
         d = new CLI();
     }
 
-    @Test
-    public void testOneToOne() throws Exception
-    {
-        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
-                                                             Ordering.<String>natural());
-        assert Iterators.elementsEqual(all, smi);
-        smi.close();
-        assert a.closed && b.closed && c.closed && d.closed;
-    }
-
     /** Test that duplicate values are concatted. */
     @Test
     public void testManyToOne() throws Exception
@@ -62,6 +50,12 @@ public class MergeIteratorTest
         MergeIterator.Reducer<String,String> reducer = new MergeIterator.Reducer<String,String>()
         {
             String concatted = "";
+
+            public boolean trivialReduceIsTrivial()
+            {
+                return false; // technically true, but let's not optimize anything away here...
+            }
+
             public void reduce(String value)
             {
                 concatted += value;
@@ -74,7 +68,7 @@ public class MergeIteratorTest
                 return tmp;
             }
         };
-        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+        IMergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
                                                              Ordering.<String>natural(),
                                                              reducer);
         assert Iterators.elementsEqual(cat, smi);