You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/04 14:18:40 UTC

[01/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 f28631e0c -> 039852126
  refs/heads/cassandra-3.0 52be7bac4 -> cc8f6cc51
  refs/heads/cassandra-3.9 042e1f76a -> 2e0ace7bc
  refs/heads/trunk 54836ec0c -> 90ba50f6a


Release sstables of failed stream sessions only when outgoing transfers are finished

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/cassandra-2.2
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
     private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    @VisibleForTesting
+    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
     public final long repairedAt;
     public final int sstableLevel;
 
+    /* cached size value */
+    private transient final long size;
+
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
      */
     public long size()
     {
-        long size = 0;
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
         if (compressionInfo != null)
         {
             // calculate total length of transferring chunks
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                size += chunk.length + 4; // 4 bytes for CRC
+                transferSize += chunk.length + 4; // 4 bytes for CRC
         }
         else if (compressionMetadata != null)
         {
-            size = compressionMetadata.getTotalSizeForSections(sections);
+            transferSize = compressionMetadata.getTotalSizeForSections(sections);
         }
         else
         {
             for (Pair<Long, Long> section : sections)
-                size += section.right - section.left;
+                transferSize += section.right - section.left;
         }
-        return size;
+        return transferSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            message.serialize(out, version, session);
-            session.fileSent(message.header);
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.fileSent(message.header);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
         }
     };
 
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     private final Ref<SSTableReader> ref;
     private final String filename;
     private boolean completed = false;
+    private boolean transferring = false;
 
     public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            ref.release();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        transferring = true;
+    }
+
     public synchronized void complete()
     {
         if (!completed)
         {
             completed = true;
-            ref.release();
+            //release only if not transferring
+            if (!transferring)
+            {
+                ref.release();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
     }
 
+    @After
+    public void tearDown()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+        cfs.clearUnsafe();
+    }
+
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        String ks = KEYSPACE1;
-        String cf = "Standard1";
-
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
-        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
         for (int i = 0; i < 2; i++)
         {
-            SchemaLoader.insertData(ks, cf, i, 1);
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
             cfs.forceBlockingFlush();
         }
 
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
         // when all streaming are done, time out task should not be scheduled.
         assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+    {
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+        session.init(future);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            Ref<SSTableReader> ref = sstable.selfRef();
+            refs.add(ref);
+            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        //add task to stream session, so it is aborted when stream session fails
+        session.transfers.put(UUID.randomUUID(), task);
+
+        //make a copy of outgoing file messages, since task is cleared when it's aborted
+        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+        //simulate start transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.startTransfer();
+        }
+
+        //fail stream session mid-transfer
+        session.onError(new Exception("Fake exception"));
+
+        //make sure reference was not released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(1, ref.globalCount());
+        }
+
+        //simulate finish transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.finishTransfer();
+        }
+
+        //now reference should be released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(0, ref.globalCount());
+        }
+    }
 }


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5

Branch: refs/heads/trunk
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
 -2.2.8
 +3.0.9
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 - * Revert CASSANDRA-11427 (CASSANDRA-12351)
   * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
   * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
   * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
      private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;
  
+     /* cached size value */
+     private transient final long size;
+ 
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
 -                             String version,
 +                             Version version,
                               SSTableFormat.Type format,
                               long estimatedKeys,
                               List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
          this.compressionMetadata = null;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
          this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public boolean isCompressed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.BeforeClass;
+ import org.junit.After;
  import org.junit.Test;
  
  import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
+         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
 -        for (SSTableReader sstable : cfs.getSSTables())
++        for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5

Branch: refs/heads/cassandra-3.0
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
 -2.2.8
 +3.0.9
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 - * Revert CASSANDRA-11427 (CASSANDRA-12351)
   * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
   * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
   * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
      private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;
  
+     /* cached size value */
+     private transient final long size;
+ 
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
 -                             String version,
 +                             Version version,
                               SSTableFormat.Type format,
                               long estimatedKeys,
                               List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
          this.compressionMetadata = null;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
          this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public boolean isCompressed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.BeforeClass;
+ import org.junit.After;
  import org.junit.Test;
  
  import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
+         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
 -        for (SSTableReader sstable : cfs.getSSTables())
++        for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }


[10/10] cassandra git commit: Merge branch 'cassandra-3.9' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.9' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90ba50f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90ba50f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90ba50f6

Branch: refs/heads/trunk
Commit: 90ba50f6a5592508116800f018bae3c14febe5d3
Parents: 54836ec 2e0ace7
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 4 09:18:18 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 4 09:18:18 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index dc2eb8f,71f40ee..bfee782
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -21,6 -21,9 +21,8 @@@ import java.io.IOException
  import java.nio.channels.ReadableByteChannel;
  import java.util.List;
  
+ import com.google.common.annotations.VisibleForTesting;
+ 
 -import org.apache.cassandra.io.compress.CompressionMetadata;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.DataOutputStreamPlus;
  import org.apache.cassandra.streaming.StreamSession;


[04/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished

Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/trunk
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
     private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    @VisibleForTesting
+    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
     public final long repairedAt;
     public final int sstableLevel;
 
+    /* cached size value */
+    private transient final long size;
+
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
      */
     public long size()
     {
-        long size = 0;
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
         if (compressionInfo != null)
         {
             // calculate total length of transferring chunks
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                size += chunk.length + 4; // 4 bytes for CRC
+                transferSize += chunk.length + 4; // 4 bytes for CRC
         }
         else if (compressionMetadata != null)
         {
-            size = compressionMetadata.getTotalSizeForSections(sections);
+            transferSize = compressionMetadata.getTotalSizeForSections(sections);
         }
         else
         {
             for (Pair<Long, Long> section : sections)
-                size += section.right - section.left;
+                transferSize += section.right - section.left;
         }
-        return size;
+        return transferSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            message.serialize(out, version, session);
-            session.fileSent(message.header);
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.fileSent(message.header);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
         }
     };
 
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     private final Ref<SSTableReader> ref;
     private final String filename;
     private boolean completed = false;
+    private boolean transferring = false;
 
     public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            ref.release();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        transferring = true;
+    }
+
     public synchronized void complete()
     {
         if (!completed)
         {
             completed = true;
-            ref.release();
+            //release only if not transferring
+            if (!transferring)
+            {
+                ref.release();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
     }
 
+    @After
+    public void tearDown()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+        cfs.clearUnsafe();
+    }
+
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        String ks = KEYSPACE1;
-        String cf = "Standard1";
-
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
-        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
         for (int i = 0; i < 2; i++)
         {
-            SchemaLoader.insertData(ks, cf, i, 1);
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
             cfs.forceBlockingFlush();
         }
 
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
         // when all streaming are done, time out task should not be scheduled.
         assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+    {
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+        session.init(future);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            Ref<SSTableReader> ref = sstable.selfRef();
+            refs.add(ref);
+            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        //add task to stream session, so it is aborted when stream session fails
+        session.transfers.put(UUID.randomUUID(), task);
+
+        //make a copy of outgoing file messages, since task is cleared when it's aborted
+        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+        //simulate start transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.startTransfer();
+        }
+
+        //fail stream session mid-transfer
+        session.onError(new Exception("Fake exception"));
+
+        //make sure reference was not released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(1, ref.globalCount());
+        }
+
+        //simulate finish transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.finishTransfer();
+        }
+
+        //now reference should be released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(0, ref.globalCount());
+        }
+    }
 }


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e0ace7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e0ace7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e0ace7b

Branch: refs/heads/trunk
Commit: 2e0ace7bca45a33e5b220660bebc6afbdbbd8e5c
Parents: 042e1f7 cc8f6cc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:39:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:39:30 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 31d9434,49733d3..dd04ddf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -18,45 -13,6 +18,46 @@@ Merged from 3.0
     to connect with too low of a protocol version (CASSANDRA-11464)
   * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
   * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 +Merged from 2.2:
++ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 +Merged from 2.1:
 + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 + * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
 +
 +
 +3.8
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
   * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
   * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index e8d0cae,c1c5055..4f313c3
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -22,7 -22,9 +22,8 @@@ import java.util.concurrent.*
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.atomic.AtomicInteger;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 -import com.google.common.collect.Iterables;
  
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.io.sstable.format.SSTableReader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,dce56eb..fe75da1
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -103,4 -114,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
 -        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
++        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+         for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }


[02/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished

Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/cassandra-3.0
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
     private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    @VisibleForTesting
+    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
     public final long repairedAt;
     public final int sstableLevel;
 
+    /* cached size value */
+    private transient final long size;
+
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
      */
     public long size()
     {
-        long size = 0;
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
         if (compressionInfo != null)
         {
             // calculate total length of transferring chunks
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                size += chunk.length + 4; // 4 bytes for CRC
+                transferSize += chunk.length + 4; // 4 bytes for CRC
         }
         else if (compressionMetadata != null)
         {
-            size = compressionMetadata.getTotalSizeForSections(sections);
+            transferSize = compressionMetadata.getTotalSizeForSections(sections);
         }
         else
         {
             for (Pair<Long, Long> section : sections)
-                size += section.right - section.left;
+                transferSize += section.right - section.left;
         }
-        return size;
+        return transferSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            message.serialize(out, version, session);
-            session.fileSent(message.header);
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.fileSent(message.header);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
         }
     };
 
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     private final Ref<SSTableReader> ref;
     private final String filename;
     private boolean completed = false;
+    private boolean transferring = false;
 
     public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            ref.release();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        transferring = true;
+    }
+
     public synchronized void complete()
     {
         if (!completed)
         {
             completed = true;
-            ref.release();
+            //release only if not transferring
+            if (!transferring)
+            {
+                ref.release();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
     }
 
+    @After
+    public void tearDown()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+        cfs.clearUnsafe();
+    }
+
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        String ks = KEYSPACE1;
-        String cf = "Standard1";
-
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
-        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
         for (int i = 0; i < 2; i++)
         {
-            SchemaLoader.insertData(ks, cf, i, 1);
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
             cfs.forceBlockingFlush();
         }
 
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
         // when all streaming are done, time out task should not be scheduled.
         assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+    {
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+        session.init(future);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            Ref<SSTableReader> ref = sstable.selfRef();
+            refs.add(ref);
+            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        //add task to stream session, so it is aborted when stream session fails
+        session.transfers.put(UUID.randomUUID(), task);
+
+        //make a copy of outgoing file messages, since task is cleared when it's aborted
+        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+        //simulate start transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.startTransfer();
+        }
+
+        //fail stream session mid-transfer
+        session.onError(new Exception("Fake exception"));
+
+        //make sure reference was not released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(1, ref.globalCount());
+        }
+
+        //simulate finish transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.finishTransfer();
+        }
+
+        //now reference should be released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(0, ref.globalCount());
+        }
+    }
 }


[03/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished

Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/cassandra-3.9
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
     private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    @VisibleForTesting
+    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
     public final long repairedAt;
     public final int sstableLevel;
 
+    /* cached size value */
+    private transient final long size;
+
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
      */
     public long size()
     {
-        long size = 0;
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
         if (compressionInfo != null)
         {
             // calculate total length of transferring chunks
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                size += chunk.length + 4; // 4 bytes for CRC
+                transferSize += chunk.length + 4; // 4 bytes for CRC
         }
         else if (compressionMetadata != null)
         {
-            size = compressionMetadata.getTotalSizeForSections(sections);
+            transferSize = compressionMetadata.getTotalSizeForSections(sections);
         }
         else
         {
             for (Pair<Long, Long> section : sections)
-                size += section.right - section.left;
+                transferSize += section.right - section.left;
         }
-        return size;
+        return transferSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            message.serialize(out, version, session);
-            session.fileSent(message.header);
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.fileSent(message.header);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
         }
     };
 
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     private final Ref<SSTableReader> ref;
     private final String filename;
     private boolean completed = false;
+    private boolean transferring = false;
 
     public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            ref.release();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        transferring = true;
+    }
+
     public synchronized void complete()
     {
         if (!completed)
         {
             completed = true;
-            ref.release();
+            //release only if not transferring
+            if (!transferring)
+            {
+                ref.release();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
     }
 
+    @After
+    public void tearDown()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+        cfs.clearUnsafe();
+    }
+
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        String ks = KEYSPACE1;
-        String cf = "Standard1";
-
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
-        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
         for (int i = 0; i < 2; i++)
         {
-            SchemaLoader.insertData(ks, cf, i, 1);
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
             cfs.forceBlockingFlush();
         }
 
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
         // when all streaming are done, time out task should not be scheduled.
         assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+    {
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+        session.init(future);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            Ref<SSTableReader> ref = sstable.selfRef();
+            refs.add(ref);
+            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        //add task to stream session, so it is aborted when stream session fails
+        session.transfers.put(UUID.randomUUID(), task);
+
+        //make a copy of outgoing file messages, since task is cleared when it's aborted
+        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+        //simulate start transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.startTransfer();
+        }
+
+        //fail stream session mid-transfer
+        session.onError(new Exception("Fake exception"));
+
+        //make sure reference was not released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(1, ref.globalCount());
+        }
+
+        //simulate finish transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.finishTransfer();
+        }
+
+        //now reference should be released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(0, ref.globalCount());
+        }
+    }
 }


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5

Branch: refs/heads/cassandra-3.9
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
 -2.2.8
 +3.0.9
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 - * Revert CASSANDRA-11427 (CASSANDRA-12351)
   * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
   * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
   * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
      private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;
  
+     /* cached size value */
+     private transient final long size;
+ 
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
 -                             String version,
 +                             Version version,
                               SSTableFormat.Type format,
                               long estimatedKeys,
                               List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
          this.compressionMetadata = null;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
          this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public boolean isCompressed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.BeforeClass;
+ import org.junit.After;
  import org.junit.Test;
  
  import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
+         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
 -        for (SSTableReader sstable : cfs.getSSTables())
++        for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e0ace7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e0ace7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e0ace7b

Branch: refs/heads/cassandra-3.9
Commit: 2e0ace7bca45a33e5b220660bebc6afbdbbd8e5c
Parents: 042e1f7 cc8f6cc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:39:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:39:30 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 31d9434,49733d3..dd04ddf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -18,45 -13,6 +18,46 @@@ Merged from 3.0
     to connect with too low of a protocol version (CASSANDRA-11464)
   * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
   * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 +Merged from 2.2:
++ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 +Merged from 2.1:
 + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 + * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
 +
 +
 +3.8
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
   * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
   * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index e8d0cae,c1c5055..4f313c3
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -22,7 -22,9 +22,8 @@@ import java.util.concurrent.*
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.atomic.AtomicInteger;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 -import com.google.common.collect.Iterables;
  
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.io.sstable.format.SSTableReader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,dce56eb..fe75da1
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -103,4 -114,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
 -        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
++        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+         for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }