You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/18 20:27:49 UTC

[03/10] git commit: partial backport 3569

partial backport 3569


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/384d4f0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/384d4f0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/384d4f0e

Branch: refs/heads/cassandra-2.1
Commit: 384d4f0e202d5492aaced9311f5ecb302ac7ff00
Parents: c44526b
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 18 12:43:00 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 18 13:26:51 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../repair/IRepairJobEventListener.java         | 31 ++++++++++++++++++++
 .../org/apache/cassandra/repair/RepairJob.java  | 15 ++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 12 ++++++--
 4 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b610d7..4b1becc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,7 +48,7 @@
  * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
  * Add option to disable STCS in L0 (CASSANDRA-6621)
  * Fix error when doing reversed queries with static columns (CASSANDRA-7490)
- * Backport CASSANDRA-6747 (CASSANDRA-7560)
+ * Backport CASSANDRA-3569/CASSANDRA-6747 (CASSANDRA-7560)
  * Track max/min timestamps for range tombstones (CASSANDRA-7647)
  * Fix NPE when listing saved caches dir (CASSANDRA-7632)
  * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
new file mode 100644
index 0000000..778c09d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Implemented by RepairSession to receive a callback when sequential snapshot creation fails.
+ */
+
+public interface IRepairJobEventListener
+{
+    /**
+     * Signal that there was a failure during the snapshot creation process.
+     *
+     */
+    public void failedSnapshot();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 13fe511..931f95a 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,20 @@ public class RepairJob
     /* Count down as sync completes */
     private AtomicInteger waitForSync;
 
+    private final IRepairJobEventListener listener;
+
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+    public RepairJob(IRepairJobEventListener listener,
+                     UUID sessionId,
+                     String keyspace,
+                     String columnFamily,
+                     Range<Token> range,
+                     boolean isSequential,
+                     ListeningExecutorService taskExecutor)
     {
+        this.listener = listener;
         this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
         this.taskExecutor = taskExecutor;
@@ -113,8 +122,8 @@ public class RepairJob
 
                 public void onFailure(Throwable throwable)
                 {
-                    // TODO need to propagate error to RepairSession
-                    logger.error("Error while snapshot", throwable);
+                    logger.error("Error occurred during snapshot phase", throwable);
+                    listener.failedSnapshot();
                     failed = true;
                 }
             }, taskExecutor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 942049b..c9a9671 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -73,7 +73,9 @@ import org.apache.cassandra.utils.*;
  * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
  * all of them in parallel otherwise.
  */
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+                                                              IFailureDetectionEventListener,
+                                                              IRepairJobEventListener
 {
     private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
@@ -268,7 +270,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
+                RepairJob job = new RepairJob(this, id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
 
@@ -316,6 +318,12 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         completed.signalAll();
     }
 
+    public void failedSnapshot()
+    {
+        exception = new IOException("Failed during snapshot creation.");
+        forceShutdown();
+    }
+
     void failedNode(InetAddress remote)
     {
         String errorMsg = String.format("Endpoint %s died", remote);