You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/04 14:18:42 UTC

[03/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished

Release sstables of failed stream sessions only when outgoing transfers are finished

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212

Branch: refs/heads/cassandra-3.9
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
  * Revert CASSANDRA-11427 (CASSANDRA-12351)
  * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
     private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    @VisibleForTesting
+    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
     public final long repairedAt;
     public final int sstableLevel;
 
+    /* cached size value */
+    private transient final long size;
+
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.size = calculateSize();
     }
 
     public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
      */
     public long size()
     {
-        long size = 0;
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
         if (compressionInfo != null)
         {
             // calculate total length of transferring chunks
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                size += chunk.length + 4; // 4 bytes for CRC
+                transferSize += chunk.length + 4; // 4 bytes for CRC
         }
         else if (compressionMetadata != null)
         {
-            size = compressionMetadata.getTotalSizeForSections(sections);
+            transferSize = compressionMetadata.getTotalSizeForSections(sections);
         }
         else
         {
             for (Pair<Long, Long> section : sections)
-                size += section.right - section.left;
+                transferSize += section.right - section.left;
         }
-        return size;
+        return transferSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            message.serialize(out, version, session);
-            session.fileSent(message.header);
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.fileSent(message.header);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
         }
     };
 
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     private final Ref<SSTableReader> ref;
     private final String filename;
     private boolean completed = false;
+    private boolean transferring = false;
 
     public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            ref.release();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        transferring = true;
+    }
+
     public synchronized void complete()
     {
         if (!completed)
         {
             completed = true;
-            ref.release();
+            //release only if not transferring
+            if (!transferring)
+            {
+                ref.release();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
     }
 
+    @After
+    public void tearDown()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+        cfs.clearUnsafe();
+    }
+
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        String ks = KEYSPACE1;
-        String cf = "Standard1";
-
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
-        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
         for (int i = 0; i < 2; i++)
         {
-            SchemaLoader.insertData(ks, cf, i, 1);
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
             cfs.forceBlockingFlush();
         }
 
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
         // when all streaming are done, time out task should not be scheduled.
         assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+    {
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+        session.init(future);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            Ref<SSTableReader> ref = sstable.selfRef();
+            refs.add(ref);
+            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        //add task to stream session, so it is aborted when stream session fails
+        session.transfers.put(UUID.randomUUID(), task);
+
+        //make a copy of outgoing file messages, since task is cleared when it's aborted
+        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+        //simulate start transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.startTransfer();
+        }
+
+        //fail stream session mid-transfer
+        session.onError(new Exception("Fake exception"));
+
+        //make sure reference was not released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(1, ref.globalCount());
+        }
+
+        //simulate finish transfer
+        for (OutgoingFileMessage file : files)
+        {
+            file.finishTransfer();
+        }
+
+        //now reference should be released
+        for (Ref<SSTableReader> ref : refs)
+        {
+            assertEquals(0, ref.globalCount());
+        }
+    }
 }