You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by aweisberg <gi...@git.apache.org> on 2018/09/10 18:29:28 UTC

[GitHub] cassandra pull request #265: 14705

GitHub user aweisberg opened a pull request:

    https://github.com/apache/cassandra/pull/265

    14705

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/belliottsmith/cassandra 14705

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #265
    
----
commit 949ec9e9965c84c3c80daf677422acb23719212f
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T10:41:28Z

    [rR]eplicaLayout->[rR]eplicaPlan, selected() -> contact()

commit 8190c79d6c8bc3e1cfcc41ea0ee222fb3fc21231
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T10:45:28Z

    assureSufficientLiveNodes -> assureSufficientReplicas

commit 70d615f8c1831831d97582b6b526621fc3d0bbf0
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:28:01Z

    main refactor

commit 45230008ce9f12639d28b304531bd315b5637fd0
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:38:31Z

    remove get(Live)?(Sorted)?Natural(AndPending)?Replicas

commit e384d189c98877bfbf02cd8f168694bb3c012403
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:41:10Z

    fix speculation->rr_extra_read->rr_extra_write

commit f4eb2d245cf52f86fd9ef71ca2de0df3037fb3bf
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:43:00Z

    cleanup BatchLog.sendSingleReplayMutation

commit d3eb71cf3265055be739089bf34c0fed64dac24b
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:44:13Z

    fix: could speculate to nodes that would not increase consistencyLevel, resulting in timeout instead of overload

commit f59dfa83ad6de829f79406c46acc82acb84fb645
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:45:36Z

    initialise Mutable HashMap with capacity

commit 14fc69fe50178fffe6d8c0d79e60cc671aa29a88
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T11:47:55Z

    circleci

commit d45a68a319e488ed7ee016a747df1dfd734753bd
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T13:00:18Z

    comment nit

commit b49a088578292c807bc568e8a6481a7ac75fe489
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-07T13:04:17Z

    remove unused ForRangeWrite

commit aab19a9e6fa9f2a88fc4b15d888f27f9dffd90dd
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-08T12:20:03Z

    comments

commit a9aa8dce4ee80574e06cb699c4896c135b2830eb
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-09T17:25:14Z

    ensure consistent application of predicates whose value varies (e.g. FailureDetector)

commit 13af81ded19fe22fbf1e5095f1f217c80985a44f
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-09T19:42:18Z

    ReplicaPlan.Shared, for sharing a ReplicaPlan between Resolver, Executor and ReadRepair

commit 71669f0ee60e6b93a8e5e5169acc80bbfe87727e
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-10T14:51:16Z

    stop using ReplicaLayout inside ReplicaPlan - only use it for constructing a ReplicaPlan

commit 75cb98e860ed4f5a33723cde0f041150edc018cb
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-10T15:40:36Z

    fix infinite recursion in AbstractReadExecutor.replicaPlan; cleanup prior replicaplan commit

commit 23235b46abb071bf209cd3ea2ce0bec09c27f736
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-10T16:36:19Z

    remove ReplicaPlan.with{Contact,CL}, move to more concrete classes where possible ; remove another type parameter

commit a06fef7f769ed6b4e4f2bb1437c63e7f7fcd9601
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-10T16:40:10Z

    fix unnecessary test breakage

commit c80a06432237bb262f09ec907acf8451eaadb0b6
Author: Benedict Elliott Smith <be...@...>
Date:   2018-09-10T17:18:50Z

    some comments

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216912914
  
    --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
    @@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
             SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
     
    -        // Endpoints for Token
    -        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
    +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
     
             // Speculative retry is disabled *OR*
             // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
             if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
    -            // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
    -            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
    +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
     
             // There are simply no extra replicas to speculate.
             // Handle this separately so it can record failed attempts to speculate due to lack of replicas
    -        if (replicaLayout.selected().size() == replicaLayout.all().size())
    +        if (replicaPlan.contact().size() == replicaPlan.candidates().size())
             {
                 boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
    -            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
    +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
             }
     
             // If CL.ALL, upgrade to AlwaysSpeculating;
    --- End diff --
    
    No idea.  I don't understand either, but as you say, it looks like the comment is anyway stale.  I'll remove it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217012379
  
    --- Diff: src/java/org/apache/cassandra/service/reads/DigestResolver.java ---
    @@ -93,16 +93,14 @@ public PartitionIterator getData()
             {
                 // This path can be triggered only if we've got responses from full replicas and they match, but
                 // transient replica response still contains data, which needs to be reconciled.
    -            DataResolver<E, L> dataResolver = new DataResolver<>(command,
    -                                                                 replicaLayout,
    -                                                                 (ReadRepair<E, L>) NoopReadRepair.instance,
    -                                                                 queryStartNanoTime);
    +            DataResolver<E, P> dataResolver
    +                    = new DataResolver<>(command, replicaPlan, (ReadRepair<E, P>) NoopReadRepair.instance, queryStartNanoTime);
     
                 dataResolver.preprocess(dataResponse);
                 // Forward differences to all full nodes
    --- End diff --
    
    I think this one should be already fixed in CASSANDRA-14698


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217185707
  
    --- Diff: src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---
    @@ -189,28 +189,30 @@ public final void expired()
         /**
          * @return the minimum number of endpoints that must reply.
          */
    -    protected int totalBlockFor()
    +    protected int blockFor()
         {
             // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
             // guarantees (see #833)
    -        return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
    +        return replicaPlan.blockFor();
         }
     
         /**
    +     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
    --- End diff --
    
    Should this be a JIRA rather than a TODO or just get done?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217185167
  
    --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
    @@ -53,15 +53,16 @@
         protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
         private final int blockFor;
     
    -    BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
    +    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<P> replicaPlan, long queryStartNanoTime)
         {
             super(command, replicaPlan, queryStartNanoTime);
    -        this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
    +        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
         }
     
         public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
         {
    -        return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
    +        // TODO: why are we referencing a different replicaPlan here?
    --- End diff --
    
    OK, I think I understand, the original replica plan contacted nodes that never responded and weren't part of the read repair. It's a new replica plan that only includes the nodes that responded.. The consistency level happens to be unchanged across both of them. It would be less misdirecting if it fetched the consistency level from the plan that it is given.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216473356
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -249,12 +229,32 @@ public int requiredParticipants()
          * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
          * @param <P>
          */
    -    public static class Shared<P extends ReplicaPlan<?, ?>>
    +    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
         {
    -        private P replicaPlan;
    -        public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
    -        public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
    -        public P get() { return replicaPlan; }
    +        public void addToContact(Replica replica);
    +        public P get();
    +        public abstract P getWithContact(E endpoints);
         }
     
    +    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
    +    {
    +        private ForTokenRead replicaPlan;
    +        public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForTokenRead get() { return replicaPlan; }
    +        public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
    +    }
    +
    +    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
    +    {
    +        private ForRangeRead replicaPlan;
    +        public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForRangeRead get() { return replicaPlan; }
    +        public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
    --- End diff --
    
    Wait, so it's not shared now? We made a new one without updating the reference? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216913748
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -249,12 +229,32 @@ public int requiredParticipants()
          * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
          * @param <P>
          */
    -    public static class Shared<P extends ReplicaPlan<?, ?>>
    +    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
         {
    -        private P replicaPlan;
    -        public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
    -        public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
    -        public P get() { return replicaPlan; }
    +        public void addToContact(Replica replica);
    +        public P get();
    +        public abstract P getWithContact(E endpoints);
         }
     
    +    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
    +    {
    +        private ForTokenRead replicaPlan;
    +        public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForTokenRead get() { return replicaPlan; }
    +        public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
    +    }
    +
    +    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
    +    {
    +        private ForRangeRead replicaPlan;
    +        public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForRangeRead get() { return replicaPlan; }
    +        public ForRangeRead getWithContact(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
    --- End diff --
    
    This is a getter for the case where we don't want to update the reference.  I will remove it to avoid any ambiguity.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216913492
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -249,12 +229,32 @@ public int requiredParticipants()
          * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
          * @param <P>
          */
    -    public static class Shared<P extends ReplicaPlan<?, ?>>
    +    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
         {
    -        private P replicaPlan;
    -        public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
    -        public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
    -        public P get() { return replicaPlan; }
    +        public void addToContact(Replica replica);
    +        public P get();
    +        public abstract P getWithContact(E endpoints);
         }
     
    +    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
    +    {
    +        private ForTokenRead replicaPlan;
    +        public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForTokenRead get() { return replicaPlan; }
    +        public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
    +    }
    +
    +    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
    +    {
    +        private ForRangeRead replicaPlan;
    +        public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    --- End diff --
    
    It does; in general the non-pluralised form of contact doesn't work well with the method names.  Perhaps I should just pluralise and be done with it?  Not sure why I was averse to this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217173928
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -237,4 +237,17 @@ public int requiredParticipants()
             }
         }
     
    +    /**
    +     * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contact' replicas
    +     * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
    +     * @param <P>
    +     */
    +    public static class Shared<P extends ReplicaPlan<?, ?, ?>>
    --- End diff --
    
    I'm not sure I really understand the question or criticism.  Could it be mutable?  For starters, it would require a lot of other changes to ReplicaPlan and ReplicaCollection, that don't seem warranted for this single use case.  Besides that, yes, but this would be less threadsafe (this version is actually perfectly safe to have multiple readers, as we do, they just may not see the latest 'contacts' value promptly).  I'd be happy to upgrade it to fully threadsafe if it would make us happier, but this was changed to non-threadsafe specifically in response to prior feedback about the old code that it unnecessarily used volatile.  It would be trivial to make it fully threadsafe, and the cost would actually be fairly low, and while I'm fairly sure it isn't needed, it would be easier to reason about.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216445669
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -237,4 +237,17 @@ public int requiredParticipants()
             }
         }
     
    +    /**
    +     * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contact' replicas
    +     * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
    +     * @param <P>
    +     */
    +    public static class Shared<P extends ReplicaPlan<?, ?, ?>>
    --- End diff --
    
    This is kind of a weird place it end up in terms of mutability and immutability. It's not thread safe and if it's supposed to be always referencing the latest instance why immutable?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217175374
  
    --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
    @@ -53,15 +53,16 @@
         protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
         private final int blockFor;
     
    -    BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
    +    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<P> replicaPlan, long queryStartNanoTime)
         {
             super(command, replicaPlan, queryStartNanoTime);
    -        this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
    +        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
         }
     
         public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
         {
    -        return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
    +        // TODO: why are we referencing a different replicaPlan here?
    --- End diff --
    
    I think that was a stale comment.  The merging never modified the replicaPlan, and it should never be modified during the SRP.  Logically, it should be a snapshot of only the relevant replicas for the SRP.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216846177
  
    --- Diff: src/java/org/apache/cassandra/service/reads/DigestResolver.java ---
    @@ -93,16 +93,14 @@ public PartitionIterator getData()
             {
                 // This path can be triggered only if we've got responses from full replicas and they match, but
                 // transient replica response still contains data, which needs to be reconciled.
    -            DataResolver<E, L> dataResolver = new DataResolver<>(command,
    -                                                                 replicaLayout,
    -                                                                 (ReadRepair<E, L>) NoopReadRepair.instance,
    -                                                                 queryStartNanoTime);
    +            DataResolver<E, P> dataResolver
    +                    = new DataResolver<>(command, replicaPlan, (ReadRepair<E, P>) NoopReadRepair.instance, queryStartNanoTime);
     
                 dataResolver.preprocess(dataResponse);
                 // Forward differences to all full nodes
    --- End diff --
    
    This comment is wrong now right? We don't forward we just merge.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217112723
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
    @@ -18,364 +18,296 @@
     
     package org.apache.cassandra.locator;
     
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    -import java.util.function.Predicate;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Predicates;
    -import com.google.common.collect.Iterables;
    -
     import org.apache.cassandra.config.DatabaseDescriptor;
    -import org.apache.cassandra.db.ConsistencyLevel;
    -import org.apache.cassandra.db.DecoratedKey;
     import org.apache.cassandra.db.Keyspace;
     import org.apache.cassandra.db.PartitionPosition;
     import org.apache.cassandra.dht.AbstractBounds;
     import org.apache.cassandra.dht.Token;
    -import org.apache.cassandra.exceptions.UnavailableException;
     import org.apache.cassandra.gms.FailureDetector;
    -import org.apache.cassandra.net.IAsyncCallback;
    -import org.apache.cassandra.service.StorageProxy;
     import org.apache.cassandra.service.StorageService;
    -import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
    -import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
     import org.apache.cassandra.utils.FBUtilities;
     
    -import static com.google.common.collect.Iterables.any;
    +import java.util.Set;
    +import java.util.function.Predicate;
     
     /**
    - * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
    - * for building the relevant layout.
    + * The relevant replicas for an operation over a given range or token.
      *
    - * Constitutes:
    - *  - the 'natural' replicas replicating the range or token relevant for the operation
    - *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
    - *  - the 'selected' replicas, those that should be targeted for any operation
    - *  - 'all' replicas represents natural+pending
    - *
    - * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
    - * @param <L> the type of itself, including its type parameters, for return type of modifying methods
    + * @param <E>
      */
    -public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
    +public abstract class ReplicaLayout<E extends Endpoints<E>>
     {
    -    private volatile E all;
    -    protected final E natural;
    -    protected final E pending;
    -    protected final E selected;
    -
    -    protected final Keyspace keyspace;
    -    protected final ConsistencyLevel consistencyLevel;
    -
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
    -    {
    -        this(keyspace, consistencyLevel, natural, pending, selected, null);
    -    }
    +    private final E natural;
     
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
    +    ReplicaLayout(E natural)
         {
    -        assert selected != null;
    -        assert pending == null || !Endpoints.haveConflicts(natural, pending);
    -        this.keyspace = keyspace;
    -        this.consistencyLevel = consistencyLevel;
             this.natural = natural;
    -        this.pending = pending;
    -        this.selected = selected;
    -        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
    -        if (all == null && pending == null)
    -            all = natural;
    -        this.all = all;
         }
     
    -    public Replica getReplicaFor(InetAddressAndPort endpoint)
    -    {
    -        return natural.byEndpoint().get(endpoint);
    -    }
    -
    -    public E natural()
    +    /**
    +     * The 'natural' owners of the ring position(s), as implied by the current ring layout.
    +     * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
    +     * have not yet finished obtaining their view of the range.
    +     */
    +    public final E natural()
         {
             return natural;
         }
     
    -    public E all()
    -    {
    -        E result = all;
    -        if (result == null)
    -            all = result = Endpoints.concat(natural, pending);
    -        return result;
    -    }
    -
    -    public E selected()
    -    {
    -        return selected;
    -    }
    -
         /**
    -     * @return the pending replicas - will be null for read layouts
    -     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
    +     * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
    +     * For writes, this will include pending owners, and for reads it will be equivalent to natural()
          */
    -    public E pending()
    -    {
    -        return pending;
    -    }
    -
    -    public int blockFor()
    -    {
    -        return pending == null
    -                ? consistencyLevel.blockFor(keyspace)
    -                : consistencyLevel.blockForWrite(keyspace, pending);
    -    }
    -
    -    public Keyspace keyspace()
    +    public E all()
         {
    -        return keyspace;
    +        return natural;
         }
     
    -    public ConsistencyLevel consistencyLevel()
    +    public String toString()
         {
    -        return consistencyLevel;
    +        return "ReplicaLayout [ natural: " + natural + " ]";
         }
     
    -    abstract public L withSelected(E replicas);
    -
    -    abstract public L withConsistencyLevel(ConsistencyLevel cl);
    -
    -    public L forNaturalUncontacted()
    +    public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
         {
    -        E more;
    -        if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
    +        public ForTokenRead(EndpointsForToken natural)
             {
    -            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
    -            String localDC = DatabaseDescriptor.getLocalDataCenter();
    -
    -            more = natural.filter(replica -> !selected.contains(replica) &&
    -                    snitch.getDatacenter(replica).equals(localDC));
    -        } else
    +            super(natural);
    +        }
    +        @Override
    +        public Token token()
             {
    -            more = natural.filter(replica -> !selected.contains(replica));
    +            return natural().token();
             }
     
    -        return withSelected(more);
    +        public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
    +        {
    +            EndpointsForToken filtered = natural().filter(filter);
    +            if (filtered == natural()) return this;
    +            return new ReplicaLayout.ForTokenRead(filtered);
    +        }
         }
     
    -    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
    +    public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange
         {
    -        public final AbstractBounds<PartitionPosition> range;
    +        final AbstractBounds<PartitionPosition> range;
     
    -        @VisibleForTesting
    -        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
    +        public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
             {
    -            // Range queries do not contact pending replicas
    -            super(keyspace, consistencyLevel, natural, null, selected);
    +            super(natural);
                 this.range = range;
             }
     
             @Override
    -        public ForRange withSelected(EndpointsForRange newSelected)
    +        public AbstractBounds<PartitionPosition> range()
    --- End diff --
    
    It wasn't unused, but it can be made so - and it is now.  But for consistency with ReplicaLayout.ForTokenRead, I think it's fairly harmless.
    
    A wider question is whether we actually want a ReplicaLayout for reads at all.  They're only a single Endpoints, and I do wonder if it's worth it.  But, again, for consistency with ReplicaPlan I think it's a very small price to pay.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217185583
  
    --- Diff: src/java/org/apache/cassandra/service/reads/ReadCallback.java ---
    @@ -44,16 +43,17 @@
     import org.apache.cassandra.utils.FBUtilities;
     import org.apache.cassandra.utils.concurrent.SimpleCondition;
     
    -public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
    +public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse>
     {
         protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
     
         public final ResponseResolver resolver;
         final SimpleCondition condition = new SimpleCondition();
         private final long queryStartNanoTime;
    -    // TODO: move to replica layout as well?
    -    final int blockfor;
    -    final L replicaLayout;
    +    final int blockFor; // TODO: move to replica plan as well?
    --- End diff --
    
    Move it or keep it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216839700
  
    --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
    @@ -184,30 +185,28 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
             SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
     
    -        // Endpoints for Token
    -        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
    +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
     
             // Speculative retry is disabled *OR*
             // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
             if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
    -            // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
    -            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
    +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
     
             // There are simply no extra replicas to speculate.
             // Handle this separately so it can record failed attempts to speculate due to lack of replicas
    -        if (replicaLayout.selected().size() == replicaLayout.all().size())
    +        if (replicaPlan.contact().size() == replicaPlan.candidates().size())
             {
                 boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
    -            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
    +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
             }
     
             // If CL.ALL, upgrade to AlwaysSpeculating;
    --- End diff --
    
    This doesn't seem to have anything to do with CL.ALL anymore. And I don't get why CL.ALL benefits from always speculating since it fails if it doesn't get the data response anyways?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216434289
  
    --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
    @@ -196,7 +196,7 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
     
             // There are simply no extra replicas to speculate.
             // Handle this separately so it can record failed attempts to speculate due to lack of replicas
    -        if (replicaPlan.contact().size() >= replicaPlan.liveOnly().all().size())
    +        if (replicaPlan.contact().size() == replicaPlan.candidates().size())
    --- End diff --
    
    So >= is not necessary because contact should be a subset of candidates (or it should be an unavailable)? Is it more robust to use >= in case that doesn't hold due to a mistake or have an assertion for that?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216486027
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
    @@ -194,14 +198,97 @@ public Token token()
     
         public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
         {
    -        if (Endpoints.haveConflicts(natural, pending))
    +        if (haveWriteConflicts(natural, pending))
             {
    -            natural = Endpoints.resolveConflictsInNatural(natural, pending);
    -            pending = Endpoints.resolveConflictsInPending(natural, pending);
    +            natural = resolveWriteConflictsInNatural(natural, pending);
    +            pending = resolveWriteConflictsInPending(natural, pending);
             }
             return new ReplicaLayout.ForTokenWrite(natural, pending);
         }
     
    +    /**
    +     * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation)
    +     * or because an endpoint is transitioning between full and transient replication status.
    +     *
    +     * We essentially always prefer the full version for writes, because this is stricter.
    +     *
    +     * For transient->full transitions:
    +     *
    +     *   Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
    +     *   it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
    +     *   as there is always a quorum of nodes receiving the write.  However, ring ownership changes are not atomic or
    +     *   consistent across the cluster, and it is possible for writers to see different ring states.
    +     *
    +     *   Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
    +     *   (as the normal contract dictates) when it completes its transition.
    +     *
    +     *   While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
    +     *   available to us today to mitigate, and (we think) the easiest to reason about.
    +     *
    +     * For full->transient transitions:
    +     *
    +     *   In this case, things are dicier, because in theory we can trigger this change instantly.  All we need to do is
    +     *   drop some data, surely?
    +     *
    +     *   Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient,
    +     *   and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
    +     *   If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
    +     *   its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
    +     *   of the data inconsistency.
    +     *
    +     *   This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
    +     *   the transition takes effect.  As such, a node can hold an incorrect belief about its own ownership ranges.
    +     *
    +     *   This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
    +     *   It is a little more dangerous with transient replication, however, because we can completely answer a request without
    +     *   ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
    +     *
    +     *   We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
    +     *
    +     * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
    +     * to avoid corrupting our count.  This is fine for writes, all we're doing is ensuring we always write to the node,
    +     * instead of selectively.
    +     *
    +     * @param natural
    +     * @param pending
    +     * @param <E>
    +     * @return
    +     */
    +    public static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending)
    +    {
    +        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
    +        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
    +        {
    +            if (naturalEndpoints.contains(pendingEndpoint))
    +                return true;
    +        }
    +        return false;
    +    }
    +
    +    /**
    +     * MUST APPLY FIRST
    +     * See {@link ReplicaLayout#haveWriteConflicts}
    +     * @return a 'natural' replica collection, that has had its conflicts with pending repaired
    +     */
    +    public static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending)
    +    {
    +        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
    +    }
    +
    +    /**
    +     * MUST APPLY SECOND
    +     * See {@link ReplicaLayout#haveWriteConflicts}
    +     * @return a 'pending' replica collection, that has had its conflicts with natural repaired
    +     */
    +    public static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending)
    --- End diff --
    
    Do these need to be public?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217186512
  
    --- Diff: src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---
    @@ -459,36 +458,34 @@ private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddressAnd
                 Keyspace keyspace = Keyspace.open(ks);
                 Token tk = mutation.key().getToken();
     
    -            EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
    -            Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
    +            // TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is
    --- End diff --
    
    JIRA vs this TODO?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216916456
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
    @@ -194,14 +198,97 @@ public Token token()
     
         public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
         {
    -        if (Endpoints.haveConflicts(natural, pending))
    +        if (haveWriteConflicts(natural, pending))
             {
    -            natural = Endpoints.resolveConflictsInNatural(natural, pending);
    -            pending = Endpoints.resolveConflictsInPending(natural, pending);
    +            natural = resolveWriteConflictsInNatural(natural, pending);
    +            pending = resolveWriteConflictsInPending(natural, pending);
             }
             return new ReplicaLayout.ForTokenWrite(natural, pending);
         }
     
    +    /**
    +     * Detect if we have any endpoint in both pending and full; this can occur either due to races (there is no isolation)
    +     * or because an endpoint is transitioning between full and transient replication status.
    +     *
    +     * We essentially always prefer the full version for writes, because this is stricter.
    +     *
    +     * For transient->full transitions:
    +     *
    +     *   Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
    +     *   it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
    +     *   as there is always a quorum of nodes receiving the write.  However, ring ownership changes are not atomic or
    +     *   consistent across the cluster, and it is possible for writers to see different ring states.
    +     *
    +     *   Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
    +     *   (as the normal contract dictates) when it completes its transition.
    +     *
    +     *   While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
    +     *   available to us today to mitigate, and (we think) the easiest to reason about.
    +     *
    +     * For full->transient transitions:
    +     *
    +     *   In this case, things are dicier, because in theory we can trigger this change instantly.  All we need to do is
    +     *   drop some data, surely?
    +     *
    +     *   Ring movements can put is in a pickle; any other node could believe us to be full when we have become transient,
    +     *   and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
    +     *   If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
    +     *   its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
    +     *   of the data inconsistency.
    +     *
    +     *   This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
    +     *   the transition takes effect.  As such, a node can hold an incorrect belief about its own ownership ranges.
    +     *
    +     *   This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
    +     *   It is a little more dangerous with transient replication, however, because we can completely answer a request without
    +     *   ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
    +     *
    +     *   We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
    +     *
    +     * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
    +     * to avoid corrupting our count.  This is fine for writes, all we're doing is ensuring we always write to the node,
    +     * instead of selectively.
    +     *
    +     * @param natural
    +     * @param pending
    +     * @param <E>
    +     * @return
    +     */
    +    public static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending)
    +    {
    +        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
    +        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
    +        {
    +            if (naturalEndpoints.contains(pendingEndpoint))
    +                return true;
    +        }
    +        return false;
    +    }
    +
    +    /**
    +     * MUST APPLY FIRST
    +     * See {@link ReplicaLayout#haveWriteConflicts}
    +     * @return a 'natural' replica collection, that has had its conflicts with pending repaired
    +     */
    +    public static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending)
    +    {
    +        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
    +    }
    +
    +    /**
    +     * MUST APPLY SECOND
    +     * See {@link ReplicaLayout#haveWriteConflicts}
    +     * @return a 'pending' replica collection, that has had its conflicts with natural repaired
    +     */
    +    public static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending)
    --- End diff --
    
    Not anymore


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216442898
  
    --- Diff: src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---
    @@ -53,15 +53,16 @@
         protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
         private final int blockFor;
     
    -    BlockingReadRepair(ReadCommand command, P replicaPlan, long queryStartNanoTime)
    +    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<P> replicaPlan, long queryStartNanoTime)
         {
             super(command, replicaPlan, queryStartNanoTime);
    -        this.blockFor = replicaPlan.consistencyLevel().blockFor(cfs.keyspace);
    +        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
         }
     
         public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
         {
    -        return new PartitionIteratorMergeListener<>(replicaPlan, command, this.replicaPlan.consistencyLevel(), this);
    +        // TODO: why are we referencing a different replicaPlan here?
    --- End diff --
    
    TODO indeed looking at DataResolver it's modifying the replica plan without updating the shared one https://github.com/apache/cassandra/pull/265/files#diff-7e5dd130632299911e49b12afe86c85aR121
    So they would be different?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216915886
  
    --- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---
    @@ -182,7 +160,17 @@ private void assertSubList(C subCollection, int from, int to)
                 }
             }
     
    -        void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
    +        private void assertSubSequence(Iterable<Replica> subSequence, int from, int to)
    --- End diff --
    
    Ah, poor merge from a follow-up patch where I introduced filterLazily


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216432759
  
    --- Diff: test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---
    @@ -182,7 +160,17 @@ private void assertSubList(C subCollection, int from, int to)
                 }
             }
     
    -        void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
    +        private void assertSubSequence(Iterable<Replica> subSequence, int from, int to)
    --- End diff --
    
    Unused and also kind of weird (elementsEqual twice?)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by belliottsmith <gi...@git.apache.org>.
Github user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r217174597
  
    --- Diff: src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---
    @@ -196,7 +196,7 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
     
             // There are simply no extra replicas to speculate.
             // Handle this separately so it can record failed attempts to speculate due to lack of replicas
    -        if (replicaPlan.contact().size() >= replicaPlan.liveOnly().all().size())
    +        if (replicaPlan.contact().size() == replicaPlan.candidates().size())
    --- End diff --
    
    I modified to `>=` in an earlier version of this patch, and decided it was a silly change and rolled it back.  Pretty much by definition, if we select `contact` from `candidates`, it should always be `<=`.  I guess we could insert an assertion, but at some point we have to drawn the line on what we verify and what we do not.  I'm willing to put an assertion in, but I feel it is unnecessary, given we produce almost all `contact` via a call to `candidates.filter`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216473482
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaPlan.java ---
    @@ -249,12 +229,32 @@ public int requiredParticipants()
          * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
          * @param <P>
          */
    -    public static class Shared<P extends ReplicaPlan<?, ?>>
    +    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
         {
    -        private P replicaPlan;
    -        public Shared(P replicaPlan) { this.replicaPlan = replicaPlan; }
    -        public void set(P newReplicaPlan) { this.replicaPlan = newReplicaPlan; }
    -        public P get() { return replicaPlan; }
    +        public void addToContact(Replica replica);
    +        public P get();
    +        public abstract P getWithContact(E endpoints);
         }
     
    +    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
    +    {
    +        private ForTokenRead replicaPlan;
    +        public SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    +        public ForTokenRead get() { return replicaPlan; }
    +        public ForTokenRead getWithContact(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
    +    }
    +
    +    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
    +    {
    +        private ForRangeRead replicaPlan;
    +        public SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
    +        public void addToContact(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contact(), replica)); }
    --- End diff --
    
    addToContact or addContact, grammar seems odd.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #265: 14705

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/265#discussion_r216451203
  
    --- Diff: src/java/org/apache/cassandra/locator/ReplicaLayout.java ---
    @@ -18,364 +18,296 @@
     
     package org.apache.cassandra.locator;
     
    -import java.util.Collection;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    -import java.util.function.Predicate;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Predicates;
    -import com.google.common.collect.Iterables;
    -
     import org.apache.cassandra.config.DatabaseDescriptor;
    -import org.apache.cassandra.db.ConsistencyLevel;
    -import org.apache.cassandra.db.DecoratedKey;
     import org.apache.cassandra.db.Keyspace;
     import org.apache.cassandra.db.PartitionPosition;
     import org.apache.cassandra.dht.AbstractBounds;
     import org.apache.cassandra.dht.Token;
    -import org.apache.cassandra.exceptions.UnavailableException;
     import org.apache.cassandra.gms.FailureDetector;
    -import org.apache.cassandra.net.IAsyncCallback;
    -import org.apache.cassandra.service.StorageProxy;
     import org.apache.cassandra.service.StorageService;
    -import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
    -import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
     import org.apache.cassandra.utils.FBUtilities;
     
    -import static com.google.common.collect.Iterables.any;
    +import java.util.Set;
    +import java.util.function.Predicate;
     
     /**
    - * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
    - * for building the relevant layout.
    + * The relevant replicas for an operation over a given range or token.
      *
    - * Constitutes:
    - *  - the 'natural' replicas replicating the range or token relevant for the operation
    - *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
    - *  - the 'selected' replicas, those that should be targeted for any operation
    - *  - 'all' replicas represents natural+pending
    - *
    - * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
    - * @param <L> the type of itself, including its type parameters, for return type of modifying methods
    + * @param <E>
      */
    -public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
    +public abstract class ReplicaLayout<E extends Endpoints<E>>
     {
    -    private volatile E all;
    -    protected final E natural;
    -    protected final E pending;
    -    protected final E selected;
    -
    -    protected final Keyspace keyspace;
    -    protected final ConsistencyLevel consistencyLevel;
    -
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
    -    {
    -        this(keyspace, consistencyLevel, natural, pending, selected, null);
    -    }
    +    private final E natural;
     
    -    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
    +    ReplicaLayout(E natural)
         {
    -        assert selected != null;
    -        assert pending == null || !Endpoints.haveConflicts(natural, pending);
    -        this.keyspace = keyspace;
    -        this.consistencyLevel = consistencyLevel;
             this.natural = natural;
    -        this.pending = pending;
    -        this.selected = selected;
    -        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
    -        if (all == null && pending == null)
    -            all = natural;
    -        this.all = all;
         }
     
    -    public Replica getReplicaFor(InetAddressAndPort endpoint)
    -    {
    -        return natural.byEndpoint().get(endpoint);
    -    }
    -
    -    public E natural()
    +    /**
    +     * The 'natural' owners of the ring position(s), as implied by the current ring layout.
    +     * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
    +     * have not yet finished obtaining their view of the range.
    +     */
    +    public final E natural()
         {
             return natural;
         }
     
    -    public E all()
    -    {
    -        E result = all;
    -        if (result == null)
    -            all = result = Endpoints.concat(natural, pending);
    -        return result;
    -    }
    -
    -    public E selected()
    -    {
    -        return selected;
    -    }
    -
         /**
    -     * @return the pending replicas - will be null for read layouts
    -     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
    +     * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
    +     * For writes, this will include pending owners, and for reads it will be equivalent to natural()
          */
    -    public E pending()
    -    {
    -        return pending;
    -    }
    -
    -    public int blockFor()
    -    {
    -        return pending == null
    -                ? consistencyLevel.blockFor(keyspace)
    -                : consistencyLevel.blockForWrite(keyspace, pending);
    -    }
    -
    -    public Keyspace keyspace()
    +    public E all()
         {
    -        return keyspace;
    +        return natural;
         }
     
    -    public ConsistencyLevel consistencyLevel()
    +    public String toString()
         {
    -        return consistencyLevel;
    +        return "ReplicaLayout [ natural: " + natural + " ]";
         }
     
    -    abstract public L withSelected(E replicas);
    -
    -    abstract public L withConsistencyLevel(ConsistencyLevel cl);
    -
    -    public L forNaturalUncontacted()
    +    public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
         {
    -        E more;
    -        if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
    +        public ForTokenRead(EndpointsForToken natural)
             {
    -            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
    -            String localDC = DatabaseDescriptor.getLocalDataCenter();
    -
    -            more = natural.filter(replica -> !selected.contains(replica) &&
    -                    snitch.getDatacenter(replica).equals(localDC));
    -        } else
    +            super(natural);
    +        }
    +        @Override
    +        public Token token()
             {
    -            more = natural.filter(replica -> !selected.contains(replica));
    +            return natural().token();
             }
     
    -        return withSelected(more);
    +        public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
    +        {
    +            EndpointsForToken filtered = natural().filter(filter);
    +            if (filtered == natural()) return this;
    +            return new ReplicaLayout.ForTokenRead(filtered);
    +        }
         }
     
    -    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
    +    public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange
         {
    -        public final AbstractBounds<PartitionPosition> range;
    +        final AbstractBounds<PartitionPosition> range;
     
    -        @VisibleForTesting
    -        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
    +        public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
             {
    -            // Range queries do not contact pending replicas
    -            super(keyspace, consistencyLevel, natural, null, selected);
    +            super(natural);
                 this.range = range;
             }
     
             @Override
    -        public ForRange withSelected(EndpointsForRange newSelected)
    +        public AbstractBounds<PartitionPosition> range()
    --- End diff --
    
    This seems to get copied around but is unused?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org