You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/12 14:58:02 UTC

cassandra git commit: Remove mentions of transient replication from repair path

Repository: cassandra
Updated Branches:
  refs/heads/trunk f100024eb -> 8a73427c6


Remove mentions of transient replication from repair path

Patch by Alex Petrov; reviewed by Blake Eggleston and Ariel Weisberg for CASSANDRA-14698

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a73427c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a73427c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a73427c

Branch: refs/heads/trunk
Commit: 8a73427c6543c94ce49da0ed1f833ec5b8ed4f18
Parents: f100024
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Sep 6 12:37:13 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 12 16:56:57 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../service/reads/AbstractReadExecutor.java     |   2 +-
 .../cassandra/service/reads/DataResolver.java   |  13 +-
 .../cassandra/service/reads/DigestResolver.java |   8 +-
 .../service/reads/ResponseResolver.java         |   8 +-
 .../reads/repair/AbstractReadRepair.java        |   4 +-
 .../reads/repair/ReadRepairDiagnostics.java     |  14 +-
 .../reads/repair/RowIteratorMergeListener.java  |  44 ++--
 .../reads/DataResolverTransientTest.java        | 226 -------------------
 .../service/reads/DigestResolverTest.java       |  35 ++-
 10 files changed, 73 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3cfdcff..264c80f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 4.0
- * fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
+ * Remove mentions of transient replication from repair path (CASSANDRA-14698)
+ * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
  * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
  * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
  * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 5543fcc..75885ae 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -81,7 +81,7 @@ public abstract class AbstractReadExecutor
         this.replicaLayout = replicaLayout;
         this.initialDataRequestCount = initialDataRequestCount;
         this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
-        this.digestResolver = new DigestResolver<>(command, replicaLayout, readRepair, queryStartNanoTime);
+        this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime);
         this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 1f69d6a..a6901b2 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -50,15 +50,18 @@ import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 
 import static com.google.common.collect.Iterables.*;
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
 
 public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
 {
     private final boolean enforceStrictLiveness;
+    private final ReadRepair<E, L> readRepair;
 
     public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
     {
-        super(command, replicaLayout, readRepair, queryStartNanoTime);
+        super(command, replicaLayout, queryStartNanoTime);
         this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+        this.readRepair = readRepair;
     }
 
     public PartitionIterator getData()
@@ -157,8 +160,14 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
         return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
     }
 
-    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
+    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
+                                                                         L sources,
+                                                                         RepairedDataTracker repairedDataTracker)
     {
+        // Avoid wrapping no-op listeners as it doesn't throw
+        if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
+            return partitionListener;
+
         return new UnfilteredPartitionIterators.MergeListener()
         {
             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index c3eee43..28c2117 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -44,9 +44,9 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
 {
     private volatile MessageIn<ReadResponse> dataResponse;
 
-    public DigestResolver(ReadCommand command, L replicas, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+    public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime)
     {
-        super(command, replicas, readRepair, queryStartNanoTime);
+        super(command, replicas, queryStartNanoTime);
         Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
                                     "DigestResolver can only be used with SinglePartitionReadCommand commands");
     }
@@ -95,11 +95,11 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
             // transient replica response still contains data, which needs to be reconciled.
             DataResolver<E, L> dataResolver = new DataResolver<>(command,
                                                                  replicaLayout,
-                                                                 (ReadRepair<E, L>) NoopReadRepair.instance,
+                                                                 NoopReadRepair.instance,
                                                                  queryStartNanoTime);
 
             dataResolver.preprocess(dataResponse);
-            // Forward differences to all full nodes
+            // Reconcile with transient replicas
             for (MessageIn<ReadResponse> response : responses)
             {
                 Replica replica = replicaLayout.getReplicaFor(response.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index e306b4d..298f843 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 
 public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
@@ -34,17 +33,15 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
 
     protected final ReadCommand command;
     protected final L replicaLayout;
-    protected final ReadRepair<E, L> readRepair;
 
     // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
     protected final Accumulator<MessageIn<ReadResponse>> responses;
     protected final long queryStartNanoTime;
 
-    public ResponseResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+    public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime)
     {
         this.command = command;
         this.replicaLayout = replicaLayout;
-        this.readRepair = readRepair;
         // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
         this.responses = new Accumulator<>(replicaLayout.all().size());
         this.queryStartNanoTime = queryStartNanoTime;
@@ -60,7 +57,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
         }
         catch (IllegalStateException e)
         {
-            logger.error("Encountered error while trying to preprocess the message {}: %s in command {}, replicas: {}", message, command, readRepair, replicaLayout.consistencyLevel(), replicaLayout.selected());
+            logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}",
+                         message, command, replicaLayout);
             throw e;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 493b9d0..528d31b 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -110,7 +110,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
             Tracing.trace("Enqueuing full data read to {}", replica);
             sendReadCommand(replica, readCallback);
         }
-        ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
+        ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
     }
 
     public void awaitReads() throws ReadTimeoutException
@@ -150,7 +150,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
             Tracing.trace("Enqueuing speculative full data read to {}", replica);
             sendReadCommand(replica, repair.readCallback);
             ReadRepairMetrics.speculatedRead.mark();
-            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
+            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 6eff395..4c74a89 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -18,15 +18,14 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.diag.DiagnosticEventService;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
 import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -39,21 +38,22 @@ final class ReadRepairDiagnostics
     {
     }
 
-    static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations,
-                            DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints)
+    static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver)
     {
         if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
             service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
-                                                readRepair, endpointDestinations, allEndpoints, digestResolver));
+                                                readRepair,
+                                                layout.selected().endpoints(),
+                                                layout.all().endpoints(), digestResolver));
     }
 
     static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
-                               Iterable<InetAddressAndPort> allEndpoints)
+                               ReplicaLayout<?, ?> replicaLayout)
     {
         if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
             service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
                                                 readRepair, Collections.singletonList(endpoint),
-                                                Lists.newArrayList(allEndpoints), null));
+                                                Lists.newArrayList(replicaLayout.all().endpoints()), null));
     }
 
     static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index b0c019a..7fe797a 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -58,7 +58,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     private final ConsistencyLevel consistency;
 
     private final PartitionUpdate.Builder[] repairs;
-    private final Replica[] sources;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
     private final ReplicaLayout layout;
@@ -79,16 +78,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        Endpoints<?> sources = layout.selected();
-        this.sources = new Replica[sources.size()];
-        for (int i = 0; i < sources.size(); i++)
-            this.sources[i] = sources.get(i);
-
         this.layout = layout;
-        repairs = new PartitionUpdate.Builder[sources.size()];
-        currentRows = new Row.Builder[sources.size()];
-        sourceDeletionTime = new DeletionTime[sources.size()];
-        markerToRepair = new ClusteringBound[sources.size()];
+        int size = layout.selected().size();
+        repairs = new PartitionUpdate.Builder[size];
+        currentRows = new Row.Builder[size];
+        sourceDeletionTime = new DeletionTime[size];
+        markerToRepair = new ClusteringBound[size];
         this.command = command;
         this.consistency = consistency;
         this.readRepair = readRepair;
@@ -97,25 +92,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         {
             public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
             {
-                if (merged != null && !merged.equals(original) && !isTransient(i))
+                if (merged != null && !merged.equals(original))
                     currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
             }
 
             public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
             {
-                if (merged != null && !merged.equals(original) && !isTransient(i))
+                if (merged != null && !merged.equals(original))
                     currentRow(i, clustering).addRowDeletion(merged);
             }
 
             public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
             {
-                if (merged != null && !merged.equals(original) && !isTransient(i))
+                if (merged != null && !merged.equals(original))
                     currentRow(i, clustering).addComplexDeletion(column, merged);
             }
 
             public void onCell(int i, Clustering clustering, Cell merged, Cell original)
             {
-                if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i))
+                if (merged != null && !merged.equals(original) && isQueried(merged))
                     currentRow(i, clustering).addCell(merged);
             }
 
@@ -134,11 +129,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         };
     }
 
-    private boolean isTransient(int i)
-    {
-        return sources[i].isTransient();
-    }
-
     private PartitionUpdate.Builder update(int i)
     {
         if (repairs[i] == null)
@@ -172,9 +162,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
         this.partitionLevelDeletion = mergedDeletion;
         for (int i = 0; i < versions.length; i++)
         {
-            if (isTransient(i))
-                continue;
-
             if (mergedDeletion.supersedes(versions[i]))
                 update(i).addPartitionDeletion(mergedDeletion);
         }
@@ -209,9 +196,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
         for (int i = 0; i < versions.length; i++)
         {
-            if (isTransient(i))
-                continue;
-
             RangeTombstoneMarker marker = versions[i];
 
             // Update what the source now thinks is the current deletion
@@ -326,20 +310,22 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     public void close()
     {
         Map<Replica, Mutation> mutations = null;
+        Endpoints<?> sources = layout.selected();
         for (int i = 0; i < repairs.length; i++)
         {
             if (repairs[i] == null)
                 continue;
 
-            Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas");
-            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false);
+            Replica source = sources.get(i);
+
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false);
             if (mutation == null)
                 continue;
 
             if (mutations == null)
-                mutations = Maps.newHashMapWithExpectedSize(sources.length);
+                mutations = Maps.newHashMapWithExpectedSize(sources.size());
 
-            mutations.put(sources[i], mutation);
+            mutations.put(source, mutation);
         }
 
         if (mutations != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
deleted file mode 100644
index 8119400..0000000
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.reads;
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.DecoratedKey;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.SimpleBuilders;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.reads.repair.TestableReadRepair;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
-import static org.apache.cassandra.locator.Replica.fullReplica;
-import static org.apache.cassandra.locator.Replica.transientReplica;
-import static org.apache.cassandra.locator.ReplicaUtils.full;
-import static org.apache.cassandra.locator.ReplicaUtils.trans;
-
-/**
- * Tests DataResolvers handing of transient replicas
- */
-public class DataResolverTransientTest extends AbstractReadResponseTest
-{
-    private static DecoratedKey key;
-
-    @Before
-    public void setUp()
-    {
-        key = Util.dk("key1");
-    }
-
-    private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows)
-    {
-        PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false);
-        for (Row row: rows)
-        {
-            builder.add(row);
-        }
-        return builder;
-    }
-
-    private static PartitionUpdate.Builder update(Row... rows)
-    {
-        return update(cfm, "key1", rows);
-    }
-
-    private static Row.SimpleBuilder rowBuilder(int clustering)
-    {
-        return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering));
-    }
-
-    private static Row row(long timestamp, int clustering, int value)
-    {
-        return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build();
-    }
-
-    private static DeletionTime deletion(long timeMillis)
-    {
-        TimeUnit MILLIS = TimeUnit.MILLISECONDS;
-        return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
-    }
-
-    /**
-     * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica
-     */
-    private void assertNoTransientRepairs(PartitionUpdate update)
-    {
-        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
-        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
-        TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
-        DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0);
-
-        Assert.assertFalse(resolver.isDataPresent());
-        resolver.preprocess(response(command, EP1, iter(update), false));
-        resolver.preprocess(response(command, EP2, iter(update), false));
-        resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false));
-
-        Assert.assertFalse(repair.dataWasConsumed());
-        assertPartitionsEqual(filter(iter(update)), resolver.resolve());
-        Assert.assertTrue(repair.dataWasConsumed());
-        Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
-    }
-
-    @Test
-    public void emptyRowRepair()
-    {
-        assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build());
-    }
-
-    @Test
-    public void emptyPartitionDeletionRepairs()
-    {
-        PartitionUpdate.Builder builder = update();
-        builder.addPartitionDeletion(deletion(1999));
-        assertNoTransientRepairs(builder.build());
-    }
-
-    /**
-     * Partition level deletion responses shouldn't sent data to a transient replica
-     */
-    @Test
-    public void emptyRowDeletionRepairs()
-    {
-        PartitionUpdate.Builder builder = update();
-        builder.add(rowBuilder(1).timestamp(1999).delete().build());
-        assertNoTransientRepairs(builder.build());
-    }
-
-    @Test
-    public void emptyComplexDeletionRepair()
-    {
-
-        long[] ts = {1000, 2000};
-
-        Row.Builder builder = BTreeRow.unsortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
-        assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
-
-    }
-
-    @Test
-    public void emptyRangeTombstoneRepairs()
-    {
-        Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b")));
-        PartitionUpdate.Builder builder = update();
-        builder.add(new RangeTombstone(slice, deletion(2000)));
-        assertNoTransientRepairs(builder.build());
-    }
-
-    /**
-     * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas
-     */
-    @Test
-    public void fullRepairsIgnoreTransientReplicas()
-    {
-        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
-        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
-        TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
-        DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
-
-        Assert.assertFalse(resolver.isDataPresent());
-        resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false));
-        resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false));
-        resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false));
-
-        Assert.assertFalse(repair.dataWasConsumed());
-
-        consume(resolver.resolve());
-
-        Assert.assertTrue(repair.dataWasConsumed());
-
-        Assert.assertTrue(repair.sent.containsKey(EP1));
-        Assert.assertTrue(repair.sent.containsKey(EP2));
-        Assert.assertFalse(repair.sent.containsKey(EP3));
-    }
-
-    /**
-     * If the transient replica has new data, the full replicas shoould be repaired, the transient one should not
-     */
-    @Test
-    public void transientMismatchesRepairFullReplicas()
-    {
-        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
-        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
-        TestableReadRepair<?, ?> repair = new TestableReadRepair(command, QUORUM);
-        DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
-
-        Assert.assertFalse(resolver.isDataPresent());
-        PartitionUpdate transData = update(row(1000, 5, 5)).build();
-        resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false));
-        resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false));
-        resolver.preprocess(response(command, EP3, iter(transData), false));
-
-        Assert.assertFalse(repair.dataWasConsumed());
-
-        assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
-
-        Assert.assertTrue(repair.dataWasConsumed());
-
-        assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
-        assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
-        Assert.assertFalse(repair.sent.containsKey(EP3));
-
-    }
-
-    private ReplicaLayout.ForToken plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel)
-    {
-        return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 5306a74..8454d6a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -64,8 +64,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
         EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
-        TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
-        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
 
         PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build();
 
@@ -83,7 +82,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
         EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
-        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance,0);
+        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
 
         PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build();
         PartitionUpdate response2 = update(row(2000, 4, 5)).build();
@@ -104,8 +103,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
         EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2));
-        TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
-        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+        DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
 
         PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build();
         PartitionUpdate response2 = update(row(1000, 5, 5)).build();
@@ -116,7 +114,6 @@ public class DigestResolverTest extends AbstractReadResponseTest
         Assert.assertTrue(resolver.isDataPresent());
         Assert.assertTrue(resolver.responsesMatch());
         Assert.assertTrue(resolver.hasTransientResponse());
-        Assert.assertTrue(readRepair.sent.isEmpty());
     }
 
     /**
@@ -127,7 +124,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
         EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2));
-        DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance, 0);
+        DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
 
         PartitionUpdate response2 = update(row(1000, 5, 5)).build();
         Assert.assertFalse(resolver.isDataPresent());
@@ -137,6 +134,30 @@ public class DigestResolverTest extends AbstractReadResponseTest
         Assert.assertTrue(resolver.hasTransientResponse());
     }
 
+    @Test
+    public void transientResponseData()
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2), trans(EP3));
+        DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
+
+        PartitionUpdate fullResponse = update(row(1000, 1, 1)).build();
+        PartitionUpdate digestResponse = update(row(1000, 1, 1)).build();
+        PartitionUpdate transientResponse = update(row(1000, 2, 2)).build();
+        Assert.assertFalse(resolver.isDataPresent());
+        Assert.assertFalse(resolver.hasTransientResponse());
+        resolver.preprocess(response(command, EP1, iter(fullResponse), false));
+        Assert.assertTrue(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP2, iter(digestResponse), true));
+        resolver.preprocess(response(command, EP3, iter(transientResponse), false));
+        Assert.assertTrue(resolver.hasTransientResponse());
+
+        assertPartitionsEqual(filter(iter(dk,
+                                          row(1000, 1, 1),
+                                          row(1000, 2, 2))),
+                              resolver.getData());
+    }
+
     private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
     {
         return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org