You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/03 23:20:04 UTC

[2/6] git commit: Fix SSTable not released if stream session fails

Fix SSTable not released if stream session fails

patch by yukim; reviewed by Richard Low for CASSANDRA-6818


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04fd84c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04fd84c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04fd84c3

Branch: refs/heads/cassandra-2.1
Commit: 04fd84c3d536a82b1e20a6295bc09d9250b76d29
Parents: 123d5bc
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Apr 3 16:17:44 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Apr 3 16:17:44 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  8 +-
 .../apache/cassandra/streaming/StreamTask.java  |  6 ++
 .../cassandra/streaming/StreamTransferTask.java | 61 +++++++++++++--
 .../cassandra/streaming/StreamWriter.java       |  3 -
 .../compress/CompressedStreamWriter.java        |  2 -
 .../streaming/StreamTransferTaskTest.java       | 80 ++++++++++++++++++++
 7 files changed, 149 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3200941..38a6c3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,7 @@
  * Fix map element access in IF (CASSANDRA-6914)
  * Avoid costly range calculations for range queries on system keyspaces
    (CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7972183..449751d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.slf4j.Logger;
@@ -316,8 +318,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
         if (finalState == State.FAILED)
         {
-            for (StreamReceiveTask srt : receivers.values())
-                srt.abort();
+            for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
+                task.abort();
         }
 
         // Note that we shouldn't block on this close because this method is called on the handler
@@ -459,6 +461,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         long headerSize = header.size();
         StreamingMetrics.totalOutgoingBytes.inc(headerSize);
         metrics.outgoingBytes.inc(headerSize);
+        // schedule timeout for receiving ACK
+        transfers.get(header.cfId).scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
index 9e9e06f..ac72cff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -46,6 +46,12 @@ public abstract class StreamTask
     public abstract long getTotalSize();
 
     /**
+     * Abort the task.
+     * Subclass should implement cleaning up resources.
+     */
+    public abstract void abort();
+
+    /**
      * @return StreamSummary that describes this task
      */
     public StreamSummary getSummary()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 8e461cc..86f4ee2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.streaming;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -29,9 +30,13 @@ import org.apache.cassandra.utils.Pair;
  */
 public class StreamTransferTask extends StreamTask
 {
+    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
 
-    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
+
+    private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>();
 
     private long totalSize;
 
@@ -55,10 +60,38 @@ public class StreamTransferTask extends StreamTask
      */
     public void complete(int sequenceNumber)
     {
-        files.remove(sequenceNumber);
-        // all file sent, notify session this task is complete.
-        if (files.isEmpty())
-            session.taskCompleted(this);
+        boolean signalComplete;
+        synchronized (this)
+        {
+            // forget the pending timeout; if it still fires it finds no file and does nothing
+            timeoutTasks.remove(sequenceNumber);
+            OutgoingFileMessage file = files.remove(sequenceNumber);
+            // already completed by an earlier ACK/timeout, or released by abort()
+            if (file == null)
+                return;
+            file.sstable.releaseReference();
+            signalComplete = files.isEmpty();
+            if (signalComplete)
+                timeoutExecutor.shutdownNow();
+        }
+
+        // all files sent; exactly one caller gets here, and it notifies the session outside the lock
+        if (signalComplete)
+            session.taskCompleted(this);
+    }
+
+    /**
+     * Release the references of all files that have not been acknowledged yet,
+     * forget their timeouts and stop the timeout executor.
+     * Any later complete() call for those files is a no-op.
+     */
+    public synchronized void abort()
+    {
+        for (OutgoingFileMessage file : files.values())
+            file.sstable.releaseReference();
+        files.clear();
+        timeoutTasks.clear();
+        timeoutExecutor.shutdownNow();
     }
 
     public int getTotalNumberOfFiles()
@@ -80,6 +113,41 @@ public class StreamTransferTask extends StreamTask
 
     public OutgoingFileMessage createMessageForRetry(int sequenceNumber)
     {
+        // cancel the previous timeout task; it is rescheduled when the file is sent again.
+        // There may be none, e.g. if the timeout already fired.
+        ScheduledFuture future = timeoutTasks.remove(sequenceNumber);
+        if (future != null)
+            future.cancel(false);
         return files.get(sequenceNumber);
     }
+
+    /**
+     * Schedule timeout task to release reference for file sent.
+     * When not receiving ACK after sending to receiver in given time,
+     * the task will release reference.
+     *
+     * @param sequenceNumber sequence number of file sent.
+     * @param time time to timeout
+     * @param unit unit of given time
+     * @return scheduled future for timeout task, or null if the file was already
+     *         acknowledged or this task has completed or been aborted
+     */
+    public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit)
+    {
+        // checked under the same lock complete()/abort() hold when shutting the executor down,
+        // so schedule() below cannot be rejected
+        if (!files.containsKey(sequenceNumber) || timeoutExecutor.isShutdown())
+            return null;
+
+        ScheduledFuture future = timeoutExecutor.schedule(new Runnable()
+        {
+            public void run()
+            {
+                // complete() also drops this entry from timeoutTasks
+                StreamTransferTask.this.complete(sequenceNumber);
+            }
+        }, time, unit);
+        timeoutTasks.put(sequenceNumber, future);
+        return future;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index dbc7390..5609f20 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -115,9 +115,6 @@ public class StreamWriter
             FileUtils.closeQuietly(file);
             FileUtils.closeQuietly(validator);
         }
-
-        // release reference only when completed successfully
-        sstable.releaseReference();
     }
 
     protected long totalSize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 80fcef5..001c927 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -83,8 +83,6 @@ public class CompressedStreamWriter extends StreamWriter
             // no matter what happens close file
             FileUtils.closeQuietly(file);
         }
-
-        sstable.releaseReference();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
new file mode 100644
index 0000000..9b02817
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StreamTransferTaskTest extends SchemaLoader
+{
+    @Test
+    public void testScheduleTimeout() throws Exception
+    {
+        String ks = "Keyspace1";
+        String cf = "Standard1";
+
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+
+        // create two sstables
+        for (int i = 0; i < 2; i++)
+        {
+            insertData(ks, cf, i, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        // create streaming task that streams those two sstables
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            // StreamTransferTask.complete() releases one reference per file, so hand the
+            // task a reference of its own to keep the sstable's count balanced
+            assertTrue(sstable.acquireReference());
+            List<Range<Token>> ranges = new ArrayList<>();
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+            task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges));
+        }
+        assertEquals(2, task.getTotalNumberOfFiles());
+
+        // if file sending completes before timeout then the task should be canceled.
+        ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS);
+        task.complete(0);
+        // timeout task may run after complete but it is noop
+        f.get();
+
+        // when timeout runs on second file, task should be completed
+        f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS);
+        f.get();
+        assertEquals(StreamSession.State.WAIT_COMPLETE, session.state());
+
+        // when all streaming are done, time out task should not be scheduled.
+        assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
+    }
+}
+}