You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/13 16:20:07 UTC
[2/3] cassandra git commit: Fix streaming not holding ref when stream error
Fix streaming not holding ref when stream error
also fix crc validation error by not streaming early opened SSTable.
patch by yukim; reviewed by benedict for CASSANDRA-9295
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f7ab09f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f7ab09f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f7ab09f
Branch: refs/heads/trunk
Commit: 9f7ab09f733659c94e918db03d72e6a860d654b4
Parents: 4012134
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 13 09:04:41 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 13 09:07:25 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 25 ----------
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/streaming/StreamSession.java | 42 +++++++++++++----
.../cassandra/streaming/StreamTransferTask.java | 12 ++---
.../streaming/messages/OutgoingFileMessage.java | 49 ++++++++++++++------
.../apache/cassandra/utils/concurrent/Refs.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
.../streaming/StreamingTransferTest.java | 2 +-
10 files changed, 78 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7cb0dfd..033d75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
* Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
* Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
* Repair waits for anticompaction to finish (CASSANDRA-9097)
+ * Fix streaming not holding ref when stream error (CASSANDRA-9295)
Merged from 2.0:
* Fix counting of tombstones for TombstoneOverwhelmingException (CASSANDRA-9299)
* Fix ReconnectableSnitch reconnecting to peers during upgrade (CASSANDRA-6702)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 41ceb50..978037e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1874,31 +1874,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
};
}
- /**
- * @return a ViewFragment containing the sstables and memtables that may need to be merged
- * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
- */
- public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
- {
- return new Function<DataTracker.View, List<SSTableReader>>()
- {
- public List<SSTableReader> apply(DataTracker.View view)
- {
- Set<SSTableReader> sstables = Sets.newHashSet();
- for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
- {
- for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
- {
- if (includeRepaired || !sstable.isRepaired())
- sstables.add(sstable);
- }
- }
-
- return ImmutableList.copyOf(sstables);
- }
- };
- }
-
public List<String> getSSTablesForKey(String key)
{
DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b66f8dc..249c084 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -129,7 +129,7 @@ public class SSTableLoader implements StreamEventHandler
Ref ref = sstable.tryRef();
if (ref == null)
throw new IllegalStateException("Could not acquire ref for "+sstable);
- StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+ StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a9f5075..a316d12 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,12 +24,16 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.AbstractBounds;
@@ -45,7 +49,6 @@ import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -291,17 +294,38 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return stores;
}
- private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
{
Refs<SSTableReader> refs = new Refs<>();
try
{
for (ColumnFamilyStore cfStore : stores)
{
- List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(range.toRowBounds());
- refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
+ refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
+ {
+ public List<SSTableReader> apply(DataTracker.View view)
+ {
+ List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
+ Set<SSTableReader> sstables = Sets.newHashSet();
+ if (filteredSSTables != null)
+ {
+ for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+ {
+ // sstableInBounds may contain early opened sstables
+ for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ {
+ if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired()))
+ sstables.add(sstable);
+ }
+ }
+ }
+
+ return ImmutableList.copyOf(sstables);
+ }
+ }).refs);
}
List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
@@ -310,7 +334,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
long repairedAt = overriddenRepairedAt;
if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
repairedAt = sstable.getSSTableMetadata().repairedAt;
- sections.add(new SSTableStreamingSections(sstable, refs.get(sstable),
+ sections.add(new SSTableStreamingSections(refs.get(sstable),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges),
repairedAt));
@@ -338,29 +362,27 @@ public class StreamSession implements IEndpointStateChangeSubscriber
continue;
}
- UUID cfId = details.sstable.metadata.cfId;
+ UUID cfId = details.ref.get().metadata.cfId;
StreamTransferTask task = transfers.get(cfId);
if (task == null)
{
task = new StreamTransferTask(this, cfId);
transfers.put(cfId, task);
}
- task.addTransferFile(details.sstable, details.ref, details.estimatedKeys, details.sections, details.repairedAt);
+ task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt);
iter.remove();
}
}
public static class SSTableStreamingSections
{
- public final SSTableReader sstable;
public final Ref<SSTableReader> ref;
public final List<Pair<Long, Long>> sections;
public final long estimatedKeys;
public final long repairedAt;
- public SSTableStreamingSections(SSTableReader sstable, Ref ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
+ public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
{
- this.sstable = sstable;
this.ref = ref;
this.sections = sections;
this.estimatedKeys = estimatedKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 3add478..1727bae 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -52,10 +52,10 @@ public class StreamTransferTask extends StreamTask
super(session, cfId);
}
- public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
- assert sstable != null && cfId.equals(sstable.metadata.cfId);
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+ assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
+ OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
@@ -76,7 +76,7 @@ public class StreamTransferTask extends StreamTask
OutgoingFileMessage file = files.remove(sequenceNumber);
if (file != null)
- file.ref.release();
+ file.complete();
signalComplete = files.isEmpty();
}
@@ -101,7 +101,7 @@ public class StreamTransferTask extends StreamTask
{
try
{
- file.ref.release();
+ file.complete();
}
catch (Throwable t)
{
@@ -127,7 +127,7 @@ public class StreamTransferTask extends StreamTask
public synchronized Collection<OutgoingFileMessage> getFileMessages()
{
// We may race between queuing all those messages and the completion of the completion of
- // the first ones. So copy tthe values to avoid a ConcurrentModificationException
+ // the first ones. So copy the values to avoid a ConcurrentModificationException
return new ArrayList<>(files.values());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 069e97f..082e306 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamWriter;
@@ -47,29 +46,23 @@ public class OutgoingFileMessage extends StreamMessage
public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
{
- FileMessageHeader.serializer.serialize(message.header, out, version);
-
- final SSTableReader reader = message.sstable;
- StreamWriter writer = message.header.compressionInfo == null ?
- new StreamWriter(reader, message.header.sections, session) :
- new CompressedStreamWriter(reader,
- message.header.sections,
- message.header.compressionInfo, session);
- writer.write(out.getChannel());
+ message.serialize(out, version, session);
session.fileSent(message.header);
}
};
public final FileMessageHeader header;
- public final SSTableReader sstable;
- public final Ref<SSTableReader> ref;
+ private final Ref<SSTableReader> ref;
+ private final String filename;
+ private boolean completed = false;
- public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
super(Type.FILE);
- this.sstable = sstable;
this.ref = ref;
+ SSTableReader sstable = ref.get();
+ filename = sstable.getFilename();
CompressionInfo compressionInfo = null;
if (sstable.compression)
{
@@ -85,10 +78,36 @@ public class OutgoingFileMessage extends StreamMessage
repairedAt);
}
+ public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ {
+ if (completed)
+ {
+ return;
+ }
+
+ FileMessageHeader.serializer.serialize(header, out, version);
+
+ final SSTableReader reader = ref.get();
+ StreamWriter writer = header.compressionInfo == null ?
+ new StreamWriter(reader, header.sections, session) :
+ new CompressedStreamWriter(reader, header.sections,
+ header.compressionInfo, session);
+ writer.write(out.getChannel());
+ }
+
+ public synchronized void complete()
+ {
+ if (!completed)
+ {
+ completed = true;
+ ref.release();
+ }
+ }
+
@Override
public String toString()
{
- return "File (" + header + ", file: " + sstable.getFilename() + ")";
+ return "File (" + header + ", file: " + filename + ")";
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index b24fc2f..1c6486e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -59,7 +59,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
* @param referenced the object we have a Ref to
* @return the Ref to said object
*/
- public Ref get(T referenced)
+ public Ref<T> get(T referenced)
{
return references.get(referenced);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 19ba274..51d695f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
- details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.ref(),
+ details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 1447b29..306afc0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
- task.addTransferFile(sstable, sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0);
+ task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0);
}
assertEquals(2, task.getTotalNumberOfFiles());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index a5112b7..06ebdd3 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -212,7 +212,7 @@ public class StreamingTransferTest extends SchemaLoader
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
for (SSTableReader sstable : sstables)
{
- details.add(new StreamSession.SSTableStreamingSections(sstable, sstables.get(sstable),
+ details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable),
sstable.getPositionsForRanges(ranges),
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
}