You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/12/03 21:32:58 UTC

[18/21] git commit: RangeStreamer throws when any range did not complete. Patch by brandonwilliams reviewed by yukim for CASSANDRA-5009

RangeStreamer throws when any range did not complete.
Patch by brandonwilliams reviewed by yukim for CASSANDRA-5009


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d1d37203
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d1d37203
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d1d37203

Branch: refs/heads/cassandra-1.1
Commit: d1d372033b2f0a2ef0a63c9591b369d7789851db
Parents: 3706749
Author: Brandon Williams <br...@apache.org>
Authored: Mon Dec 3 14:21:23 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Dec 3 14:21:23 2012 -0600

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/RangeStreamer.java    |   23 +++++++++++----
 1 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1d37203/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 4e5cfb8..a0e1a93 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,12 +19,14 @@ package org.apache.cassandra.dht;
 
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.streaming.IStreamCallback;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +40,6 @@ import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.streaming.StreamIn;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Assists in streaming ranges to a node.
@@ -52,6 +53,9 @@ public class RangeStreamer
     private final OperationType opType;
     private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
     private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+    // protected for testing.
+    protected CountDownLatch latch;
+    private Set<Range<Token>> completed = Collections.newSetFromMap(new ConcurrentHashMap<Range<Token>, Boolean>());
 
     /**
      * A filter applied to sources to stream from when constructing a fetch map.
@@ -140,7 +144,7 @@ public class RangeStreamer
     private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges)
     {
         AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata);
+        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
 
         Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
         for (Range<Token> desiredRange : desiredRanges)
@@ -217,17 +221,19 @@ public class RangeStreamer
 
     public void fetch()
     {
-        final CountDownLatch latch = new CountDownLatch(toFetch().entries().size());
+        latch = new CountDownLatch(toFetch.entries().size());
+
         for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
         {
             final String table = entry.getKey();
             final InetAddress source = entry.getValue().getKey();
-            Collection<Range<Token>> ranges = entry.getValue().getValue();
+            final Collection<Range<Token>> ranges = entry.getValue().getValue();
             /* Send messages to respective folks to stream data over to me */
             IStreamCallback callback = new IStreamCallback()
             {
                 public void onSuccess()
                 {
+                    completed.addAll(ranges);
                     latch.countDown();
                     if (logger.isDebugEnabled())
                         logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
@@ -236,8 +242,8 @@ public class RangeStreamer
 
                 public void onFailure()
                 {
+                    latch.countDown();
                     logger.warn("Streaming from " + source + " failed");
-                    onSuccess(); // calling onSuccess for latch countdown
                 }
             };
             if (logger.isDebugEnabled())
@@ -248,6 +254,11 @@ public class RangeStreamer
         try
         {
             latch.await();
+            for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
+            {
+                if (!completed.containsAll(entry.getValue().getValue()))
+                    throw new RuntimeException(String.format("Unable to fetch range %s for keyspace %s from any hosts", entry.getValue().getValue(), entry.getKey()));
+            }
         }
         catch (InterruptedException e)
         {