You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/04 14:18:41 UTC
[02/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished
Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212
Branch: refs/heads/cassandra-3.0
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
* Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ @VisibleForTesting
+ protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
public final long repairedAt;
public final int sstableLevel;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
*/
public long size()
{
- long size = 0;
+ return size;
+ }
+
+ private long calculateSize()
+ {
+ long transferSize = 0;
if (compressionInfo != null)
{
// calculate total length of transferring chunks
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
+ transferSize += chunk.length + 4; // 4 bytes for CRC
}
else if (compressionMetadata != null)
{
- size = compressionMetadata.getTotalSizeForSections(sections);
+ transferSize = compressionMetadata.getTotalSizeForSections(sections);
}
else
{
for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
+ transferSize += section.right - section.left;
}
- return size;
+ return transferSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- message.serialize(out, version, session);
- session.fileSent(message.header);
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.fileSent(message.header);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
}
};
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
private final Ref<SSTableReader> ref;
private final String filename;
private boolean completed = false;
+ private boolean transferring = false;
public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
writer.write(out);
}
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ ref.release();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ transferring = true;
+ }
+
public synchronized void complete()
{
if (!completed)
{
completed = true;
- ref.release();
+ //release only if not transferring
+ if (!transferring)
+ {
+ ref.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
}
+ @After
+ public void tearDown()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ cfs.clearUnsafe();
+ }
+
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
InetAddress peer = FBUtilities.getBroadcastAddress();
StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
- ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
for (int i = 0; i < 2; i++)
{
- SchemaLoader.insertData(ks, cf, i, 1);
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
cfs.forceBlockingFlush();
}
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}