You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 18:32:39 UTC

[01/16] cassandra git commit: Range tombstones that are masked by row tombstones should not be written out

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 43c741e25 -> 00e7ecf13
  refs/heads/cassandra-3.0 9ed3b42d3 -> 778f2a46e
  refs/heads/cassandra-3.9 5ad17634a -> 59ee46e55
  refs/heads/trunk b4133f38d -> 9fd607778


Range tombstones that are masked by row tombstones should not be written out

patch by Nachiket Patil; reviewed by Sylvain Lebresne for CASSANDRA-12030


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98f5f77b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98f5f77b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98f5f77b

Branch: refs/heads/cassandra-3.9
Commit: 98f5f77bb3c5d50e52cbb6f577a463ca8a5134ad
Parents: 3c1653f
Author: Nachiket Patil <na...@apple.com>
Authored: Wed Jul 6 11:22:56 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:35:10 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LazilyCompactedRow.java       |  3 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1dcbe1..7fa995d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.16
+ * Don't write shadowed range tombstone (CASSANDRA-12030)
  * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f912da2..dab5eeb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -286,7 +286,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp() ||
+                    maxRowTombstone.markedForDeleteAt >= t.timestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 3292422..dfd6960 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNames;
@@ -543,6 +544,45 @@ public class RangeTombstoneTest extends SchemaLoader
     }
 
     @Test
+    public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+    {
+        long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+        // remove any existing sstables before starting
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+        Mutation rm = new Mutation(KSNAME, key);
+        for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+            add(rm, i, testTimeStamp);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new Mutation(KSNAME, key);
+        ColumnFamily cf = rm.addOrGet(CFNAME);
+
+        // Write the covering row tombstone
+        cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+
+        // Create range tombstones covered by row tombstone above.
+        for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+            delete(cf, 0, 7, testTimeStamp);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // there should be 2 sstables
+        assertEquals(2, cfs.getSSTables().size());
+
+        // compact down to nothing
+        CompactionManager.instance.performMaximal(cfs);
+        assertEquals(0, cfs.getSSTables().size());
+    }
+
+    @Test
     public void testOverwritesToDeletedColumns() throws Exception
     {
         Keyspace table = Keyspace.open(KSNAME);


[09/16] cassandra git commit: Improve streaming synchronization and fault tolerance

Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance

Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1

Branch: refs/heads/cassandra-3.9
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../org/apache/cassandra/streaming/ConnectionHandler.java    | 8 +++-----
 .../org/apache/cassandra/streaming/StreamReceiveTask.java    | 2 --
 3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
  * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 Merged from 2.1:
  * Don't write shadowed range tombstone (CASSANDRA-12030)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
 
         protected void signalCloseDone()
         {
+            if (closeFuture == null)
+                close();
+
             closeFuture.get().set(null);
 
             // We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
                     }
                 }
             }
-            catch (SocketException e)
-            {
-                // socket is closed
-                close();
-            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;


[11/16] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46

Branch: refs/heads/trunk
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
 -2.2.8
 +3.0.9
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
   * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
  Merged from 2.1:
 - * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
   * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
  
  
 -2.2.7
 +3.0.8
 + * Fix potential race in schema during new table creation (CASSANDRA-12083)
 + * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
 + * Disable autocompaction during drain (CASSANDRA-11878)
 + * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
 + * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
   */
  package org.apache.cassandra.streaming;
  
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
 -
++import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends 
      // total size of files to receive
      private final long totalSize;
  
 +    // Transaction tracking new files received
-     public final LifecycleTransaction txn;
++    private final LifecycleTransaction txn;
 +
      // true if task is done (either completed or aborted)
--    private boolean done = false;
++    private volatile boolean done = false;
  
      //  holds references to SSTables received
 -    protected Collection<SSTableWriter> sstables;
 +    protected Collection<SSTableReader> sstables;
 +
 +    private int remoteSSTablesReceived = 0;
  
      public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
      {
@@@ -92,18 -74,16 +90,32 @@@
       *
       * @param sstable SSTable file received.
       */
 -    public synchronized void received(SSTableWriter sstable)
 +    public synchronized void received(SSTableMultiWriter sstable)
      {
          if (done)
++        {
++            logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++                        sstable.getFilename());
++            Throwables.maybeFail(sstable.abort(null));
              return;
++        }
+ 
 -        assert cfId.equals(sstable.metadata.cfId);
 +        remoteSSTablesReceived++;
 +        assert cfId.equals(sstable.getCfId());
  
-         Collection<SSTableReader> finished = sstable.finish(true);
 -        sstables.add(sstable);
++        Collection<SSTableReader> finished = null;
++        try
++        {
++            finished = sstable.finish(true);
++        }
++        catch (Throwable t)
++        {
++            Throwables.maybeFail(sstable.abort(t));
++        }
 +        txn.update(finished, false);
 +        sstables.addAll(finished);
  
 -        if (sstables.size() == totalFiles)
 +        if (remoteSSTablesReceived == totalFiles)
          {
              done = true;
              executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
          return totalSize;
      }
  
++    public synchronized LifecycleTransaction getTransaction()
++    {
++        if (done)
++            throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
++        return txn;
++    }
++
      private static class OnCompletionRunnable implements Runnable
      {
          private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
                  if (kscf == null)
                  {
                      // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
                      task.sstables.clear();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                    task.session.taskCompleted(task);
                      return;
                  }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
  
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
 -                List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 -                task.sstables.clear();
 +                Collection<SSTableReader> readers = task.sstables;
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
-                         task.txn.finish();
++                        task.finishTransaction();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
                      }
                  }
@@@ -211,21 -171,10 +230,20 @@@
              }
              catch (Throwable t)
              {
--                logger.error("Error applying streamed data: ", t);
                  JVMStabilityInspector.inspectThrowable(t);
                  task.session.onError(t);
              }
 +            finally
 +            {
 +                //We don't keep the streamed sstables since we've applied them manually
 +                //So we abort the txn and delete the streamed sstables
 +                if (hasViews)
 +                {
 +                    if (cfs != null)
 +                        cfs.forceBlockingFlush();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                }
 +            }
          }
      }
  
@@@ -241,7 -190,8 +259,17 @@@
              return;
  
          done = true;
-         txn.abort();
 -        for (SSTableWriter writer : sstables)
 -            writer.abort();
++        abortTransaction();
          sstables.clear();
      }
++
++    private synchronized void abortTransaction()
++    {
++        txn.abort();
++    }
++
++    private synchronized void finishTransaction()
++    {
++        txn.finish();
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
      }
  
  
 +    public LifecycleTransaction getTransaction(UUID cfId)
 +    {
 +        assert receivers.containsKey(cfId);
-         return receivers.get(cfId).txn;
++        return receivers.get(cfId).getTransaction();
 +    }
 +
      /**
       * Bind this session to report to specific {@link StreamResultFuture} and
       * perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
       * @param flushTables flush tables?
       * @param repairedAt the time the repair started.
       */
--    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
      {
++        failIfFinished();
          Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
          if (flushTables)
              flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
          }
      }
  
++    private void failIfFinished()
++    {
++        if (state() == State.COMPLETE || state() == State.FAILED)
++            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++    }
++
      private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
      {
          Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
          }
      }
  
--    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
      {
++        failIfFinished();
          Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
          while (iter.hasNext())
          {
@@@ -745,8 -743,8 +753,9 @@@
          FBUtilities.waitOnFutures(flushes);
      }
  
--    private void prepareReceiving(StreamSummary summary)
++    private synchronized void prepareReceiving(StreamSummary summary)
      {
++        failIfFinished();
          if (summary.files > 0)
              receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
          List<Range<Token>> ranges = new ArrayList<>();
          // wrapped range
          ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
--        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add ranges after stream session is finished
++        try
++        {
++            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
      {
--        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add files after stream session is finished
++        try
++        {
++            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      /**
@@@ -312,36 -325,27 +336,36 @@@
          String cfname = "StandardInteger1";
          Keyspace keyspace = Keyspace.open(ks);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        ClusteringComparator comparator = cfs.getComparator();
  
 -        String key = "key0";
 -        Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
 -        // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
 -        ColumnFamily cf = rm.addOrGet(cfname);
 -        // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        String key = "key1";
 +
 +
 +        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
  
 -        key = "key1";
 -        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
          // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        cf = rm.addOrGet(cfname);
 +        updates.clustering(1)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
 +                .build()
 +                .apply();
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
 +        updates.clustering(6)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
-                .build()
++                .build()
 +                .apply();
 +
          // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
 +        //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
 +        //        .build()
 +        //        .apply();
 +
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
 +        updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
 +                .build()
 +                .apply();
  
          cfs.forceBlockingFlush();
  


[12/16] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46

Branch: refs/heads/cassandra-3.9
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
 -2.2.8
 +3.0.9
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
   * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
  Merged from 2.1:
 - * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
   * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
  
  
 -2.2.7
 +3.0.8
 + * Fix potential race in schema during new table creation (CASSANDRA-12083)
 + * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
 + * Disable autocompaction during drain (CASSANDRA-11878)
 + * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
 + * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
   */
  package org.apache.cassandra.streaming;
  
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
 -
++import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends 
      // total size of files to receive
      private final long totalSize;
  
 +    // Transaction tracking new files received
-     public final LifecycleTransaction txn;
++    private final LifecycleTransaction txn;
 +
      // true if task is done (either completed or aborted)
--    private boolean done = false;
++    private volatile boolean done = false;
  
      //  holds references to SSTables received
 -    protected Collection<SSTableWriter> sstables;
 +    protected Collection<SSTableReader> sstables;
 +
 +    private int remoteSSTablesReceived = 0;
  
      public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
      {
@@@ -92,18 -74,16 +90,32 @@@
       *
       * @param sstable SSTable file received.
       */
 -    public synchronized void received(SSTableWriter sstable)
 +    public synchronized void received(SSTableMultiWriter sstable)
      {
          if (done)
++        {
++            logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++                        sstable.getFilename());
++            Throwables.maybeFail(sstable.abort(null));
              return;
++        }
+ 
 -        assert cfId.equals(sstable.metadata.cfId);
 +        remoteSSTablesReceived++;
 +        assert cfId.equals(sstable.getCfId());
  
-         Collection<SSTableReader> finished = sstable.finish(true);
 -        sstables.add(sstable);
++        Collection<SSTableReader> finished = null;
++        try
++        {
++            finished = sstable.finish(true);
++        }
++        catch (Throwable t)
++        {
++            Throwables.maybeFail(sstable.abort(t));
++        }
 +        txn.update(finished, false);
 +        sstables.addAll(finished);
  
 -        if (sstables.size() == totalFiles)
 +        if (remoteSSTablesReceived == totalFiles)
          {
              done = true;
              executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
          return totalSize;
      }
  
++    public synchronized LifecycleTransaction getTransaction()
++    {
++        if (done)
++            throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
++        return txn;
++    }
++
      private static class OnCompletionRunnable implements Runnable
      {
          private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
                  if (kscf == null)
                  {
                      // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
                      task.sstables.clear();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                    task.session.taskCompleted(task);
                      return;
                  }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
  
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
 -                List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 -                task.sstables.clear();
 +                Collection<SSTableReader> readers = task.sstables;
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
-                         task.txn.finish();
++                        task.finishTransaction();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
                      }
                  }
@@@ -211,21 -171,10 +230,20 @@@
              }
              catch (Throwable t)
              {
--                logger.error("Error applying streamed data: ", t);
                  JVMStabilityInspector.inspectThrowable(t);
                  task.session.onError(t);
              }
 +            finally
 +            {
 +                //We don't keep the streamed sstables since we've applied them manually
 +                //So we abort the txn and delete the streamed sstables
 +                if (hasViews)
 +                {
 +                    if (cfs != null)
 +                        cfs.forceBlockingFlush();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                }
 +            }
          }
      }
  
@@@ -241,7 -190,8 +259,17 @@@
              return;
  
          done = true;
-         txn.abort();
 -        for (SSTableWriter writer : sstables)
 -            writer.abort();
++        abortTransaction();
          sstables.clear();
      }
++
++    private synchronized void abortTransaction()
++    {
++        txn.abort();
++    }
++
++    private synchronized void finishTransaction()
++    {
++        txn.finish();
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
      }
  
  
 +    public LifecycleTransaction getTransaction(UUID cfId)
 +    {
 +        assert receivers.containsKey(cfId);
-         return receivers.get(cfId).txn;
++        return receivers.get(cfId).getTransaction();
 +    }
 +
      /**
       * Bind this session to report to specific {@link StreamResultFuture} and
       * perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
       * @param flushTables flush tables?
       * @param repairedAt the time the repair started.
       */
--    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
      {
++        failIfFinished();
          Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
          if (flushTables)
              flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
          }
      }
  
++    private void failIfFinished()
++    {
++        if (state() == State.COMPLETE || state() == State.FAILED)
++            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++    }
++
      private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
      {
          Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
          }
      }
  
--    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
      {
++        failIfFinished();
          Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
          while (iter.hasNext())
          {
@@@ -745,8 -743,8 +753,9 @@@
          FBUtilities.waitOnFutures(flushes);
      }
  
--    private void prepareReceiving(StreamSummary summary)
++    private synchronized void prepareReceiving(StreamSummary summary)
      {
++        failIfFinished();
          if (summary.files > 0)
              receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
          List<Range<Token>> ranges = new ArrayList<>();
          // wrapped range
          ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
--        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add ranges after stream session is finished
++        try
++        {
++            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
      {
--        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add files after stream session is finished
++        try
++        {
++            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      /**
@@@ -312,36 -325,27 +336,36 @@@
          String cfname = "StandardInteger1";
          Keyspace keyspace = Keyspace.open(ks);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        ClusteringComparator comparator = cfs.getComparator();
  
 -        String key = "key0";
 -        Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
 -        // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
 -        ColumnFamily cf = rm.addOrGet(cfname);
 -        // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        String key = "key1";
 +
 +
 +        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
  
 -        key = "key1";
 -        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
          // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        cf = rm.addOrGet(cfname);
 +        updates.clustering(1)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
 +                .build()
 +                .apply();
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
 +        updates.clustering(6)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
-                .build()
++                .build()
 +                .apply();
 +
          // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
 +        //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
 +        //        .build()
 +        //        .apply();
 +
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
 +        updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
 +                .build()
 +                .apply();
  
          cfs.forceBlockingFlush();
  


[07/16] cassandra git commit: Improve streaming synchronization and fault tolerance

Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance

Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1

Branch: refs/heads/cassandra-3.0
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../org/apache/cassandra/streaming/ConnectionHandler.java    | 8 +++-----
 .../org/apache/cassandra/streaming/StreamReceiveTask.java    | 2 --
 3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
  * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 Merged from 2.1:
  * Don't write shadowed range tombstone (CASSANDRA-12030)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
 
         protected void signalCloseDone()
         {
+            if (closeFuture == null)
+                close();
+
             closeFuture.get().set(null);
 
             // We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
                     }
                 }
             }
-            catch (SocketException e)
-            {
-                // socket is closed
-                close();
-            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;


[14/16] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59ee46e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59ee46e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59ee46e5

Branch: refs/heads/trunk
Commit: 59ee46e55a15775a49edde86de81b9b79875731d
Parents: 5ad1763 778f2a4
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:34:22 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:34:22 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2861cf7,8118de1..d459e34
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.9
 +3.9
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 +Merged from 3.0:
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
   * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
   * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
   * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------


[15/16] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59ee46e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59ee46e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59ee46e5

Branch: refs/heads/cassandra-3.9
Commit: 59ee46e55a15775a49edde86de81b9b79875731d
Parents: 5ad1763 778f2a4
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:34:22 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:34:22 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2861cf7,8118de1..d459e34
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.9
 +3.9
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 +Merged from 3.0:
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
   * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
   * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
   * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59ee46e5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------


[16/16] cassandra git commit: Merge branch 'cassandra-3.9' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.9' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fd60777
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fd60777
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fd60777

Branch: refs/heads/trunk
Commit: 9fd607778091c48910db557d7a95029cac077244
Parents: b4133f3 59ee46e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:34:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:34:30 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fd60777/CHANGES.txt
----------------------------------------------------------------------


[02/16] cassandra git commit: Range tombstones that are masked by row tombstones should not be written out

Posted by yu...@apache.org.
Range tombstones that are masked by row tombstones should not be written out

patch by Nachiket Patil; reviewed by Sylvain Lebresne for CASSANDRA-12030


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98f5f77b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98f5f77b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98f5f77b

Branch: refs/heads/trunk
Commit: 98f5f77bb3c5d50e52cbb6f577a463ca8a5134ad
Parents: 3c1653f
Author: Nachiket Patil <na...@apple.com>
Authored: Wed Jul 6 11:22:56 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:35:10 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LazilyCompactedRow.java       |  3 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1dcbe1..7fa995d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.16
+ * Don't write shadowed range tombstone (CASSANDRA-12030)
  * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f912da2..dab5eeb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -286,7 +286,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp() ||
+                    maxRowTombstone.markedForDeleteAt >= t.timestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98f5f77b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 3292422..dfd6960 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNames;
@@ -543,6 +544,45 @@ public class RangeTombstoneTest extends SchemaLoader
     }
 
     @Test
+    public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+    {
+        long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+        // remove any existing sstables before starting
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+        Mutation rm = new Mutation(KSNAME, key);
+        for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+            add(rm, i, testTimeStamp);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new Mutation(KSNAME, key);
+        ColumnFamily cf = rm.addOrGet(CFNAME);
+
+        // Write the covering row tombstone
+        cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+
+        // Create range tombstones covered by row tombstone above.
+        for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+            delete(cf, 0, 7, testTimeStamp);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // there should be 2 sstables
+        assertEquals(2, cfs.getSSTables().size());
+
+        // compact down to nothing
+        CompactionManager.instance.performMaximal(cfs);
+        assertEquals(0, cfs.getSSTables().size());
+    }
+
+    @Test
     public void testOverwritesToDeletedColumns() throws Exception
     {
         Keyspace table = Keyspace.open(KSNAME);


[10/16] cassandra git commit: Improve streaming synchronization and fault tolerance

Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance

Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1

Branch: refs/heads/cassandra-2.2
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../org/apache/cassandra/streaming/ConnectionHandler.java    | 8 +++-----
 .../org/apache/cassandra/streaming/StreamReceiveTask.java    | 2 --
 3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
  * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 Merged from 2.1:
  * Don't write shadowed range tombstone (CASSANDRA-12030)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
 
         protected void signalCloseDone()
         {
+            if (closeFuture == null)
+                close();
+
             closeFuture.get().set(null);
 
             // We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
                     }
                 }
             }
-            catch (SocketException e)
-            {
-                // socket is closed
-                close();
-            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;


[05/16] cassandra git commit: Merge commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0

Posted by yu...@apache.org.
Merge commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0

* commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b':
  Range tombstones that are masked by row tombstones should not be written out


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed3b42d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed3b42d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed3b42d

Branch: refs/heads/trunk
Commit: 9ed3b42d3b50237f99485233857a7b34d5238d9a
Parents: dd05e46 43c741e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:52 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:52 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[06/16] cassandra git commit: Merge commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0

Posted by yu...@apache.org.
Merge commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b' into cassandra-3.0

* commit '43c741e251102bf5651ff8aa1b5ca078eb0ddc0b':
  Range tombstones that are masked by row tombstones should not be written out


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed3b42d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed3b42d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed3b42d

Branch: refs/heads/cassandra-3.9
Commit: 9ed3b42d3b50237f99485233857a7b34d5238d9a
Parents: dd05e46 43c741e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:52 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:52 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[13/16] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46

Branch: refs/heads/cassandra-3.0
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
 -2.2.8
 +3.0.9
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
   * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
  Merged from 2.1:
 - * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
   * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
  
  
 -2.2.7
 +3.0.8
 + * Fix potential race in schema during new table creation (CASSANDRA-12083)
 + * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
 + * Disable autocompaction during drain (CASSANDRA-11878)
 + * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
 + * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
   */
  package org.apache.cassandra.streaming;
  
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
 -
++import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends 
      // total size of files to receive
      private final long totalSize;
  
 +    // Transaction tracking new files received
-     public final LifecycleTransaction txn;
++    private final LifecycleTransaction txn;
 +
      // true if task is done (either completed or aborted)
--    private boolean done = false;
++    private volatile boolean done = false;
  
      //  holds references to SSTables received
 -    protected Collection<SSTableWriter> sstables;
 +    protected Collection<SSTableReader> sstables;
 +
 +    private int remoteSSTablesReceived = 0;
  
      public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
      {
@@@ -92,18 -74,16 +90,32 @@@
       *
       * @param sstable SSTable file received.
       */
 -    public synchronized void received(SSTableWriter sstable)
 +    public synchronized void received(SSTableMultiWriter sstable)
      {
          if (done)
++        {
++            logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
++                        sstable.getFilename());
++            Throwables.maybeFail(sstable.abort(null));
              return;
++        }
+ 
 -        assert cfId.equals(sstable.metadata.cfId);
 +        remoteSSTablesReceived++;
 +        assert cfId.equals(sstable.getCfId());
  
-         Collection<SSTableReader> finished = sstable.finish(true);
 -        sstables.add(sstable);
++        Collection<SSTableReader> finished = null;
++        try
++        {
++            finished = sstable.finish(true);
++        }
++        catch (Throwable t)
++        {
++            Throwables.maybeFail(sstable.abort(t));
++        }
 +        txn.update(finished, false);
 +        sstables.addAll(finished);
  
 -        if (sstables.size() == totalFiles)
 +        if (remoteSSTablesReceived == totalFiles)
          {
              done = true;
              executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
          return totalSize;
      }
  
++    public synchronized LifecycleTransaction getTransaction()
++    {
++        if (done)
++            throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId));
++        return txn;
++    }
++
      private static class OnCompletionRunnable implements Runnable
      {
          private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
                  if (kscf == null)
                  {
                      // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
                      task.sstables.clear();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                    task.session.taskCompleted(task);
                      return;
                  }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
  
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
 -                List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 -                task.sstables.clear();
 +                Collection<SSTableReader> readers = task.sstables;
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
-                         task.txn.finish();
++                        task.finishTransaction();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
                      }
                  }
@@@ -211,21 -171,10 +230,20 @@@
              }
              catch (Throwable t)
              {
--                logger.error("Error applying streamed data: ", t);
                  JVMStabilityInspector.inspectThrowable(t);
                  task.session.onError(t);
              }
 +            finally
 +            {
 +                //We don't keep the streamed sstables since we've applied them manually
 +                //So we abort the txn and delete the streamed sstables
 +                if (hasViews)
 +                {
 +                    if (cfs != null)
 +                        cfs.forceBlockingFlush();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                }
 +            }
          }
      }
  
@@@ -241,7 -190,8 +259,17 @@@
              return;
  
          done = true;
-         txn.abort();
 -        for (SSTableWriter writer : sstables)
 -            writer.abort();
++        abortTransaction();
          sstables.clear();
      }
++
++    private synchronized void abortTransaction()
++    {
++        txn.abort();
++    }
++
++    private synchronized void finishTransaction()
++    {
++        txn.finish();
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
      }
  
  
 +    public LifecycleTransaction getTransaction(UUID cfId)
 +    {
 +        assert receivers.containsKey(cfId);
-         return receivers.get(cfId).txn;
++        return receivers.get(cfId).getTransaction();
 +    }
 +
      /**
       * Bind this session to report to specific {@link StreamResultFuture} and
       * perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
       * @param flushTables flush tables?
       * @param repairedAt the time the repair started.
       */
--    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
      {
++        failIfFinished();
          Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
          if (flushTables)
              flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
          }
      }
  
++    private void failIfFinished()
++    {
++        if (state() == State.COMPLETE || state() == State.FAILED)
++            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++    }
++
      private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
      {
          Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
          }
      }
  
--    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
      {
++        failIfFinished();
          Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
          while (iter.hasNext())
          {
@@@ -745,8 -743,8 +753,9 @@@
          FBUtilities.waitOnFutures(flushes);
      }
  
--    private void prepareReceiving(StreamSummary summary)
++    private synchronized void prepareReceiving(StreamSummary summary)
      {
++        failIfFinished();
          if (summary.files > 0)
              receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
          List<Range<Token>> ranges = new ArrayList<>();
          // wrapped range
          ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
--        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add ranges after stream session is finished
++        try
++        {
++            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
      {
--        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add files after stream session is finished
++        try
++        {
++            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      /**
@@@ -312,36 -325,27 +336,36 @@@
          String cfname = "StandardInteger1";
          Keyspace keyspace = Keyspace.open(ks);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        ClusteringComparator comparator = cfs.getComparator();
  
 -        String key = "key0";
 -        Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
 -        // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
 -        ColumnFamily cf = rm.addOrGet(cfname);
 -        // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        String key = "key1";
 +
 +
 +        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
  
 -        key = "key1";
 -        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
          // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        cf = rm.addOrGet(cfname);
 +        updates.clustering(1)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
 +                .build()
 +                .apply();
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
 +        updates.clustering(6)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
-                .build()
++                .build()
 +                .apply();
 +
          // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
 +        //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
 +        //        .build()
 +        //        .apply();
 +
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
 +        updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
 +                .build()
 +                .apply();
  
          cfs.forceBlockingFlush();
  


[08/16] cassandra git commit: Improve streaming synchronization and fault tolerance

Posted by yu...@apache.org.
Improve streaming synchronization and fault tolerance

Patch by Paulo Motta; Reviewed by yukim for CASSANDRA-11414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00e7ecf1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00e7ecf1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00e7ecf1

Branch: refs/heads/trunk
Commit: 00e7ecf1394f8704e2f13369f7950e129459ce2c
Parents: 43c741e
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jul 6 12:16:16 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:32:39 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../org/apache/cassandra/streaming/ConnectionHandler.java    | 8 +++-----
 .../org/apache/cassandra/streaming/StreamReceiveTask.java    | 2 --
 3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bfd8aa2..7d62f97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
  * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 Merged from 2.1:
  * Don't write shadowed range tombstone (CASSANDRA-12030)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index c497a39..364435e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -233,6 +233,9 @@ public class ConnectionHandler
 
         protected void signalCloseDone()
         {
+            if (closeFuture == null)
+                close();
+
             closeFuture.get().set(null);
 
             // We can now close the socket
@@ -294,11 +297,6 @@ public class ConnectionHandler
                     }
                 }
             }
-            catch (SocketException e)
-            {
-                // socket is closed
-                close();
-            }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00e7ecf1/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6911ec6..b342edc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;


[03/16] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

* cassandra-2.1:
  Range tombstones that are masked by row tombstones should not be written out


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/43c741e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/43c741e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/43c741e2

Branch: refs/heads/trunk
Commit: 43c741e251102bf5651ff8aa1b5ca078eb0ddc0b
Parents: d5a15e4 98f5f77
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:13 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:13 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LazilyCompactedRow.java       |  3 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 65c7c1f,7fa995d..bfd8aa2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,11 +1,44 @@@
 -2.1.16
 +2.2.8
 + * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 +Merged from 2.1:
+  * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
   * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
 -
 -2.1.15
 + * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
   * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
 +
 +
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
   * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
   * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
   * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 9ce1236,dfd6960..bff0ddf
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@@ -37,10 -33,13 +37,11 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.IndexType;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
  import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
  import org.apache.cassandra.db.composites.CellName;
  import org.apache.cassandra.db.composites.CellNames;
@@@ -559,6 -543,46 +560,45 @@@ public class RangeTombstoneTes
      }
  
      @Test
+     public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+     {
+         long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+         Keyspace table = Keyspace.open(KSNAME);
+         ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+         ByteBuffer key = ByteBufferUtil.bytes("k4");
+ 
+         // remove any existing sstables before starting
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+         cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+ 
+         Mutation rm = new Mutation(KSNAME, key);
+         for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+             add(rm, i, testTimeStamp);
+         rm.apply();
+         cfs.forceBlockingFlush();
+ 
+         rm = new Mutation(KSNAME, key);
+         ColumnFamily cf = rm.addOrGet(CFNAME);
+ 
+         // Write the covering row tombstone
+         cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+ 
+         // Create range tombstones covered by row tombstone above.
+         for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+             delete(cf, 0, 7, testTimeStamp);
+         rm.apply();
+         cfs.forceBlockingFlush();
+ 
+         // there should be 2 sstables
+         assertEquals(2, cfs.getSSTables().size());
+ 
+         // compact down to nothing
 -        CompactionManager.instance.performMaximal(cfs);
++        CompactionManager.instance.performMaximal(cfs, false);
+         assertEquals(0, cfs.getSSTables().size());
+     }
+ 
+     @Test
      public void testOverwritesToDeletedColumns() throws Exception
      {
          Keyspace table = Keyspace.open(KSNAME);


[04/16] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

* cassandra-2.1:
  Range tombstones that are masked by row tombstones should not be written out


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/43c741e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/43c741e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/43c741e2

Branch: refs/heads/cassandra-3.9
Commit: 43c741e251102bf5651ff8aa1b5ca078eb0ddc0b
Parents: d5a15e4 98f5f77
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jul 6 14:39:13 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jul 6 14:39:13 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LazilyCompactedRow.java       |  3 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 40 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 65c7c1f,7fa995d..bfd8aa2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,11 +1,44 @@@
 -2.1.16
 +2.2.8
 + * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 +Merged from 2.1:
+  * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
   * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
 -
 -2.1.15
 + * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
   * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
 +
 +
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
   * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
   * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
   * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43c741e2/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 9ce1236,dfd6960..bff0ddf
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@@ -37,10 -33,13 +37,11 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.IndexType;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
  import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
  import org.apache.cassandra.db.composites.CellName;
  import org.apache.cassandra.db.composites.CellNames;
@@@ -559,6 -543,46 +560,45 @@@ public class RangeTombstoneTes
      }
  
      @Test
+     public void testCompactionOfRangeTombstonesCoveredByRowTombstone() throws Exception
+     {
+         long testTimeStamp = 1451606400L; // 01/01/2016 : 00:00:00 GMT
+         Keyspace table = Keyspace.open(KSNAME);
+         ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+         ByteBuffer key = ByteBufferUtil.bytes("k4");
+ 
+         // remove any existing sstables before starting
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+         cfs.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+ 
+         Mutation rm = new Mutation(KSNAME, key);
+         for (int i = 1; i < 11; i += 2, testTimeStamp += i * 10)
+             add(rm, i, testTimeStamp);
+         rm.apply();
+         cfs.forceBlockingFlush();
+ 
+         rm = new Mutation(KSNAME, key);
+         ColumnFamily cf = rm.addOrGet(CFNAME);
+ 
+         // Write the covering row tombstone
+         cf.delete(new DeletionTime(++testTimeStamp, (int) testTimeStamp));
+ 
+         // Create range tombstones covered by row tombstone above.
+         for (int i = 1; i < 11; i += 2, testTimeStamp -= i * 5)
+             delete(cf, 0, 7, testTimeStamp);
+         rm.apply();
+         cfs.forceBlockingFlush();
+ 
+         // there should be 2 sstables
+         assertEquals(2, cfs.getSSTables().size());
+ 
+         // compact down to nothing
 -        CompactionManager.instance.performMaximal(cfs);
++        CompactionManager.instance.performMaximal(cfs, false);
+         assertEquals(0, cfs.getSSTables().size());
+     }
+ 
+     @Test
      public void testOverwritesToDeletedColumns() throws Exception
      {
          Keyspace table = Keyspace.open(KSNAME);