You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/13 16:20:07 UTC

[2/3] cassandra git commit: Fix streaming not holding ref when stream error

Fix streaming not holding ref when stream error

also fix crc validation error by not streaming early opened SSTable.

patch by yukim; reviewed by benedict for CASSANDRA-9295


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f7ab09f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f7ab09f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f7ab09f

Branch: refs/heads/trunk
Commit: 9f7ab09f733659c94e918db03d72e6a860d654b4
Parents: 4012134
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 13 09:04:41 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 13 09:07:25 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 25 ----------
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../cassandra/streaming/StreamSession.java      | 42 +++++++++++++----
 .../cassandra/streaming/StreamTransferTask.java | 12 ++---
 .../streaming/messages/OutgoingFileMessage.java | 49 ++++++++++++++------
 .../apache/cassandra/utils/concurrent/Refs.java |  2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |  2 +-
 .../streaming/StreamTransferTaskTest.java       |  2 +-
 .../streaming/StreamingTransferTest.java        |  2 +-
 10 files changed, 78 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7cb0dfd..033d75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
  * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
  * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
  * Repair waits for anticompaction to finish (CASSANDRA-9097)
+ * Fix streaming not holding ref when stream error (CASSANDRA-9295)
 Merged from 2.0:
  * Fix counting of tombstones for TombstoneOverwhelmingException (CASSANDRA-9299)
  * Fix ReconnectableSnitch reconnecting to peers during upgrade (CASSANDRA-6702)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 41ceb50..978037e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1874,31 +1874,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         };
     }
 
-    /**
-     * @return a ViewFragment containing the sstables and memtables that may need to be merged
-     * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
-     */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
-    {
-        return new Function<DataTracker.View, List<SSTableReader>>()
-        {
-            public List<SSTableReader> apply(DataTracker.View view)
-            {
-                Set<SSTableReader> sstables = Sets.newHashSet();
-                for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
-                {
-                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
-                    {
-                        if (includeRepaired || !sstable.isRepaired())
-                            sstables.add(sstable);
-                    }
-                }
-
-                return ImmutableList.copyOf(sstables);
-            }
-        };
-    }
-
     public List<String> getSSTablesForKey(String key)
     {
         DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b66f8dc..249c084 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -129,7 +129,7 @@ public class SSTableLoader implements StreamEventHandler
                         Ref ref = sstable.tryRef();
                         if (ref == null)
                             throw new IllegalStateException("Could not acquire ref for "+sstable);
-                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
+                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
                         streamingDetails.put(endpoint, details);
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a9f5075..a316d12 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,12 +24,16 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -45,7 +49,6 @@ import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -291,17 +294,38 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return stores;
     }
 
-    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
+    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
         {
             for (ColumnFamilyStore cfStore : stores)
             {
-                List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(range.toRowBounds());
-                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
+                refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
+                {
+                    public List<SSTableReader> apply(DataTracker.View view)
+                    {
+                        List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
+                        Set<SSTableReader> sstables = Sets.newHashSet();
+                        if (filteredSSTables != null)
+                        {
+                            for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+                            {
+                                // sstableInBounds may contain early opened sstables
+                                for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                                {
+                                    if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired()))
+                                        sstables.add(sstable);
+                                }
+                            }
+                        }
+
+                        return ImmutableList.copyOf(sstables);
+                    }
+                }).refs);
             }
 
             List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
@@ -310,7 +334,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 long repairedAt = overriddenRepairedAt;
                 if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
                     repairedAt = sstable.getSSTableMetadata().repairedAt;
-                sections.add(new SSTableStreamingSections(sstable, refs.get(sstable),
+                sections.add(new SSTableStreamingSections(refs.get(sstable),
                                                           sstable.getPositionsForRanges(ranges),
                                                           sstable.estimatedKeysForRanges(ranges),
                                                           repairedAt));
@@ -338,29 +362,27 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 continue;
             }
 
-            UUID cfId = details.sstable.metadata.cfId;
+            UUID cfId = details.ref.get().metadata.cfId;
             StreamTransferTask task = transfers.get(cfId);
             if (task == null)
             {
                 task = new StreamTransferTask(this, cfId);
                 transfers.put(cfId, task);
             }
-            task.addTransferFile(details.sstable, details.ref, details.estimatedKeys, details.sections, details.repairedAt);
+            task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt);
             iter.remove();
         }
     }
 
     public static class SSTableStreamingSections
     {
-        public final SSTableReader sstable;
         public final Ref<SSTableReader> ref;
         public final List<Pair<Long, Long>> sections;
         public final long estimatedKeys;
         public final long repairedAt;
 
-        public SSTableStreamingSections(SSTableReader sstable, Ref ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
+        public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
         {
-            this.sstable = sstable;
             this.ref = ref;
             this.sections = sections;
             this.estimatedKeys = estimatedKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 3add478..1727bae 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -52,10 +52,10 @@ public class StreamTransferTask extends StreamTask
         super(session, cfId);
     }
 
-    public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
-        assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+        assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
+        OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }
@@ -76,7 +76,7 @@ public class StreamTransferTask extends StreamTask
 
             OutgoingFileMessage file = files.remove(sequenceNumber);
             if (file != null)
-                file.ref.release();
+                file.complete();
 
             signalComplete = files.isEmpty();
         }
@@ -101,7 +101,7 @@ public class StreamTransferTask extends StreamTask
         {
             try
             {
-                file.ref.release();
+                file.complete();
             }
             catch (Throwable t)
             {
@@ -127,7 +127,7 @@ public class StreamTransferTask extends StreamTask
     public synchronized Collection<OutgoingFileMessage> getFileMessages()
     {
         // We may race between queuing all those messages and the completion of the completion of
-        // the first ones. So copy tthe values to avoid a ConcurrentModificationException
+        // the first ones. So copy the values to avoid a ConcurrentModificationException
         return new ArrayList<>(files.values());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 069e97f..082e306 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
@@ -47,29 +46,23 @@ public class OutgoingFileMessage extends StreamMessage
 
         public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
         {
-            FileMessageHeader.serializer.serialize(message.header, out, version);
-
-            final SSTableReader reader = message.sstable;
-            StreamWriter writer = message.header.compressionInfo == null ?
-                    new StreamWriter(reader, message.header.sections, session) :
-                    new CompressedStreamWriter(reader,
-                            message.header.sections,
-                            message.header.compressionInfo, session);
-            writer.write(out.getChannel());
+            message.serialize(out, version, session);
             session.fileSent(message.header);
         }
     };
 
     public final FileMessageHeader header;
-    public final SSTableReader sstable;
-    public final Ref<SSTableReader> ref;
+    private final Ref<SSTableReader> ref;
+    private final String filename;
+    private boolean completed = false;
 
-    public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         super(Type.FILE);
-        this.sstable = sstable;
         this.ref = ref;
 
+        SSTableReader sstable = ref.get();
+        filename = sstable.getFilename();
         CompressionInfo compressionInfo = null;
         if (sstable.compression)
         {
@@ -85,10 +78,36 @@ public class OutgoingFileMessage extends StreamMessage
                                             repairedAt);
     }
 
+    public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+    {
+        if (completed)
+        {
+            return;
+        }
+
+        FileMessageHeader.serializer.serialize(header, out, version);
+
+        final SSTableReader reader = ref.get();
+        StreamWriter writer = header.compressionInfo == null ?
+                                      new StreamWriter(reader, header.sections, session) :
+                                      new CompressedStreamWriter(reader, header.sections,
+                                                                 header.compressionInfo, session);
+        writer.write(out.getChannel());
+    }
+
+    public synchronized void complete()
+    {
+        if (!completed)
+        {
+            completed = true;
+            ref.release();
+        }
+    }
+
     @Override
     public String toString()
     {
-        return "File (" + header + ", file: " + sstable.getFilename() + ")";
+        return "File (" + header + ", file: " + filename + ")";
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index b24fc2f..1c6486e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -59,7 +59,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
      * @param referenced the object we have a Ref to
      * @return the Ref to said object
      */
-    public Ref get(T referenced)
+    public Ref<T> get(T referenced)
     {
         return references.get(referenced);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 19ba274..51d695f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
-        details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.ref(),
+        details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
                                                                sstable.getPositionsForRanges(ranges),
                                                                sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 1447b29..306afc0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferFile(sstable, sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0);
+            task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0);
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index a5112b7..06ebdd3 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -212,7 +212,7 @@ public class StreamingTransferTest extends SchemaLoader
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
         for (SSTableReader sstable : sstables)
         {
-            details.add(new StreamSession.SSTableStreamingSections(sstable, sstables.get(sstable),
+            details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable),
                                                                    sstable.getPositionsForRanges(ranges),
                                                                    sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         }