You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/18 20:27:50 UTC
[04/10] git commit: partial backport 3569
partial backport 3569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/384d4f0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/384d4f0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/384d4f0e
Branch: refs/heads/trunk
Commit: 384d4f0e202d5492aaced9311f5ecb302ac7ff00
Parents: c44526b
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 18 12:43:00 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 18 13:26:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../repair/IRepairJobEventListener.java | 31 ++++++++++++++++++++
.../org/apache/cassandra/repair/RepairJob.java | 15 ++++++++--
.../apache/cassandra/repair/RepairSession.java | 12 ++++++--
4 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b610d7..4b1becc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,7 +48,7 @@
* Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
* Add option to disable STCS in L0 (CASSANDRA-6621)
* Fix error when doing reversed queries with static columns (CASSANDRA-7490)
- * Backport CASSANDRA-6747 (CASSANDRA-7560)
+ * Backport CASSANDRA-3569/CASSANDRA-6747 (CASSANDRA-7560)
* Track max/min timestamps for range tombstones (CASSANDRA-7647)
* Fix NPE when listing saved caches dir (CASSANDRA-7632)
* Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
new file mode 100644
index 0000000..778c09d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Implemented by the RepairSession to receive a callback when sequential snapshot creation fails.
+ */
+
+public interface IRepairJobEventListener
+{
+ /**
+ * Signal that there was a failure during the snapshot creation process.
+ *
+ */
+ public void failedSnapshot();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 13fe511..931f95a 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,20 @@ public class RepairJob
/* Count down as sync completes */
private AtomicInteger waitForSync;
+ private final IRepairJobEventListener listener;
+
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+ public RepairJob(IRepairJobEventListener listener,
+ UUID sessionId,
+ String keyspace,
+ String columnFamily,
+ Range<Token> range,
+ boolean isSequential,
+ ListeningExecutorService taskExecutor)
{
+ this.listener = listener;
this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
this.taskExecutor = taskExecutor;
@@ -113,8 +122,8 @@ public class RepairJob
public void onFailure(Throwable throwable)
{
- // TODO need to propagate error to RepairSession
- logger.error("Error while snapshot", throwable);
+ logger.error("Error occurred during snapshot phase", throwable);
+ listener.failedSnapshot();
failed = true;
}
}, taskExecutor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 942049b..c9a9671 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -73,7 +73,9 @@ import org.apache.cassandra.utils.*;
* Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
* all of them in parallel otherwise.
*/
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener,
+ IRepairJobEventListener
{
private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -268,7 +270,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
+ RepairJob job = new RepairJob(this, id, keyspace, cfname, range, isSequential, taskExecutor);
jobs.offer(job);
}
@@ -316,6 +318,12 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
completed.signalAll();
}
+ public void failedSnapshot()
+ {
+ exception = new IOException("Failed during snapshot creation.");
+ forceShutdown();
+ }
+
void failedNode(InetAddress remote)
{
String errorMsg = String.format("Endpoint %s died", remote);