Posted to pr@cassandra.apache.org by ifesdjeen <gi...@git.apache.org> on 2018/09/17 15:26:26 UTC

[GitHub] cassandra pull request #269: Review tr range movements

GitHub user ifesdjeen opened a pull request:

    https://github.com/apache/cassandra/pull/269

    Review tr range movements

    CASSANDRA-14756

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ifesdjeen/cassandra review-tr-range-movements

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #269
    
----
commit e51899d09a254e4a4d86c4fea26b6ada7eaebbcd
Author: Alex Petrov <ol...@...>
Date:   2018-09-17T09:51:56Z

    Enable dtests

commit 60927ae3a8c5c54b273a519110cc4c0540826d48
Author: Alex Petrov <ol...@...>
Date:   2018-09-13T08:39:01Z

    Simplify iteration in calculateRangesToFetchWithPreferredEndpoints
    
    Extract RangeRelocator
    
    Minor changes to calculateRangesToFetchWithPreferredEndpoints to improve readability:
    
      * short-circuit range.contains(), remove one level of nesting
  * remove the accept predicate and use isSufficient instead, since it duplicates the source filters that are already applied (see usages of RangeStreamer.FailureDetectorSourceFilter and ExcludeLocalNodeFilter())
      * use explicit fullness checks instead of “any” check
    
    Simplify RangeRelocator code
    
    Further simplify calculateRangesToStreamWithEndpoints after Benedict’s comment
    
    Fix range relocation
    
    Simplify calculateStreamAndFetchRanges
    
    Unify request/transfer ranges interface (an added benefit of this change is that we get a check for non-intersecting ranges)
    
    Improve error messages

commit 2b13c4a34b19a4de80eb60c9cd059b674a4b19d3
Author: Alex Petrov <ol...@...>
Date:   2018-09-17T12:17:33Z

    Switch dtest branch

----


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r220455656
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -424,62 +440,58 @@ else if (useStrictConsistency)
                      EndpointsForRange sources;
                      if (useStrictConsistency)
                      {
    -                     //Start with two sets of who replicates the range before and who replicates it after
    -                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                     logger.debug("Old endpoints {}", oldEndpoints);
    -                     logger.debug("New endpoints {}", newEndpoints);
    -
    +                     EndpointsForRange strictEndpoints;
                          //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
                          //So we need to be careful to only be strict when endpoints == RF
                          if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
                          {
    -                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         //Start with two sets of who replicates the range before and who replicates it after
    +                         EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                         logger.debug("Old endpoints {}", oldEndpoints);
    +                         logger.debug("New endpoints {}", newEndpoints);
    +
                              // Remove new endpoints from old endpoints based on address
    -                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -                         oldEndpoints.filter(testSourceFilters);
    +                         strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
     
    -                         if (oldEndpoints.size() > 1)
    -                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(strictEndpoints, testSourceFilters))
    +                             throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
    +
    +                         if (strictEndpoints.size() > 1)
    --- End diff --
    
    We're not really filtering here anymore. We can only get here if the source filters didn't filter out any nodes (otherwise we'd fail with the same assertion later), and we've asserted that the resulting endpoint is not going to be yanked out from under us.


---



[GitHub] cassandra issue #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on the issue:

    https://github.com/apache/cassandra/pull/269
  
    When you extracted RangeRelocator, did you make any changes to it? It's hard to tell whether anything changed. I need to know whether I have to jury-rig the two versions side by side to compare them.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218554481
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -328,7 +356,6 @@ else if (useStrictConsistency)
          * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
          * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
          * consistency.
    -     *
          **/
          public static EndpointsByReplica
          calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
    --- End diff --
    
    Why should people have to provide the not-self check if everyone needs it? Seems like we should just add it automatically.
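    
    For illustration, roughly what I mean (a minimal sketch only, reusing the SourceFilter and ExcludeLocalNodeFilter types from this branch; not an actual patch):
    
        // Hypothetical sketch: append the not-self filter inside
        // calculateRangesToFetchWithPreferredEndpoints so callers don't have to pass it.
        Collection<SourceFilter> allFilters = new ArrayList<>(sourceFilters);
        allFilters.add(new RangeStreamer.ExcludeLocalNodeFilter());
        Predicate<Replica> testSourceFilters = and(allFilters);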


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488682
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -338,165 +386,152 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them if they are there
    -                    //With transient replication and strict consistency this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    -                        //So we need to be careful to only be strict when endpoints == RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing replica remains.
    -                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness so no point in fetching from
    -                        //a random full + transient replica since it's also likely to lose data
    -                        //Also apply testSourceFilters that were given to us so we can safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is down");
    -                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is down");
    -                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them if they are there
    +                 //With transient replication and strict consistency this is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    --- End diff --
    
    Fixed


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218462328
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
    @@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
         /**
          * Request data in {@code keyspace} and {@code ranges} from specific node.
          *
    +     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
    +     * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
    +     *
    +     * At the other end the distinction between full and transient is ignored it just used the transient status
    +     * of the Replica objects we send to determine what to send. The real reason we have this split down to
    --- End diff --
    
    We care https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java#L151


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218470693
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
    @@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
         /**
          * Request data in {@code keyspace} and {@code ranges} from specific node.
          *
    +     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
    +     * and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
    +     *
    +     * At the other end the distinction between full and transient is ignored it just used the transient status
    +     * of the Replica objects we send to determine what to send. The real reason we have this split down to
    --- End diff --
    
    Thanks, I assumed as much.  We should clarify the comment to explain why we care then, as it currently reads as though the information may be redundant.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218561162
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -600,39 +628,38 @@ public StreamResultFuture fetchAsync()
                 sources.asMap().forEach((source, fetchReplicas) -> {
     
                     // filter out already streamed ranges
    -                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
    +                Pair<Set<Range<Token>>, Set<Range<Token>>> available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
     
                     Predicate<FetchReplica> isAvailable = fetch -> {
    -                    Replica availableRange =  available.byRange().get(fetch.local.range());
    -                    if (availableRange == null)
    +                    boolean isInFull = available.left.contains(fetch.local.range());
    +                    boolean isInTrans = available.right.contains(fetch.local.range());
    +
    +                    if (!isInFull && !isInTrans)
                             //Range is unavailable
                             return false;
    +
    +                    assert isInFull != isInTrans : "Range can't be simultaneously full and transient: " + isInFull + " " + isInTrans;
    --- End diff --
    
    It can, right? Isn't that the point of this whole exercise? We need to fetch it twice, and it might have already been fetched both fully and transiently.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219271571
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -265,10 +318,11 @@ public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
                 workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
             }
     
    -        toFetch.put(keyspaceName, workMap);
    -        for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
    +        assert toFetch.put(keyspaceName, workMap) == null : "Keyspace is already added to fetch map";
    --- End diff --
    
    If asserts are disabled this doesn't work :-P
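    
    For example, something along these lines would keep the check regardless of -ea/-da (just a sketch, using the names from this diff):
    
        // Sketch: do the put unconditionally and check the previous mapping explicitly,
        // so the duplicate-keyspace check still runs when assertions are disabled.
        Multimap<InetAddressAndPort, FetchReplica> previous = toFetch.put(keyspaceName, workMap);
        if (previous != null)
            throw new IllegalStateException("Keyspace already added to the fetch map: " + keyspaceName);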


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r220456105
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -424,62 +440,58 @@ else if (useStrictConsistency)
                      EndpointsForRange sources;
                      if (useStrictConsistency)
                      {
    -                     //Start with two sets of who replicates the range before and who replicates it after
    -                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                     logger.debug("Old endpoints {}", oldEndpoints);
    -                     logger.debug("New endpoints {}", newEndpoints);
    -
    +                     EndpointsForRange strictEndpoints;
                          //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
                          //So we need to be careful to only be strict when endpoints == RF
                          if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
                          {
    -                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         //Start with two sets of who replicates the range before and who replicates it after
    +                         EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                         logger.debug("Old endpoints {}", oldEndpoints);
    +                         logger.debug("New endpoints {}", newEndpoints);
    +
                              // Remove new endpoints from old endpoints based on address
    -                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -                         oldEndpoints.filter(testSourceFilters);
    +                         strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
     
    -                         if (oldEndpoints.size() > 1)
    -                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(strictEndpoints, testSourceFilters))
    +                             throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
    +
    +                         if (strictEndpoints.size() > 1)
    --- End diff --
    
    Actually, I do see your point. If it's about ordering of error messages, it makes sense that this one takes precedence.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218382719
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -328,7 +356,6 @@ else if (useStrictConsistency)
          * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
          * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
          * consistency.
    -     *
          **/
          public static EndpointsByReplica
          calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity,
    --- End diff --
    
    A comment specifying the requirement to now provide filters for liveness and not-self checks would help prevent future misuse, and make it easier to read the logic.
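    
    Something along these lines, perhaps (illustrative wording only):
    
        /**
         * ...
         * Note: this method no longer checks liveness or excludes the local node itself;
         * callers must supply source filters covering those checks, e.g.
         * RangeStreamer.FailureDetectorSourceFilter and RangeStreamer.ExcludeLocalNodeFilter.
         */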


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218382891
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -87,8 +85,8 @@
         private final InetAddressAndPort address;
         /* streaming description */
         private final String description;
    -    private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
    -    private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
    +    private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
    +    private final Set<SourceFilter> sourceFilters = new HashSet<>();
    --- End diff --
    
    Does this need to be a Set?  (I realise this is old, but it doesn't make much sense AFAICT)


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218731160
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -600,39 +628,38 @@ public StreamResultFuture fetchAsync()
                 sources.asMap().forEach((source, fetchReplicas) -> {
     
                     // filter out already streamed ranges
    -                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
    +                Pair<Set<Range<Token>>, Set<Range<Token>>> available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
     
                     Predicate<FetchReplica> isAvailable = fetch -> {
    -                    Replica availableRange =  available.byRange().get(fetch.local.range());
    -                    if (availableRange == null)
    +                    boolean isInFull = available.left.contains(fetch.local.range());
    +                    boolean isInTrans = available.right.contains(fetch.local.range());
    +
    +                    if (!isInFull && !isInTrans)
                             //Range is unavailable
                             return false;
    +
    +                    assert isInFull != isInTrans : "Range can't be simultaneously full and transient: " + isInFull + " " + isInTrans;
    +
                         if (fetch.local.isFull())
                             //For full, pick only replicas with matching transientness
    -                        return availableRange.isFull() == fetch.remote.isFull();
    +                        return isInFull == fetch.remote.isFull();
     
                         // Any transient or full will do
                         return true;
                     };
     
                     List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
     
    -                if (remaining.size() < available.size())
    +                if (remaining.size() < available.left.size() + available.right.size())
                     {
                         List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
                         logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
    -                                fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
    +                                fetchReplicas, skipped, available.left, available.right);
                     }
     
                     if (logger.isTraceEnabled())
                         logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
     
    -                //At the other end the distinction between full and transient is ignored it just used the transient status
    --- End diff --
    
    I haven't removed it; I just moved it to the common callee. I can add an additional comment at both call sites.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219415043
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -338,165 +386,152 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them if they are there
    -                    //With transient replication and strict consistency this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    -                        //So we need to be careful to only be strict when endpoints == RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing replica remains.
    -                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness so no point in fetching from
    -                        //a random full + transient replica since it's also likely to lose data
    -                        //Also apply testSourceFilters that were given to us so we can safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is down");
    -                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is down");
    -                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them if they are there
    +                 //With transient replication and strict consistency this is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    --- End diff --
    
    Can call `oldEndpoints.without(endpointsStillReplicated)`


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218388108
  
    --- Diff: src/java/org/apache/cassandra/dht/StreamStateStore.java ---
    @@ -54,8 +56,10 @@ public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partiti
         @VisibleForTesting
         public boolean isDataAvailable(String keyspace, Token token)
         {
    -        RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
    -        return availableRanges.ranges().stream().anyMatch(range -> range.contains(token));
    +        Pair<Set<Range<Token>>, Set<Range<Token>>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
    +
    +        return Streams.concat(availableRanges.left.stream(),
    +                              availableRanges.right.stream()).anyMatch(range -> range.contains(token));
    --- End diff --
    
    It would be clearer IMO to have .anyMatch indented on a new line, preferably aligned with .concat() - I initially read this as only performing the anyMatch on `right`, and was trying to figure out why.
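    
    i.e. something like (same expression, only re-indented):
    
        return Streams.concat(availableRanges.left.stream(), availableRanges.right.stream())
                      .anyMatch(range -> range.contains(token));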


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219281323
  
    --- Diff: src/java/org/apache/cassandra/service/RangeRelocator.java ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.service;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Multimap;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.db.Keyspace;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.RangeStreamer;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.gms.FailureDetector;
    +import org.apache.cassandra.locator.AbstractReplicationStrategy;
    +import org.apache.cassandra.locator.EndpointsByReplica;
    +import org.apache.cassandra.locator.EndpointsForRange;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.locator.RangesAtEndpoint;
    +import org.apache.cassandra.locator.RangesByEndpoint;
    +import org.apache.cassandra.locator.Replica;
    +import org.apache.cassandra.locator.TokenMetadata;
    +import org.apache.cassandra.streaming.StreamOperation;
    +import org.apache.cassandra.streaming.StreamPlan;
    +import org.apache.cassandra.streaming.StreamState;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.Pair;
    +
    +@VisibleForTesting
    +public class RangeRelocator
    +{
    +    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
    +
    +    private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
    +    private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +    private final TokenMetadata tokenMetaCloneAllSettled;
    +    // clone to avoid concurrent modification in calculateNaturalReplicas
    +    private final TokenMetadata tokenMetaClone;
    +    private final Collection<Token> tokens;
    +    private final List<String> keyspaceNames;
    +
    +
    +    RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
    +    {
    +        this.tokens = tokens;
    +        this.keyspaceNames = keyspaceNames;
    +        this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
    +        // clone to avoid concurrent modification in calculateNaturalReplicas
    +        this.tokenMetaClone = tmd.cloneOnlyTokenMap();
    +    }
    +
    +    @VisibleForTesting
    +    public RangeRelocator()
    +    {
    +        this.tokens = null;
    +        this.keyspaceNames = null;
    +        this.tokenMetaCloneAllSettled = null;
    +        this.tokenMetaClone = null;
    +    }
    +
    +    /**
    +     * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
    +     */
    +    private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
    +                                                                                                                         AbstractReplicationStrategy strategy,
    +                                                                                                                         String keyspace,
    +                                                                                                                         TokenMetadata tmdBefore,
    +                                                                                                                         TokenMetadata tmdAfter)
    +    {
    +        EndpointsByReplica preferredEndpoints =
    +        RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
    +                                                                   strategy,
    +                                                                   fetchRanges,
    +                                                                   StorageService.useStrictConsistency,
    +                                                                   tmdBefore,
    +                                                                   tmdAfter,
    +                                                                   keyspace,
    +                                                                   Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
    +                                                                                 new RangeStreamer.ExcludeLocalNodeFilter()));
    +        return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
    +    }
    +
    +    /**
    +     * Calculate endpoints to stream current ranges to, if needed;
    +     * in some situations the node will handle current ranges as part of the new ranges.
    +     **/
    +    public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
    +                                                                        AbstractReplicationStrategy strat,
    +                                                                        TokenMetadata tmdBefore,
    +                                                                        TokenMetadata tmdAfter)
    +    {
    +        RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
    +        for (Replica toStream : streamRanges)
    +        {
    +            //If the range we are sending is full only send it to the new full replica
    +            //There will also be a new transient replica we need to send the data to, but not
    +            //the repaired data
    +            EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
    +            EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
    +            logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints);
    +
    +            for (Replica newEndpoint : newEndpoints)
    +            {
    +                Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint());
    +
    +                // Nothing to do
    +                if (newEndpoint.equals(oldEndpoint))
    +                    continue;
    +
    +                // Completely new range for this endpoint
    +                if (oldEndpoint == null)
    +                {
    +                    if (toStream.isTransient() && newEndpoint.isFull())
    +                        throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream));
    +
    +                    for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range()))
    +                    {
    +                        endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection));
    +                    }
    +                }
    +                else
    +                {
    +                    Set<Range<Token>> subsToStream = Collections.singleton(toStream.range());
    +
    +                    //First subtract what we already have
    +                    if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull())
    +                        subsToStream = toStream.range().subtract(oldEndpoint.range());
    +
    +                    //Now we only stream what is still replicated
    +                    subsToStream.stream()
    +                                .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream())
    +                                .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange)));
    +                }
    +            }
    +        }
    +        return endpointRanges.asImmutableView();
    +    }
    +
    +    public void calculateToFromStreams()
    +    {
    +        logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
    +
    +        for (String keyspace : keyspaceNames)
    +        {
    +            // replication strategy of the current keyspace
    +            AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
    +
    +            logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
    +            //From what I have seen we only ever call this with a single token from StorageService.move(Token)
    +            for (Token newToken : tokens)
    +            {
    +                Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
    +                if (currentTokens.size() > 1 || currentTokens.isEmpty())
    +                {
    +                    throw new AssertionError("Unexpected current tokens: " + currentTokens);
    +                }
    +
    +                // calculated parts of the ranges to request/stream from/to nodes in the ring
    +                Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
    +
    +                //In the single node token move there is nothing to do and Range subtraction is broken
    +                //so it's easier to just identify this case up front.
    +                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort())).size() > 1)
    +                {
    +                    // getting collection of the currently used ranges by this keyspace
    +                    RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
    +
    +                    // collection of ranges which this node will serve after move to the new token
    +                    RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
    +
    +                    streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
    +                }
    +                else
    +                {
    +                     streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
    +                }
    +
    +                RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
    +                logger.info("Endpoint ranges to stream to " + rangesToStream);
    +
    +                // stream ranges
    +                for (InetAddressAndPort address : rangesToStream.keySet())
    +                {
    +                    logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address);
    +                    RangesAtEndpoint ranges = rangesToStream.get(address);
    +                    streamPlan.transferRanges(address, keyspace, ranges);
    +                }
    +
    +                Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
    +
    +                // stream requests
    +                rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
    +                    RangesAtEndpoint full = sourceAndOurReplicas.stream()
    +                            .filter(pair -> pair.remote.isFull())
    +                            .map(pair -> pair.local)
    +                            .collect(RangesAtEndpoint.collector(localAddress));
    +                    RangesAtEndpoint trans = sourceAndOurReplicas.stream()
    +                            .filter(pair -> pair.remote.isTransient())
    +                            .map(pair -> pair.local)
    +                            .collect(RangesAtEndpoint.collector(localAddress));
    +                    logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
    +                    streamPlan.requestRanges(address, keyspace, full, trans);
    +                });
    +
    +                logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Calculate pair of ranges to stream/fetch for given two range collections
    +     * (current ranges for keyspace and ranges after move to new token)
    +     *
    +     * With transient replication the added wrinkle is that if a range transitions from full to transient then
    +     * we need to stream the range despite the fact that we are retaining it as transient. Some replica
    +     * somewhere needs to transition from transient to full and we will be the source.
    +     *
    +     * If the range is transient and is transitioning to full then always fetch even if the range was already transient
    +     * since a transient replica obviously needs to fetch data to become full.
    +     *
    +     * This is why there is a continue after checking for intersection, because intersection is not sufficient reason
    +     * to do the subtraction since we might need to stream/fetch data anyways.
    +     *
    +     * @param currentRanges collection of the ranges by current token
    +     * @param updatedRanges collection of the ranges after token is changed
    +     * @return pair of ranges to stream/fetch for given current and updated range collections
    +     */
    +    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
    +    {
    +        // FIXME: transient replication
    --- End diff --
    
    Can you remove this?


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218387701
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -1288,24 +1288,25 @@ public static synchronized void updateAvailableRanges(String keyspace, Collectio
         /**
          * List of the streamed ranges, where transientness is encoded based on the source, where range was streamed from.
          */
    -    public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
    +    public static synchronized Pair<Set<Range<Token>>, Set<Range<Token>>> getAvailableRanges(String keyspace, IPartitioner partitioner)
    --- End diff --
    
    Could we introduce a concrete class for this concept, of full/transient ranges?
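
    For illustration, a minimal sketch of such a holder; the name AvailableRanges and its exact shape are assumptions here, not necessarily what was committed:

        import java.util.Set;
        import java.util.stream.Stream;

        import org.apache.cassandra.dht.Range;
        import org.apache.cassandra.dht.Token;

        // hypothetical replacement for Pair<Set<Range<Token>>, Set<Range<Token>>>
        public final class AvailableRanges
        {
            public final Set<Range<Token>> full;   // ranges already streamed from full sources
            public final Set<Range<Token>> trans;  // ranges already streamed from transient sources

            public AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
            {
                this.full = full;
                this.trans = trans;
            }

            public boolean contains(Token token)
            {
                return Stream.concat(full.stream(), trans.stream())
                             .anyMatch(range -> range.contains(token));
            }
        }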


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen closed the pull request at:

    https://github.com/apache/cassandra/pull/269


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218387370
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
    @@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
         /**
          * Request data in {@code keyspace} and {@code ranges} from specific node.
          *
    +     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
    +     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
    +     *
    +     * At the other end the distinction between full and transient is ignored it just used the transient status
    +     * of the Replica objects we send to determine what to send. The real reason we have this split down to
    --- End diff --
    
    "At the other end the distinction between full and transient is ignored it just used the transient status of the Replica objects we send to determine what to send"
    
    This isn't terribly clear - perhaps
    
    "At the other end the distinction between full and transient is ignored; it is used only to create the Replica objects that are used to determine what to send"
    
    This does raise the question of whether we should be sending Replica objects at all?  Presumably we *do* look at the transient/full status, in some way?  Or do we only care about the Range<Token>?  In which case, perhaps we should just send that, and avoid the confusion?


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219279196
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -473,7 +472,7 @@ else if (useStrictConsistency)
                          }
     
                          //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    --- End diff --
    
    The comment no longer applies


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219278292
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
    --- End diff --
    
    This doesn't seem clearer to me as a more complex boolean expression.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219278572
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
                              {
                                  // need an additional replica
                                  EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
                                  // include all our filters, to ensure we include a matching node
    -                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil();
    +                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(Replica::isFull, testSourceFilters)).toJavaUtil();
    --- End diff --
    
    Sure, you can switch to using Replica::isFull, but isSufficient is still logically what we are looking for; it just happens that, nested this deep, we have already eliminated part of the expression. I don't think this makes it clearer, and using isSufficient here pairs nicely with using it above.
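
    For illustration, the pairing being described (a sketch only): the predicate is declared once per toFetch and reused in both the outer check and the deeper search, even though for a full toFetch it reduces to Replica::isFull:

        // sufficient = can provide the data toFetch needs:
        //   toFetch transient -> any replica will do
        //   toFetch full      -> only a full replica will do (reduces to Replica::isFull)
        Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull();

        // both decisions phrased in terms of the same predicate
        boolean needAdditionalReplica = !any(oldEndpoints, isSufficient);
        Optional<Replica> extra = Iterables.<Replica>tryFind(endpointsForRange,
                                                             and(isSufficient, testSourceFilters)).toJavaUtil();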


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488307
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -265,10 +318,11 @@ public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
                 workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);
             }
     
    -        toFetch.put(keyspaceName, workMap);
    -        for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet())
    +        assert toFetch.put(keyspaceName, workMap) == null : "Keyspace is already added to fetch map";
    --- End diff --
    
    Fixed


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218433645
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -337,165 +364,167 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them if they are there
    -                    //With transient replication and strict consistency this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    -                        //So we need to be careful to only be strict when endpoints == RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing replica remains.
    -                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness so no point in fetching from
    -                        //a random full + transient replica since it's also likely to lose data
    -                        //Also apply testSourceFilters that were given to us so we can safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is down");
    -                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is down");
    -                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them if they are there
    +                 //With transient replication and strict consistency this is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    +
    +                         if (oldEndpoints.size() > 1)
    +                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +
    +                         //If we are transitioning from transient to full and the set of replicas for the range is not changing
    +                         //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    +                         //since we are already a transient replica and the existing replica remains.
    +                         //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    +                         //So it's an error if we don't find what we need.
    +                         if (oldEndpoints.isEmpty() && toFetch.isTransient())
    +                         {
    +                             throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    +                         }
    +
    +                         if (!any(oldEndpoints, isSufficient))
    +                         {
    +                             // need an additional replica
    +                             EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    +                             // include all our filters, to ensure we include a matching node
    +                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil();
    +                             if (fullReplica.isPresent())
    +                                 oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    +                             else
    +                                 throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    +                         }
    +
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(oldEndpoints, testSourceFilters))
    +                         {
    +                             StringBuilder failureMessage = new StringBuilder();
    --- End diff --
    
    +1 


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219413764
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
    --- End diff --
    
    I definitely agree the old expression is clearer.
    
    If you don't like negating `any` (though I find it clearer than `none`, personally, as it reads "it's not the case that any endpoints is/are sufficient" as opposed to "none endpoints is/are sufficient"), I've proposed introducing `any`, `all` and `none` in [14726](https://issues.apache.org/jira/browse/CASSANDRA-14726)
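
    For illustration, helpers in that spirit might look like the sketch below; the signatures are assumptions, not necessarily what CASSANDRA-14726 proposes:

        import java.util.function.Predicate;

        public static <T> boolean any(Iterable<T> source, Predicate<? super T> test)
        {
            for (T t : source)
                if (test.test(t))
                    return true;
            return false;
        }

        public static <T> boolean all(Iterable<T> source, Predicate<? super T> test)
        {
            for (T t : source)
                if (!test.test(t))
                    return false;
            return true;
        }

        public static <T> boolean none(Iterable<T> source, Predicate<? super T> test)
        {
            return !any(source, test);
        }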



---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218547170
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -600,39 +628,38 @@ public StreamResultFuture fetchAsync()
                 sources.asMap().forEach((source, fetchReplicas) -> {
     
                     // filter out already streamed ranges
    -                RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
    +                Pair<Set<Range<Token>>, Set<Range<Token>>> available = stateStore.getAvailableRanges(keyspace, metadata.partitioner);
     
                     Predicate<FetchReplica> isAvailable = fetch -> {
    -                    Replica availableRange =  available.byRange().get(fetch.local.range());
    -                    if (availableRange == null)
    +                    boolean isInFull = available.left.contains(fetch.local.range());
    +                    boolean isInTrans = available.right.contains(fetch.local.range());
    +
    +                    if (!isInFull && !isInTrans)
                             //Range is unavailable
                             return false;
    +
    +                    assert isInFull != isInTrans : "Range can't be simultaneously full and transient: " + isInFull + " " + isInTrans;
    +
                         if (fetch.local.isFull())
                             //For full, pick only replicas with matching transientness
    -                        return availableRange.isFull() == fetch.remote.isFull();
    +                        return isInFull == fetch.remote.isFull();
     
                         // Any transient or full will do
                         return true;
                     };
     
                     List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList());
     
    -                if (remaining.size() < available.size())
    +                if (remaining.size() < available.left.size() + available.right.size())
                     {
                         List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList());
                         logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}",
    -                                fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges());
    +                                fetchReplicas, skipped, available.left, available.right);
                     }
     
                     if (logger.isTraceEnabled())
                         logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", "));
     
    -                //At the other end the distinction between full and transient is ignored it just used the transient status
    --- End diff --
    
    Is it clearer with this comment removed?


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219410710
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
    --- End diff --
    
    My main problem with `!any` is that it negates. Iteration is harder to reason about when negated, in my opinion. But this is fairly minor, so I'm fine with leaving it as it was.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218547456
  
    --- Diff: src/java/org/apache/cassandra/locator/RangesAtEndpoint.java ---
    @@ -302,6 +303,11 @@ public static RangesAtEndpoint toDummyList(Collection<Range<Token>> ranges)
                     .collect(collector(dummy));
         }
     
    +    public static boolean isDummyList(RangesAtEndpoint ranges)
    --- End diff --
    
    How does all replicas being local mean it's a dummy list? Only the second half of this predicate is dummy-list related.


---



[GitHub] cassandra issue #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on the issue:

    https://github.com/apache/cassandra/pull/269
  
    Looks mostly solid. There is one real bug in the assertion, and a filter call that doesn't do anything and doesn't seem necessary. There are two comments that can be deleted, plus the nit about isSufficient, which I still think really is clearer.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218432647
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -87,8 +85,8 @@
         private final InetAddressAndPort address;
         /* streaming description */
         private final String description;
    -    private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create();
    -    private final Set<Predicate<Replica>> sourceFilters = new HashSet<>();
    +    private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>();
    +    private final Set<SourceFilter> sourceFilters = new HashSet<>();
    --- End diff --
    
    You're right, it doesn't. Filtering is idempotent (e.g. the first op will filter the element out and subsequent ones will just be no-ops).
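
    In other words (a sketch, assuming filter returns a filtered copy as it does elsewhere in this code), applying the same filter twice gives the same result as applying it once:

        EndpointsForRange once  = oldEndpoints.filter(testSourceFilters);
        EndpointsForRange twice = once.filter(testSourceFilters);
        assert once.equals(twice); // the second application is a no-op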


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218463068
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamPlan.java ---
    @@ -70,6 +70,16 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
         /**
          * Request data in {@code keyspace} and {@code ranges} from specific node.
          *
    +     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
    +     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
    +     *
    +     * At the other end the distinction between full and transient is ignored it just used the transient status
    +     * of the Replica objects we send to determine what to send. The real reason we have this split down to
    --- End diff --
    
    We also care about it being split up into two collections https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/dht/StreamStateStore.java#L81
    
    It's used to determine what remote nodes have already provided us.


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219277526
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -436,6 +436,7 @@ else if (useStrictConsistency)
                              Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
                              // Remove new endpoints from old endpoints based on address
                              oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    +                         oldEndpoints.filter(testSourceFilters);
    --- End diff --
    
    Uh, this doesn't store the result, and the tests all still pass, so it wasn't doing anything anyway?
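
    If the intent was to actually apply the source filters at this point, the result would need to be assigned back, roughly (a sketch, assuming filter returns a new filtered collection as it does in the surrounding code):

        // filter(...) does not mutate oldEndpoints; without the assignment the call has no effect
        oldEndpoints = oldEndpoints.filter(testSourceFilters);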


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218371266
  
    --- Diff: test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---
    @@ -125,21 +125,19 @@ public boolean isAlive(InetAddressAndPort ep)
             s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
     
     
    -        Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName);
    +        Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName);
     
             // Check we get get RF new ranges in total
    -        long rangesCount = toFetch.stream()
    -               .map(Multimap::values)
    -               .flatMap(Collection::stream)
    -               .map(f -> f.remote)
    -               .map(Replica::range)
    -               .count();
    +        long rangesCount = toFetch.values().stream()
    --- End diff --
    
    surely we can just take toFetch.size()?
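
    For illustration, since toFetch is a Multimap and Multimap.size() counts every key/value entry, the whole pipeline reduces to (a sketch):

        // one entry per FetchReplica, which is exactly what the stream was counting
        long rangesCount = toFetch.size();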


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219410357
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
                              {
                                  // need an additional replica
                                  EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
                                  // include all our filters, to ensure we include a matching node
    -                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil();
    +                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(Replica::isFull, testSourceFilters)).toJavaUtil();
    --- End diff --
    
    Switched back to `isSufficient` here


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218383055
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -337,165 +364,167 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them if they are there
    -                    //With transient replication and strict consistency this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    -                        //So we need to be careful to only be strict when endpoints == RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing replica remains.
    -                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness so no point in fetching from
    -                        //a random full + transient replica since it's also likely to lose data
    -                        //Also apply testSourceFilters that were given to us so we can safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is down");
    -                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is down");
    -                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them if they are there
    +                 //With transient replication and strict consistency this is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    +
    +                         if (oldEndpoints.size() > 1)
    +                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +
    +                         //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    +                         //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    +                         //since we are already a transient replica and the existing replica remains.
    +                         //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    +                         //So it's an error if we don't find what we need.
    +                         if (oldEndpoints.isEmpty() && toFetch.isTransient())
    +                         {
    +                             throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    +                         }
    +
    +                         if (!any(oldEndpoints, isSufficient))
    +                         {
    +                             // need an additional replica
    +                             EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    +                             // include all our filters, to ensure we include a matching node
    +                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil();
    +                             if (fullReplica.isPresent())
    +                                 oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    +                             else
    +                                 throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    +                         }
    +
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(oldEndpoints, testSourceFilters))
    +                         {
    +                             StringBuilder failureMessage = new StringBuilder();
    +                             for (Replica r : oldEndpoints)
    +                             {
    +                                 for (SourceFilter filter : sourceFilters)
    +                                 {
    +                                     if (!filter.apply(r))
    +                                     {
    +                                         failureMessage.append(filter.message(r));
    +                                         break;
    +                                     }
    +                                 }
    +                             }
    +                             throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + failureMessage);
    +                         }
    +                     }
    +                     else
    +                     {
    +                         oldEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
    --- End diff --
    
    We are now applying testSourceFilters twice: once here, and once on L468.
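
    A minimal sketch of applying them only once, assuming the strict branch still ends with the single `sources = oldEndpoints.filter(testSourceFilters)` this comment refers to:

    ```java
    // sketch: in this branch filter only by sufficiency (and sort by proximity)...
    oldEndpoints = sorted.apply(oldEndpoints.filter(isSufficient));

    // ...and leave the source filters to the single application downstream
    sources = oldEndpoints.filter(testSourceFilters);
    ```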


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488654
  
    --- Diff: src/java/org/apache/cassandra/service/RangeRelocator.java ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.service;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Multimap;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.db.Keyspace;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.RangeStreamer;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.gms.FailureDetector;
    +import org.apache.cassandra.locator.AbstractReplicationStrategy;
    +import org.apache.cassandra.locator.EndpointsByReplica;
    +import org.apache.cassandra.locator.EndpointsForRange;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.locator.RangesAtEndpoint;
    +import org.apache.cassandra.locator.RangesByEndpoint;
    +import org.apache.cassandra.locator.Replica;
    +import org.apache.cassandra.locator.TokenMetadata;
    +import org.apache.cassandra.streaming.StreamOperation;
    +import org.apache.cassandra.streaming.StreamPlan;
    +import org.apache.cassandra.streaming.StreamState;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.Pair;
    +
    +@VisibleForTesting
    +public class RangeRelocator
    +{
    +    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
    +
    +    private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
    +    private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +    private final TokenMetadata tokenMetaCloneAllSettled;
    +    // clone to avoid concurrent modification in calculateNaturalReplicas
    +    private final TokenMetadata tokenMetaClone;
    +    private final Collection<Token> tokens;
    +    private final List<String> keyspaceNames;
    +
    +
    +    RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd)
    +    {
    +        this.tokens = tokens;
    +        this.keyspaceNames = keyspaceNames;
    +        this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
    +        // clone to avoid concurrent modification in calculateNaturalReplicas
    +        this.tokenMetaClone = tmd.cloneOnlyTokenMap();
    +    }
    +
    +    @VisibleForTesting
    +    public RangeRelocator()
    +    {
    +        this.tokens = null;
    +        this.keyspaceNames = null;
    +        this.tokenMetaCloneAllSettled = null;
    +        this.tokenMetaClone = null;
    +    }
    +
    +    /**
    +     * Wrapper that supplies accessors to the real implementations of the various dependencies for this method
    +     */
    +    private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges,
    +                                                                                                                         AbstractReplicationStrategy strategy,
    +                                                                                                                         String keyspace,
    +                                                                                                                         TokenMetadata tmdBefore,
    +                                                                                                                         TokenMetadata tmdAfter)
    +    {
    +        EndpointsByReplica preferredEndpoints =
    +        RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
    +                                                                   strategy,
    +                                                                   fetchRanges,
    +                                                                   StorageService.useStrictConsistency,
    +                                                                   tmdBefore,
    +                                                                   tmdAfter,
    +                                                                   keyspace,
    +                                                                   Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
    +                                                                                 new RangeStreamer.ExcludeLocalNodeFilter()));
    +        return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints);
    +    }
    +
    +    /**
    +     * calculating endpoints to stream current ranges to if needed
    +     * in some situations node will handle current ranges as part of the new ranges
    +     **/
    +    public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
    +                                                                        AbstractReplicationStrategy strat,
    +                                                                        TokenMetadata tmdBefore,
    +                                                                        TokenMetadata tmdAfter)
    +    {
    +        RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable();
    +        for (Replica toStream : streamRanges)
    +        {
    +            //If the range we are sending is full only send it to the new full replica
    +            //There will also be a new transient replica we need to send the data to, but not
    +            //the repaired data
    +            EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
    +            EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
    +            logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints);
    +
    +            for (Replica newEndpoint : newEndpoints)
    +            {
    +                Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint());
    +
    +                // Nothing to do
    +                if (newEndpoint.equals(oldEndpoint))
    +                    continue;
    +
    +                // Completely new range for this endpoint
    +                if (oldEndpoint == null)
    +                {
    +                    if (toStream.isTransient() && newEndpoint.isFull())
    +                        throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream));
    +
    +                    for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range()))
    +                    {
    +                        endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection));
    +                    }
    +                }
    +                else
    +                {
    +                    Set<Range<Token>> subsToStream = Collections.singleton(toStream.range());
    +
    +                    //First subtract what we already have
    +                    if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull())
    +                        subsToStream = toStream.range().subtract(oldEndpoint.range());
    +
    +                    //Now we only stream what is still replicated
    +                    subsToStream.stream()
    +                                .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream())
    +                                .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange)));
    +                }
    +            }
    +        }
    +        return endpointRanges.asImmutableView();
    +    }
    +
    +    public void calculateToFromStreams()
    +    {
    +        logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
    +
    +        for (String keyspace : keyspaceNames)
    +        {
    +            // replication strategy of the current keyspace
    +            AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
    +
    +            logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
    +            //From what I have seen we only ever call this with a single token from StorageService.move(Token)
    +            for (Token newToken : tokens)
    +            {
    +                Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
    +                if (currentTokens.size() > 1 || currentTokens.isEmpty())
    +                {
    +                    throw new AssertionError("Unexpected current tokens: " + currentTokens);
    +                }
    +
    +                // calculated parts of the ranges to request/stream from/to nodes in the ring
    +                Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
    +
    +                //In the single node token move there is nothing to do and Range subtraction is broken
    +                //so it's easier to just identify this case up front.
    +                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
    +)).size() > 1)
    +                {
    +                    // getting collection of the currently used ranges by this keyspace
    +                    RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
    +
    +                    // collection of ranges which this node will serve after move to the new token
    +                    RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
    +
    +                    streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
    +                }
    +                else
    +                {
    +                     streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
    +                }
    +
    +                RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
    +                logger.info("Endpoint ranges to stream to " + rangesToStream);
    +
    +                // stream ranges
    +                for (InetAddressAndPort address : rangesToStream.keySet())
    +                {
    +                    logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address);
    +                    RangesAtEndpoint ranges = rangesToStream.get(address);
    +                    streamPlan.transferRanges(address, keyspace, ranges);
    +                }
    +
    +                Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
    +
    +                // stream requests
    +                rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
    +                    RangesAtEndpoint full = sourceAndOurReplicas.stream()
    +                            .filter(pair -> pair.remote.isFull())
    +                            .map(pair -> pair.local)
    +                            .collect(RangesAtEndpoint.collector(localAddress));
    +                    RangesAtEndpoint trans = sourceAndOurReplicas.stream()
    +                            .filter(pair -> pair.remote.isTransient())
    +                            .map(pair -> pair.local)
    +                            .collect(RangesAtEndpoint.collector(localAddress));
    +                    logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
    +                    streamPlan.requestRanges(address, keyspace, full, trans);
    +                });
    +
    +                logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Calculate pair of ranges to stream/fetch for given two range collections
    +     * (current ranges for keyspace and ranges after move to new token)
    +     *
    +     * With transient replication the added wrinkle is that if a range transitions from full to transient then
    +     * we need to stream the range despite the fact that we are retaining it as transient. Some replica
    +     * somewhere needs to transition from transient to full and we will be the source.
    +     *
    +     * If the range is transient and is transitioning to full then always fetch even if the range was already transient
    +     * since a transiently replicated range obviously needs to fetch data to become full.
    +     *
    +     * This is why there is a continue after checking for intersection, because intersection is not sufficient reason
    +     * to do the subtraction since we might need to stream/fetch data anyways.
    +     *
    +     * @param currentRanges collection of the ranges by current token
    +     * @param updatedRanges collection of the ranges after token is changed
    +     * @return pair of ranges to stream/fetch for given current and updated range collections
    +     */
    +    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
    +    {
    +        // FIXME: transient replication
    --- End diff --
    
    Done


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488550
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -446,16 +447,14 @@ else if (useStrictConsistency)
                              //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
                              //So it's an error if we don't find what we need.
                              if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                         {
                                  throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                         }
     
    -                         if (!any(oldEndpoints, isSufficient))
    +                         if (toFetch.isFull() && (oldEndpoints.isEmpty() || oldEndpoints.get(0).isTransient()))
    --- End diff --
    
    Ok, reverted to the old one


---



[GitHub] cassandra issue #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on the issue:

    https://github.com/apache/cassandra/pull/269
  
    There were changes to `calculateStreamAndFetchRanges` and `calculateRangesToStreamWithEndpoints`...


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488342
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -436,6 +436,7 @@ else if (useStrictConsistency)
                              Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
                              // Remove new endpoints from old endpoints based on address
                              oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    +                         oldEndpoints.filter(testSourceFilters);
    --- End diff --
    
    Fixed
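
    (For context: `filter` on these replica collections returns a new, filtered collection, so the standalone call above discarded its result. A sketch of the two obvious fixes, the second being the approach the later revision of this patch takes:)

    ```java
    // sketch: either assign the filtered result back...
    oldEndpoints = oldEndpoints.filter(testSourceFilters);

    // ...or validate explicitly and fail fast instead of silently dropping replicas
    if (!all(oldEndpoints, testSourceFilters))
        throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: "
                                        + oldEndpoints.filter(not(testSourceFilters)));
    ```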


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r220336800
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -424,62 +440,58 @@ else if (useStrictConsistency)
                      EndpointsForRange sources;
                      if (useStrictConsistency)
                      {
    -                     //Start with two sets of who replicates the range before and who replicates it after
    -                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                     logger.debug("Old endpoints {}", oldEndpoints);
    -                     logger.debug("New endpoints {}", newEndpoints);
    -
    +                     EndpointsForRange strictEndpoints;
                          //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
                          //So we need to be careful to only be strict when endpoints == RF
                          if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
                          {
    -                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         //Start with two sets of who replicates the range before and who replicates it after
    +                         EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                         logger.debug("Old endpoints {}", oldEndpoints);
    +                         logger.debug("New endpoints {}", newEndpoints);
    +
                              // Remove new endpoints from old endpoints based on address
    -                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -                         oldEndpoints.filter(testSourceFilters);
    +                         strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
     
    -                         if (oldEndpoints.size() > 1)
    -                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(strictEndpoints, testSourceFilters))
    +                             throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
    +
    +                         if (strictEndpoints.size() > 1)
    --- End diff --
    
    This no longer asserts what it did before if you are filtering before we get to this assertion. 
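
    A sketch of one ordering that keeps the original assertion intact (names taken from this hunk): establish the `<= 1` endpoint invariant on the unfiltered strict set before any source-filter handling.

    ```java
    // sketch: assert cardinality on the unfiltered strict set first...
    EndpointsForRange strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
    if (strictEndpoints.size() > 1)
        throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);

    // ...and only then check whether the source filters would remove a required replica
    if (!all(strictEndpoints, testSourceFilters))
        throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: "
                                        + buildErrorMessage(sourceFilters, strictEndpoints));
    ```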


---



[GitHub] cassandra pull request #269: Review tr range movements

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r218383166
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -337,165 +364,167 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them if they are there
    -                    //With transient replication and strict consistency this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    -                        //So we need to be careful to only be strict when endpoints == RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing replica remains.
    -                            //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness so no point in fetching from
    -                        //a random full + transient replica since it's also likely to lose data
    -                        //Also apply testSourceFilters that were given to us so we can safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is down");
    -                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is down");
    -                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull());
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them if they are there
    +                 //With transient replication and strict consistency this is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    +
    +                         if (oldEndpoints.size() > 1)
    +                             throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints);
    +
    +                         //If we are transitioning from transient to full and and the set of replicas for the range is not changing
    +                         //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
    +                         //since we are already a transient replica and the existing replica remains.
    +                         //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
    +                         //So it's an error if we don't find what we need.
    +                         if (oldEndpoints.isEmpty() && toFetch.isTransient())
    +                         {
    +                             throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
    +                         }
    +
    +                         if (!any(oldEndpoints, isSufficient))
    +                         {
    +                             // need an additional replica
    +                             EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    +                             // include all our filters, to ensure we include a matching node
    +                             Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(isSufficient, testSourceFilters)).toJavaUtil();
    +                             if (fullReplica.isPresent())
    +                                 oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    +                             else
    +                                 throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange);
    +                         }
    +
    +                         //We have to check the source filters here to see if they will remove any replicas
    +                         //required for strict consistency
    +                         if (!all(oldEndpoints, testSourceFilters))
    +                         {
    +                             StringBuilder failureMessage = new StringBuilder();
    --- End diff --
    
    Maybe split into a helper?
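
    For example, a sketch of such a helper (the name matches the `buildErrorMessage(sourceFilters, strictEndpoints)` call in a later hunk of this PR; the parameter types are assumptions):

    ```java
    // sketch: collect, for each replica, the message of the first source filter that rejects it
    private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, EndpointsForRange replicas)
    {
        StringBuilder failureMessage = new StringBuilder();
        for (Replica r : replicas)
        {
            for (SourceFilter filter : sourceFilters)
            {
                if (!filter.apply(r))
                {
                    failureMessage.append(filter.message(r));
                    break;
                }
            }
        }
        return failureMessage.toString();
    }
    ```

    The throw site then reduces to: `throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, oldEndpoints));`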


---
