You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/04 14:18:44 UTC
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5
Branch: refs/heads/cassandra-3.9
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 3 +-
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../streaming/messages/FileMessageHeader.java | 20 +++--
.../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
.../streaming/StreamTransferTaskTest.java | 85 ++++++++++++++++++--
6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
-2.2.8
+3.0.9
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
- * Revert CASSANDRA-11427 (CASSANDRA-12351)
* Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
* cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
* Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
private final CompressionMetadata compressionMetadata;
public final long repairedAt;
public final int sstableLevel;
+ public final SerializationHeader.Component header;
+ /* cached size value */
+ private transient final long size;
+
public FileMessageHeader(UUID cfId,
int sequenceNumber,
- String version,
+ Version version,
SSTableFormat.Type format,
long estimatedKeys,
List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
this.compressionMetadata = null;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
this.sstableLevel = sstableLevel;
+ this.header = header;
+ this.size = calculateSize();
}
public boolean isCompressed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.BeforeClass;
+ import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
// when all streaming are done, time out task should not be scheduled.
assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+ {
+ InetAddress peer = FBUtilities.getBroadcastAddress();
+ StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+ session.init(future);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
- for (SSTableReader sstable : cfs.getSSTables())
++ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ Ref<SSTableReader> ref = sstable.selfRef();
+ refs.add(ref);
+ task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ //add task to stream session, so it is aborted when stream session fails
+ session.transfers.put(UUID.randomUUID(), task);
+
+ //make a copy of outgoing file messages, since task is cleared when it's aborted
+ Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+
+ //simulate start transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.startTransfer();
+ }
+
+ //fail stream session mid-transfer
+ session.onError(new Exception("Fake exception"));
+
+ //make sure reference was not released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(1, ref.globalCount());
+ }
+
+ //simulate finish transfer
+ for (OutgoingFileMessage file : files)
+ {
+ file.finishTransfer();
+ }
+
+ //now reference should be released
+ for (Ref<SSTableReader> ref : refs)
+ {
+ assertEquals(0, ref.globalCount());
+ }
+ }
}