You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/04 14:18:40 UTC
[01/10] cassandra git commit: Release sstables of failed stream sessions only when outgoing transfers are finished
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 f28631e0c -> 039852126
refs/heads/cassandra-3.0 52be7bac4 -> cc8f6cc51
refs/heads/cassandra-3.9 042e1f76a -> 2e0ace7bc
refs/heads/trunk 54836ec0c -> 90ba50f6a
Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212
Branch: refs/heads/cassandra-2.2
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
* Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ @VisibleForTesting
+ protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
public final long repairedAt;
public final int sstableLevel;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
*/
public long size()
{
- long size = 0;
+ return size;
+ }
+
+ private long calculateSize()
+ {
+ long transferSize = 0;
if (compressionInfo != null)
{
// calculate total length of transferring chunks
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
+ transferSize += chunk.length + 4; // 4 bytes for CRC
}
else if (compressionMetadata != null)
{
- size = compressionMetadata.getTotalSizeForSections(sections);
+ transferSize = compressionMetadata.getTotalSizeForSections(sections);
}
else
{
for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
+ transferSize += section.right - section.left;
}
- return size;
+ return transferSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- message.serialize(out, version, session);
- session.fileSent(message.header);
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.fileSent(message.header);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
}
};
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
private final Ref<SSTableReader> ref;
private final String filename;
private boolean completed = false;
+ private boolean transferring = false;
public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
writer.write(out);
}
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ ref.release();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ transferring = true;
+ }
+
public synchronized void complete()
{
if (!completed)
{
completed = true;
- ref.release();
+ //release only if not transferring
+ if (!transferring)
+ {
+ ref.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
}
+ @After
+ public void tearDown()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ cfs.clearUnsafe();
+ }
+
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
InetAddress peer = FBUtilities.getBroadcastAddress();
StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
- ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
for (int i = 0; i < 2; i++)
{
- SchemaLoader.insertData(ks, cf, i, 1);
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
cfs.forceBlockingFlush();
}
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5
Branch: refs/heads/trunk
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
-2.2.8
+3.0.9
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerException when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
- * Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
* Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
private final CompressionMetadata compressionMetadata;
public final long repairedAt;
public final int sstableLevel;
+ public final SerializationHeader.Component header;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
- String version,
+ Version version,
SSTableFormat.Type format,
long estimatedKeys,
List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public boolean isCompressed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+ import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
- for (SSTableReader sstable : cfs.getSSTables())
++ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5
Branch: refs/heads/cassandra-3.0
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
-2.2.8
+3.0.9
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerException when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
- * Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
* Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
private final CompressionMetadata compressionMetadata;
public final long repairedAt;
public final int sstableLevel;
+ public final SerializationHeader.Component header;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
- String version,
+ Version version,
SSTableFormat.Type format,
long estimatedKeys,
List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public boolean isCompressed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+ import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming is done, the timeout task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
- for (SSTableReader sstable : cfs.getSSTables())
++ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[10/10] cassandra git commit: Merge branch 'cassandra-3.9' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.9' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90ba50f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90ba50f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90ba50f6
Branch: refs/heads/trunk
Commit: 90ba50f6a5592508116800f018bae3c14febe5d3
Parents: 54836ec 2e0ace7
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 4 09:18:18 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 4 09:18:18 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90ba50f6/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index dc2eb8f,71f40ee..bfee782
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -21,6 -21,9 +21,8 @@@ import java.io.IOException
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+ import com.google.common.annotations.VisibleForTesting;
+
-import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
[04/10] cassandra git commit: Release sstables of failed stream
sessions only when outgoing transfers are finished
Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212
Branch: refs/heads/trunk
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
* Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ @VisibleForTesting
+ protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
public final long repairedAt;
public final int sstableLevel;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
*/
public long size()
{
- long size = 0;
+ return size;
+ }
+
+ private long calculateSize()
+ {
+ long transferSize = 0;
if (compressionInfo != null)
{
// calculate total length of transferring chunks
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
+ transferSize += chunk.length + 4; // 4 bytes for CRC
}
else if (compressionMetadata != null)
{
- size = compressionMetadata.getTotalSizeForSections(sections);
+ transferSize = compressionMetadata.getTotalSizeForSections(sections);
}
else
{
for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
+ transferSize += section.right - section.left;
}
- return size;
+ return transferSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- message.serialize(out, version, session);
- session.fileSent(message.header);
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.fileSent(message.header);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
}
};
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
private final Ref<SSTableReader> ref;
private final String filename;
private boolean completed = false;
+ private boolean transferring = false;
public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
writer.write(out);
}
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ ref.release();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ transferring = true;
+ }
+
public synchronized void complete()
{
if (!completed)
{
completed = true;
- ref.release();
+ //release only if not transferring
+ if (!transferring)
+ {
+ ref.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
}
+ @After
+ public void tearDown()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ cfs.clearUnsafe();
+ }
+
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
InetAddress peer = FBUtilities.getBroadcastAddress();
StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
- ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
for (int i = 0; i < 2; i++)
{
- SchemaLoader.insertData(ks, cf, i, 1);
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
cfs.forceBlockingFlush();
}
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
// when all streaming is done, the timeout task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.9
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e0ace7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e0ace7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e0ace7b
Branch: refs/heads/trunk
Commit: 2e0ace7bca45a33e5b220660bebc6afbdbbd8e5c
Parents: 042e1f7 cc8f6cc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:39:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:39:30 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 31d9434,49733d3..dd04ddf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -18,45 -13,6 +18,46 @@@ Merged from 3.0
to connect with too low of a protocol version (CASSANDRA-11464)
* NullPointerException when reading/compacting table (CASSANDRA-11988)
* Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+Merged from 2.2:
++ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
+ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
+ * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
+ * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
+Merged from 2.1:
+ * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
+ * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
+
+
+3.8
+ * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
+ * Fix hdr logging for single operation workloads (CASSANDRA-12145)
+ * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
+ * Increase size of flushExecutor thread pool (CASSANDRA-12071)
+ * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
+ * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
+ * Improve details in compaction log message (CASSANDRA-12080)
+ * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
+ * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
+ * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
+ * Move skip_stop_words filter before stemming (CASSANDRA-12078)
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
+ * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
+ * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
+ * Add cross-DC latency metrics (CASSANDRA-11596)
+ * Allow terms in selection clause (CASSANDRA-10783)
+ * Add bind variables to trace (CASSANDRA-11719)
+ * Switch counter shards' clock to timestamps (CASSANDRA-9811)
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
+ * Add Change-Data-Capture (CASSANDRA-8844)
+Merged from 3.0:
* Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
* Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
* Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index e8d0cae,c1c5055..4f313c3
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -22,7 -22,9 +22,8 @@@ import java.util.concurrent.*
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.format.SSTableReader;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,dce56eb..fe75da1
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -103,4 -114,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming is done, the timeout task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
- StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
++ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[02/10] cassandra git commit: Release sstables of failed stream
sessions only when outgoing transfers are finished
Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212
Branch: refs/heads/cassandra-3.0
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
* Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ @VisibleForTesting
+ protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
public final long repairedAt;
public final int sstableLevel;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
*/
public long size()
{
- long size = 0;
+ return size;
+ }
+
+ private long calculateSize()
+ {
+ long transferSize = 0;
if (compressionInfo != null)
{
// calculate total length of transferring chunks
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
+ transferSize += chunk.length + 4; // 4 bytes for CRC
}
else if (compressionMetadata != null)
{
- size = compressionMetadata.getTotalSizeForSections(sections);
+ transferSize = compressionMetadata.getTotalSizeForSections(sections);
}
else
{
for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
+ transferSize += section.right - section.left;
}
- return size;
+ return transferSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- message.serialize(out, version, session);
- session.fileSent(message.header);
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.fileSent(message.header);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
}
};
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
private final Ref<SSTableReader> ref;
private final String filename;
private boolean completed = false;
+ private boolean transferring = false;
public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
writer.write(out);
}
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ ref.release();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ transferring = true;
+ }
+
public synchronized void complete()
{
if (!completed)
{
completed = true;
- ref.release();
+ //release only if not transferring
+ if (!transferring)
+ {
+ ref.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
}
+ @After
+ public void tearDown()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ cfs.clearUnsafe();
+ }
+
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
InetAddress peer = FBUtilities.getBroadcastAddress();
StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
- ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
for (int i = 0; i < 2; i++)
{
- SchemaLoader.insertData(ks, cf, i, 1);
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
cfs.forceBlockingFlush();
}
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[03/10] cassandra git commit: Release sstables of failed stream
sessions only when outgoing transfers are finished
Posted by yu...@apache.org.
Release sstables of failed stream sessions only when outgoing transfers are finished
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03985212
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03985212
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03985212
Branch: refs/heads/cassandra-3.9
Commit: 03985212644112d2751cdabc72bd954dda9ff3ba
Parents: f28631e
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 09:34:27 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 09:34:27 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f709f7..87228d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
* Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f4c900e..294b9c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -139,7 +139,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
- private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..c1c5055 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -42,7 +43,8 @@ public class StreamTransferTask extends StreamTask
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ @VisibleForTesting
+ protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9a727f..b2af699 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -59,6 +59,9 @@ public class FileMessageHeader
public final long repairedAt;
public final int sstableLevel;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
String version,
@@ -79,6 +82,7 @@ public class FileMessageHeader
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@ -101,6 +105,7 @@ public class FileMessageHeader
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.size = calculateSize();
}
public boolean isCompressed()
@@ -113,23 +118,28 @@ public class FileMessageHeader
*/
public long size()
{
- long size = 0;
+ return size;
+ }
+
+ private long calculateSize()
+ {
+ long transferSize = 0;
if (compressionInfo != null)
{
// calculate total length of transferring chunks
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
+ transferSize += chunk.length + 4; // 4 bytes for CRC
}
else if (compressionMetadata != null)
{
- size = compressionMetadata.getTotalSizeForSections(sections);
+ transferSize = compressionMetadata.getTotalSizeForSections(sections);
}
else
{
for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
+ transferSize += section.right - section.left;
}
- return size;
+ return transferSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index c8175ea..a88386e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -45,8 +47,16 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- message.serialize(out, version, session);
- session.fileSent(message.header);
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.fileSent(message.header);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
}
};
@@ -54,6 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
private final Ref<SSTableReader> ref;
private final String filename;
private boolean completed = false;
+ private boolean transferring = false;
public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
@@ -90,12 +101,33 @@ public class OutgoingFileMessage extends StreamMessage
writer.write(out);
}
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ ref.release();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ transferring = true;
+ }
+
public synchronized void complete()
{
if (!completed)
{
completed = true;
- ref.release();
+ //release only if not transferring
+ if (!transferring)
+ {
+ ref.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03985212/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index c3c16b8..02af9a7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@ -37,7 +42,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,20 +64,24 @@ public class StreamTransferTaskTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
}
+ @After
+ public void tearDown()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ cfs.clearUnsafe();
+ }
+
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
InetAddress peer = FBUtilities.getBroadcastAddress();
StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
- ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
for (int i = 0; i < 2; i++)
{
- SchemaLoader.insertData(ks, cf, i, 1);
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
cfs.forceBlockingFlush();
}
@@ -104,4 +115,68 @@ public class StreamTransferTaskTest
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5
Branch: refs/heads/cassandra-3.9
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
-2.2.8
+3.0.9
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerException when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
- * Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
* Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
private final CompressionMetadata compressionMetadata;
public final long repairedAt;
public final int sstableLevel;
+ public final SerializationHeader.Component header;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
- String version,
+ Version version,
SSTableFormat.Type format,
long estimatedKeys,
List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public boolean isCompressed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+ import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
- for (SSTableReader sstable : cfs.getSSTables())
++ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.9
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e0ace7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e0ace7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e0ace7b
Branch: refs/heads/cassandra-3.9
Commit: 2e0ace7bca45a33e5b220660bebc6afbdbbd8e5c
Parents: 042e1f7 cc8f6cc
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:39:30 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:39:30 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 31d9434,49733d3..dd04ddf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -18,45 -13,6 +18,46 @@@ Merged from 3.0
to connect with too low of a protocol version (CASSANDRA-11464)
* NullPointerException when reading/compacting table (CASSANDRA-11988)
* Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+Merged from 2.2:
++ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
+ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
+ * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
+ * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
+Merged from 2.1:
+ * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
+ * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
+
+
+3.8
+ * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
+ * Fix hdr logging for single operation workloads (CASSANDRA-12145)
+ * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
+ * Increase size of flushExecutor thread pool (CASSANDRA-12071)
+ * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
+ * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
+ * Improve details in compaction log message (CASSANDRA-12080)
+ * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
+ * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
+ * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
+ * Move skip_stop_words filter before stemming (CASSANDRA-12078)
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
+ * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
+ * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
+ * Add cross-DC latency metrics (CASSANDRA-11596)
+ * Allow terms in selection clause (CASSANDRA-10783)
+ * Add bind variables to trace (CASSANDRA-11719)
+ * Switch counter shards' clock to timestamps (CASSANDRA-9811)
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
+ * Add Change-Data-Capture (CASSANDRA-8844)
+Merged from 3.0:
* Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
* Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
* Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index e8d0cae,c1c5055..4f313c3
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -22,7 -22,9 +22,8 @@@ import java.util.concurrent.*
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.format.SSTableReader;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,dce56eb..fe75da1
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -103,4 -114,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
- StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
++ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}