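You are viewing a plain text version of this content. The canonical version is the original message in the commits@cassandra.apache.org mailing-list archive (its hyperlink was lost in this plain-text copy).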
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/17 18:13:03 UTC
[6/9] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/service/AntiEntropyService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b50b2b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b50b2b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b50b2b2
Branch: refs/heads/cassandra-2.1
Commit: 4b50b2b2e41aeda86c166059826e8eb1498b24fc
Parents: 44cf4a6 6dfca3d
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Feb 17 11:12:26 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Feb 17 11:12:26 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/repair/RepairJob.java | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b50b2b2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9fabd2,f146166..bdfec11
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,33 -9,24 +25,34 @@@ Merged from 1.2
* Use real node messaging versions for schema exchange decisions (CASSANDRA-6700)
* IN on the last clustering columns + ORDER BY DESC yield no results (CASSANDRA-6701)
* Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
+ * Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
-
-1.2.15
- * Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
- * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
-
-
-1.2.14
- * Reverted code to limit CQL prepared statement cache by size (CASSANDRA-6592)
- * add cassandra.default_messaging_version property to allow easier
- upgrading from 1.1 (CASSANDRA-6619)
- * Allow executing CREATE statements multiple times (CASSANDRA-6471)
- * Don't send confusing info with timeouts (CASSANDRA-6491)
- * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
- * Don't drop local mutations without a hint (CASSANDRA-6510)
- * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
- * Validate SliceRange start and finish lengths (CASSANDRA-6521)
+2.0.5
+ * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
+ * Add ks.cf names to tombstone logging (CASSANDRA-6597)
+ * Use LOCAL_QUORUM for LWT operations at LOCAL_SERIAL (CASSANDRA-6495)
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
+ * Delete unfinished compaction incrementally (CASSANDRA-6086)
+ * Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
+ * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
+ * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
+ * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
+ * Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
+ * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
+ * Fix paging discardFirst (CASSANDRA-6555)
+ * Fix ArrayIndexOutOfBoundsException in 2ndary index query (CASSANDRA-6470)
+ * Release sstables upon rebuilding 2i (CASSANDRA-6635)
+ * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
+ * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
+ * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
+ * Switch stress to use ITransportFactory (CASSANDRA-6641)
+ * Fix IllegalArgumentException during prepare (CASSANDRA-6592)
+ * Fix possible loss of 2ndary index entries during compaction (CASSANDRA-6517)
+ * Fix direct Memory on architectures that do not support unaligned long access
+ (CASSANDRA-6628)
+ * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b50b2b2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 16daf4e,0000000..6705c95
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -1,224 -1,0 +1,224 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * RepairJob runs repair on given ColumnFamily.
+ */
+public class RepairJob
+{
+ private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
+
+ public final RepairJobDesc desc;
+ private final boolean isSequential;
+ // first we send tree requests. this tracks the endpoints remaining to hear from
+ private final RequestCoordinator<InetAddress> treeRequests;
+ // tree responses are then tracked here
+ private final List<TreeResponse> trees = new ArrayList<>();
+ // once all responses are received, each tree is compared with each other, and differencer tasks
+ // are submitted. the job is done when all differencers are complete.
+ private final RequestCoordinator<Differencer> differencers;
+ private final Condition requestsSent = new SimpleCondition();
+ private CountDownLatch snapshotLatch = null;
+ private int gcBefore = -1;
+
+ private volatile boolean failed = false;
+
+ /**
+ * Create repair job to run on specific columnfamily
+ */
+ public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+ {
+ this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ this.isSequential = isSequential;
+ this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+ {
+ public void send(InetAddress endpoint)
+ {
+ ValidationRequest request = new ValidationRequest(desc, gcBefore);
+ MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
+ }
+ };
+ this.differencers = new RequestCoordinator<Differencer>(isSequential)
+ {
+ public void send(Differencer d)
+ {
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+ }
+ };
+ }
+
+ /**
+ * @return true if this job failed
+ */
+ public boolean isFailed()
+ {
+ return failed;
+ }
+
+ /**
+ * Send merkle tree request to every involved neighbor.
+ */
+ public void sendTreeRequests(Collection<InetAddress> endpoints)
+ {
+ // send requests to all nodes
+ List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
+ allEndpoints.add(FBUtilities.getBroadcastAddress());
+
+ if (isSequential)
- makeSnapshots(endpoints);
++ makeSnapshots(allEndpoints);
+
+ this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+
+ for (InetAddress endpoint : allEndpoints)
+ treeRequests.add(endpoint);
+
+ logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
+ treeRequests.start();
+ requestsSent.signalAll();
+ }
+
+ public void makeSnapshots(Collection<InetAddress> endpoints)
+ {
+ try
+ {
+ snapshotLatch = new CountDownLatch(endpoints.size());
+ IAsyncCallback callback = new IAsyncCallback()
+ {
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
+ public void response(MessageIn msg)
+ {
+ RepairJob.this.snapshotLatch.countDown();
+ }
+ };
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
+ snapshotLatch.await();
+ snapshotLatch = null;
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Add a new received tree and return the number of remaining tree to
+ * be received for the job to be complete.
+ *
+ * Callers may assume exactly one addTree call will result in zero remaining endpoints.
+ *
+ * @param endpoint address of the endpoint that sent response
+ * @param tree sent Merkle tree or null if validation failed on endpoint
+ * @return the number of responses waiting to receive
+ */
+ public synchronized int addTree(InetAddress endpoint, MerkleTree tree)
+ {
+ // Wait for all request to have been performed (see #3400)
+ try
+ {
+ requestsSent.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError("Interrupted while waiting for requests to be sent");
+ }
+
+ if (tree == null)
+ failed = true;
+ else
+ trees.add(new TreeResponse(endpoint, tree));
+ return treeRequests.completed(endpoint);
+ }
+
+ /**
+ * Submit differencers for running.
+ * All tree *must* have been received before this is called.
+ */
+ public void submitDifferencers()
+ {
+ assert !failed;
+
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ Differencer differencer = new Differencer(desc, r1, r2);
+ logger.debug("Queueing comparison {}", differencer);
+ differencers.add(differencer);
+ }
+ }
+ differencers.start();
+ trees.clear(); // allows gc to do its thing
+ }
+
+ /**
+ * @return true if the given node pair was the last remaining
+ */
+ synchronized boolean completedSynchronization(NodePair nodes, boolean success)
+ {
+ if (!success)
+ failed = true;
+ Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
+ return differencers.completed(completed) == 0;
+ }
+
+ /**
+ * terminate this job.
+ */
+ public void terminate()
+ {
+ if (snapshotLatch != null)
+ {
+ while (snapshotLatch.getCount() > 0)
+ snapshotLatch.countDown();
+ }
+ }
+}