Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/02 01:55:25 UTC

[3/5] cassandra git commit: Refactor read executor and response resolver, abstract read repair

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
new file mode 100644
index 0000000..6b1da0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.function.Function;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tracing.Tracing;
+
+class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+{
+    private final ReadCommand command;
+    private final InetAddressAndPort source;
+    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+    private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
+    private final TableMetadata metadata;
+    private final DecoratedKey partitionKey;
+
+    private Clustering lastClustering; // clustering of the last observed row
+
+    private int lastCounted = 0; // # rows counted in the partition as of the previous fetch attempt
+    private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+    private int lastQueried = 0; // # extra rows requested from the replica last time
+
+    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, InetAddressAndPort source,
+                            Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
+                            DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+    {
+        this.command = command;
+        this.source = source;
+        this.commandExecutor = commandExecutor;
+        this.singleResultCounter = singleResultCounter;
+        this.mergedResultCounter = mergedResultCounter;
+        this.metadata = command.metadata();
+        this.partitionKey = partitionKey;
+    }
+
+    @Override
+    public Row applyToRow(Row row)
+    {
+        lastClustering = row.clustering();
+        return row;
+    }
+
+    /*
+     * We only get here once all the rows in this iterator have been iterated over, and so if the node
+     * had returned the requested number of rows but we still get here, then some results were skipped
+     * during reconciliation.
+     */
+    public UnfilteredRowIterator moreContents()
+    {
+        // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+        assert !mergedResultCounter.isDoneForPartition();
+
+        // we do not apply short read protection when we have no limits at all
+        assert !command.limits().isUnlimited();
+
+        /*
+         * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
+         *
+         * We can only take this shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+         */
+        if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+            return null;
+
+        /*
+         * If the replica has no live rows in the partition, don't try to fetch more.
+         *
+         * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+         * always cover this scenario:
+         * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+         * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+         *
+         * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+         * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+         * have tombstones in the current partition.
+         *
+         * One other way we can hit this condition is when the partition only has a live static row and no regular
+         * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+         * the moreContents() call.
+         */
+        if (countedInCurrentPartition(singleResultCounter) == 0)
+            return null;
+
+        /*
+         * This is a table with no clustering columns, so it has at most one row per partition - with EMPTY clustering.
+         * We already have the row, so there is no point in asking for more from the partition.
+         */
+        if (Clustering.EMPTY == lastClustering)
+            return null;
+
+        lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
+        lastCounted = countedInCurrentPartition(singleResultCounter);
+
+        // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+        if (lastQueried > 0 && lastFetched < lastQueried)
+            return null;
+
+        /*
+         * At this point we know that:
+         *     1. the replica returned, possibly over several rounds, as many rows as we asked for and
+         *        potentially has more rows in the partition
+         *     2. at least one of those returned rows was shadowed by a tombstone returned from another
+         *        replica
+         *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+         *        avoid a short read
+         *
+         * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+         * are defined as follows:
+         *     [a] limits.count() - mergedResultCounter.counted()
+         *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+         *
+         * It would be naive to query for exactly that many rows, as it's quite possible
+         * that some of the returned rows would also be shadowed by tombstones from other hosts.
+         *
+         * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+         * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+         *
+         * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+         * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+         * marginal cost for each extra row requested.
+         *
+         * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+         * time we want to respect paging limits and not blow up spectacularly.
+         *
+         * Note: it's ok to retrieve more rows than necessary, since singleResultCounter only counts and does
+         * not stop iteration.
+         *
+         * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
+         *
+         * See CASSANDRA-13794 for more details.
+         */
+        lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
+
+        ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
+        Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+        SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+        return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
+    }
+
+    // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+    private int countedInCurrentPartition(DataLimits.Counter counter)
+    {
+        return command.limits().isGroupByLimit()
+               ? counter.rowCountedInCurrentPartition()
+               : counter.countedInCurrentPartition();
+    }
+
+    private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+    {
+        ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+        if (null != lastClustering)
+            filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+        return SinglePartitionReadCommand.create(command.metadata(),
+                                                 command.nowInSec(),
+                                                 command.columnFilter(),
+                                                 command.rowFilter(),
+                                                 command.limits().forShortReadRetry(toQuery),
+                                                 partitionKey,
+                                                 filter,
+                                                 command.indexMetadata());
+    }
+}
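
The long comment in moreContents() above is the heart of short read protection. What follows
is a minimal standalone sketch of just the fetch-size heuristic it describes; plain ints stand
in for DataLimits.Counter, and the class and method names (SrpFetchPlanner, nextBatchSize) are
hypothetical, not Cassandra API.

    // Simplified model of the retry sizing in ShortReadRowsProtection.moreContents().
    final class SrpFetchPlanner
    {
        private int lastCounted = 0; // rows counted as of the previous fetch attempt
        private int lastQueried = 0; // extra rows requested from the replica last time

        /** Returns the number of extra rows to request, or -1 to stop fetching. */
        int nextBatchSize(int countedInPartition, int limit, int perPartitionLimit)
        {
            int lastFetched = countedInPartition - lastCounted;
            lastCounted = countedInPartition;

            // Getting back fewer rows than we asked for means the replica's
            // partition has been fully consumed.
            if (lastQueried > 0 && lastFetched < lastQueried)
                return -1;

            // Overfetch rather than underfetch: requests are expensive, extra
            // rows are cheap, so ask for min(count, perPartitionCount) outright.
            lastQueried = Math.min(limit, perPartitionLimit);
            return lastQueried;
        }
    }

With limits of count() = 100 and perPartitionCount() = 10, for example, each round asks the
replica for 10 more rows and stops as soon as a round comes back short.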

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
new file mode 100644
index 0000000..07b6e2c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.AsyncRepairCallback;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+/**
+ * 'Classic' read repair. Doesn't allow the client read to return until
+ *  updates have been written to nodes needing correction.
+ */
+public class BlockingReadRepair implements ReadRepair, RepairListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
+
+    private final ReadCommand command;
+    private final List<InetAddressAndPort> endpoints;
+    private final long queryStartNanoTime;
+    private final ConsistencyLevel consistency;
+
+    private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
+
+    private volatile DigestRepair digestRepair = null;
+
+    private static class DigestRepair
+    {
+        private final DataResolver dataResolver;
+        private final ReadCallback readCallback;
+        private final Consumer<PartitionIterator> resultConsumer;
+
+        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
+        {
+            this.dataResolver = dataResolver;
+            this.readCallback = readCallback;
+            this.resultConsumer = resultConsumer;
+        }
+    }
+
+    public BlockingReadRepair(ReadCommand command,
+                              List<InetAddressAndPort> endpoints,
+                              long queryStartNanoTime,
+                              ConsistencyLevel consistency)
+    {
+        this.command = command;
+        this.endpoints = endpoints;
+        this.queryStartNanoTime = queryStartNanoTime;
+        this.consistency = consistency;
+    }
+
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    {
+        return new PartitionIteratorMergeListener(endpoints, command, this);
+    }
+
+    public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair
+    {
+
+        final List<AsyncOneResponse<?>> responses;
+
+        public BlockingPartitionRepair(int expectedResponses)
+        {
+            this.responses = new ArrayList<>(expectedResponses);
+        }
+
+        protected AsyncOneResponse sendMutation(InetAddressAndPort endpoint, Mutation mutation)
+        {
+            // use a separate verb here because we don't want these to get the white-glove hint-
+            // on-timeout behavior that a "real" mutation gets
+            Tracing.trace("Sending read-repair-mutation to {}", endpoint);
+            MessageOut<Mutation> msg = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+            return MessagingService.instance().sendRR(msg, endpoint);
+        }
+
+        public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
+        {
+            responses.add(sendMutation(endpoint, mutation));
+        }
+
+        public void finish()
+        {
+            Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>()
+            {
+                public void onSuccess(@Nullable List<Object> result)
+                {
+                    set(result);
+                }
+
+                public void onFailure(Throwable t)
+                {
+                    setException(t);
+                }
+            });
+        }
+    }
+
+    public void awaitRepairs(long timeout)
+    {
+        try
+        {
+            Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (TimeoutException ex)
+        {
+            // We got all responses, but timed out while repairing
+            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+            int blockFor = consistency.blockFor(keyspace);
+            if (Tracing.isTracing())
+                Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+            else
+                logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public PartitionRepair startPartitionRepair()
+    {
+        BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size());
+        repairs.add(repair);
+        return repair;
+    }
+
+    public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    {
+        ReadRepairMetrics.repairedBlocking.mark();
+
+        // Do a full data read to resolve the correct response (and repair any nodes that need it)
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
+        ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command,
+                                                     keyspace, allEndpoints, queryStartNanoTime, this);
+
+        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+
+        for (InetAddressAndPort endpoint : contactedEndpoints)
+        {
+            Tracing.trace("Enqueuing full data read to {}", endpoint);
+            MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback);
+        }
+    }
+
+    public void awaitForegroundRepairFinish() throws ReadTimeoutException
+    {
+        if (digestRepair != null)
+        {
+            digestRepair.readCallback.awaitResults();
+            digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve());
+        }
+    }
+
+    public void maybeStartBackgroundRepair(ResponseResolver resolver)
+    {
+        TraceState traceState = Tracing.instance.get();
+        if (traceState != null)
+            traceState.trace("Initiating read-repair");
+        StageManager.getStage(Stage.READ_REPAIR).execute(() -> resolver.evaluateAllResponses(traceState));
+    }
+
+    public void backgroundDigestRepair(TraceState traceState)
+    {
+        if (traceState != null)
+            traceState.trace("Digest mismatch");
+        if (logger.isDebugEnabled())
+            logger.debug("Digest mismatch");
+
+        ReadRepairMetrics.repairedBackground.mark();
+
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        final DataResolver repairResolver = new DataResolver(keyspace, command, consistency, endpoints.size(), queryStartNanoTime, this);
+        AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+        for (InetAddressAndPort endpoint : endpoints)
+            MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
+    }
+}
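
awaitRepairs() above is the blocking step that gives this class its name: the client read cannot
complete until every repair mutation is acked or the timeout fires. A simplified standalone model
of that contract, using JDK CompletableFuture in place of Guava futures and AsyncOneResponse (all
names here are illustrative, not Cassandra API):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class BlockingRepairModel
    {
        /** Waits for every repair-mutation ack, surfacing a timeout to the caller. */
        static void awaitAcks(List<CompletableFuture<Void>> acks, long timeoutMillis) throws Exception
        {
            try
            {
                CompletableFuture.allOf(acks.toArray(new CompletableFuture[0]))
                                 .get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                // Mirrors BlockingReadRepair: report this as a read timeout, since
                // the read cannot return until the repairs have been acknowledged.
                throw new RuntimeException("read-repair timed out", e);
            }
        }
    }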

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
new file mode 100644
index 0000000..ff65dbb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+public class NoopReadRepair implements ReadRepair
+{
+    public static final NoopReadRepair instance = new NoopReadRepair();
+
+    private NoopReadRepair() {}
+
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    {
+        return UnfilteredPartitionIterators.MergeListener.NOOP;
+    }
+
+    public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    {
+        resultConsumer.accept(digestResolver.getData());
+    }
+
+    public void awaitForegroundRepairFinish() throws ReadTimeoutException
+    {
+
+    }
+
+    public void maybeStartBackgroundRepair(ResponseResolver resolver)
+    {
+
+    }
+
+    public void backgroundDigestRepair(TraceState traceState)
+    {
+
+    }
+}
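
NoopReadRepair is the null object of the ReadRepair hierarchy: every hook is a no-op, and the
foreground path simply hands back the digest resolver's data. A hypothetical call site might look
like the sketch below, assuming it lives in the same package; the helper and its repairAllowed
flag are illustrative, while ReadRepair.create() and NoopReadRepair.instance come from the patch.

    static ReadRepair chooseReadRepair(ReadCommand command,
                                       List<InetAddressAndPort> endpoints,
                                       long queryStartNanoTime,
                                       ConsistencyLevel consistency,
                                       boolean repairAllowed)
    {
        return repairAllowed
             ? ReadRepair.create(command, endpoints, queryStartNanoTime, consistency)
             : NoopReadRepair.instance; // safe to share: the instance is stateless
    }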

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
new file mode 100644
index 0000000..3ad57cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(PartitionIteratorMergeListener.class);
+
+    private final InetAddressAndPort[] sources;
+    private final ReadCommand command;
+    private final RepairListener repairListener;
+
+    public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+    {
+        this.sources = sources;
+        this.command = command;
+        this.repairListener = repairListener;
+    }
+
+    public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+    {
+        return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, repairListener);
+    }
+
+    private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+    {
+        Columns statics = Columns.NONE;
+        Columns regulars = Columns.NONE;
+        for (UnfilteredRowIterator iter : versions)
+        {
+            if (iter == null)
+                continue;
+
+            RegularAndStaticColumns cols = iter.columns();
+            statics = statics.mergeTo(cols.statics);
+            regulars = regulars.mergeTo(cols.regulars);
+        }
+        return new RegularAndStaticColumns(statics, regulars);
+    }
+
+    private boolean isReversed(List<UnfilteredRowIterator> versions)
+    {
+        for (UnfilteredRowIterator iter : versions)
+        {
+            if (iter == null)
+                continue;
+
+            // Everything will be in the same order
+            return iter.isReverseOrder();
+        }
+
+        assert false : "Expected at least one iterator";
+        return false;
+    }
+
+    public void close()
+    {
+        repairListener.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout());
+    }
+}
+
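
columns() above unions the column sets seen by each source, since different replicas may know
about different columns. A standalone model of that union, with Set<String> standing in for
Cassandra's Columns (purely illustrative):

    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    final class ColumnUnionModel
    {
        /** Union of the column sets reported by each source; null sources are skipped. */
        static Set<String> unionColumns(List<Set<String>> versions)
        {
            Set<String> merged = new LinkedHashSet<>();
            for (Set<String> columns : versions)
                if (columns != null)
                    merged.addAll(columns);
            return merged;
        }
    }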

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
new file mode 100644
index 0000000..bdd730c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+public interface ReadRepair
+{
+    /**
+     * Used by DataResolver to generate corrections as the partition iterator is consumed
+     */
+    UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
+
+    /**
+     * Called when the digests from the initial read don't match. Reads may block on the
+     * repair started by this method.
+     */
+    public void startForegroundRepair(DigestResolver digestResolver,
+                                      List<InetAddressAndPort> allEndpoints,
+                                      List<InetAddressAndPort> contactedEndpoints,
+                                      Consumer<PartitionIterator> resultConsumer);
+
+    /**
+     * Wait for any operations started by {@link ReadRepair#startForegroundRepair} to complete
+     * @throws ReadTimeoutException
+     */
+    public void awaitForegroundRepairFinish() throws ReadTimeoutException;
+
+    /**
+     * Called when responses from all replicas have been received. Read will not block on this.
+     * @param resolver
+     */
+    public void maybeStartBackgroundRepair(ResponseResolver resolver);
+
+    /**
+     * If {@link ReadRepair#maybeStartBackgroundRepair} was called with a {@link DigestResolver}, this will
+     * be called to perform a repair if there was a digest mismatch
+     */
+    public void backgroundDigestRepair(TraceState traceState);
+
+    static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency)
+    {
+        return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency);
+    }
+}
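
The interface splits read repair into a foreground path (digest mismatch; the read blocks) and a
background path (fire and forget). Below is a hypothetical coordinator-side sequence for the
foreground path; the wrapper method itself is illustrative, but every call on readRepair is
declared by the interface above.

    static void onDigestMismatch(ReadRepair readRepair,
                                 DigestResolver digestResolver,
                                 List<InetAddressAndPort> allEndpoints,
                                 List<InetAddressAndPort> contactedEndpoints,
                                 Consumer<PartitionIterator> resultConsumer)
    {
        // Digests disagreed: kick off a full data read (CL.ALL in the blocking
        // implementation) ...
        readRepair.startForegroundRepair(digestResolver, allEndpoints, contactedEndpoints, resultConsumer);
        // ... and hold the client read until that repair round resolves.
        readRepair.awaitForegroundRepairFinish();
    }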

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
new file mode 100644
index 0000000..174c0e7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public interface RepairListener
+{
+    interface PartitionRepair
+    {
+        void reportMutation(InetAddressAndPort endpoint, Mutation mutation);
+        void finish();
+    }
+
+    PartitionRepair startPartitionRepair();
+    void awaitRepairs(long timeoutMillis);
+}
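
The expected calling convention, sketched as a hypothetical helper (the helper is illustrative;
only the calls on the listener are defined by the interface above): one PartitionRepair per
partition needing corrections, one mutation per out-of-date endpoint, finish() to seal the batch,
and awaitRepairs() to block.

    static void sendAndAwait(RepairListener listener,
                             java.util.Map<InetAddressAndPort, Mutation> corrections,
                             long timeoutMillis)
    {
        RepairListener.PartitionRepair repair = listener.startPartitionRepair();
        for (java.util.Map.Entry<InetAddressAndPort, Mutation> entry : corrections.entrySet())
            repair.reportMutation(entry.getKey(), entry.getValue());
        repair.finish();                      // seal this partition's batch of mutations
        listener.awaitRepairs(timeoutMillis); // block until all replicas ack, or time out
    }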

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
new file mode 100644
index 0000000..63bd3ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Arrays;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowDiffListener;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+
+public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+{
+    private final DecoratedKey partitionKey;
+    private final RegularAndStaticColumns columns;
+    private final boolean isReversed;
+    private final InetAddressAndPort[] sources;
+    private final ReadCommand command;
+
+    private final PartitionUpdate.Builder[] repairs;
+
+    private final Row.Builder[] currentRows;
+    private final RowDiffListener diffListener;
+
+    // The partition level deletion for the merged partition.
+    private DeletionTime partitionLevelDeletion;
+    // When the merged iterator has a currently open range tombstone marker, its deletion time; null otherwise.
+    private DeletionTime mergedDeletionTime;
+    // For each source, the time of the current deletion as known by the source.
+    private final DeletionTime[] sourceDeletionTime;
+    // For each source, record if there is an open range to send as repair, and from where.
+    private final ClusteringBound[] markerToRepair;
+
+    private final RepairListener repairListener;
+
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+    {
+        this.partitionKey = partitionKey;
+        this.columns = columns;
+        this.isReversed = isReversed;
+        this.sources = sources;
+        repairs = new PartitionUpdate.Builder[sources.length];
+        currentRows = new Row.Builder[sources.length];
+        sourceDeletionTime = new DeletionTime[sources.length];
+        markerToRepair = new ClusteringBound[sources.length];
+        this.command = command;
+        this.repairListener = repairListener;
+
+        this.diffListener = new RowDiffListener()
+        {
+            public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+            {
+                if (merged != null && !merged.equals(original))
+                    currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
+            }
+
+            public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+            {
+                if (merged != null && !merged.equals(original))
+                    currentRow(i, clustering).addRowDeletion(merged);
+            }
+
+            public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
+            {
+                if (merged != null && !merged.equals(original))
+                    currentRow(i, clustering).addComplexDeletion(column, merged);
+            }
+
+            public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+            {
+                if (merged != null && !merged.equals(original) && isQueried(merged))
+                    currentRow(i, clustering).addCell(merged);
+            }
+
+            private boolean isQueried(Cell cell)
+            {
+                // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may
+                // have empty values as an optimization (see CASSANDRA-10655) and hence should not be included in the read repair.
+                // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
+                // semantics (making sure we can always distinguish a row that doesn't exist from one that does exist but has
+                // no value for the column requested by the user), and so the user won't be surprised that those columns are
+                // not repaired.
+                ColumnMetadata column = cell.column();
+                ColumnFilter filter = RowIteratorMergeListener.this.command.columnFilter();
+                return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
+            }
+        };
+    }
+
+    private PartitionUpdate.Builder update(int i)
+    {
+        if (repairs[i] == null)
+            repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+        return repairs[i];
+    }
+
+    /**
+     * The partition level deletion with which source {@code i} is currently repaired, or
+     * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
+     * up to date on it). The output of this method is only valid after the call to
+     * {@link #onMergedPartitionLevelDeletion}.
+     */
+    private DeletionTime partitionLevelRepairDeletion(int i)
+    {
+        return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
+    }
+
+    private Row.Builder currentRow(int i, Clustering clustering)
+    {
+        if (currentRows[i] == null)
+        {
+            currentRows[i] = BTreeRow.sortedBuilder();
+            currentRows[i].newRow(clustering);
+        }
+        return currentRows[i];
+    }
+
+    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+    {
+        this.partitionLevelDeletion = mergedDeletion;
+        for (int i = 0; i < versions.length; i++)
+        {
+            if (mergedDeletion.supersedes(versions[i]))
+                update(i).addPartitionDeletion(mergedDeletion);
+        }
+    }
+
+    public void onMergedRows(Row merged, Row[] versions)
+    {
+        // If a row was shadowed post-merge, it must be by a partition level or range tombstone, and we handle
+        // those cases directly in their respective methods (in other words, it would be inefficient to send a row
+        // deletion as repair when we know we've already sent a partition level or range tombstone that covers it).
+        if (merged.isEmpty())
+            return;
+
+        Rows.diff(diffListener, merged, versions);
+        for (int i = 0; i < currentRows.length; i++)
+        {
+            if (currentRows[i] != null)
+                update(i).add(currentRows[i].build());
+        }
+        Arrays.fill(currentRows, null);
+    }
+
+    private DeletionTime currentDeletion()
+    {
+        return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
+    }
+
+    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+    {
+        try
+        {
+            // The code for merging range tombstones is a tad complex, and we have had the assertions there trigger
+            // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insight
+            // when that happens without more context than what the assertion errors give us, however; hence the
+            // catch here, which basically gathers as much context as is reasonable.
+            internalOnMergedRangeTombstoneMarkers(merged, versions);
+        }
+        catch (AssertionError e)
+        {
+            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
+            // rather get more info to debug than not.
+            TableMetadata table = command.metadata();
+            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}",
+                                           table,
+                                           merged == null ? "null" : merged.toString(table),
+                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+                                           Arrays.toString(sources));
+            throw new AssertionError(details, e);
+        }
+    }
+
+    private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+    {
+        // The current deletion as of dealing with this marker.
+        DeletionTime currentDeletion = currentDeletion();
+
+        for (int i = 0; i < versions.length; i++)
+        {
+            RangeTombstoneMarker marker = versions[i];
+
+            // Update what the source now thinks is the current deletion
+            if (marker != null)
+                sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+            // If merged == null, one or more of the sources is opening or closing a marker
+            if (merged == null)
+            {
+                // but if it's not this source, move to the next one
+                if (marker == null)
+                    continue;
+
+                // We have a close and/or open marker for a source, with nothing corresponding in merged.
+                // Because merged is a superset, this implies that we have a current deletion (be it due to an
+                // early opening in merged or a partition level deletion) and that this deletion will still be
+                // active after that point. Further, whatever deletion was open or is opened by this marker on the
+                // source, that deletion cannot supersede the current one.
+                //
+                // But while the marker deletion (before and/or after this point) cannot supersede the current
+                // deletion, we want to know if it's equal to it (both before and after), because in that case
+                // the source is up to date and we don't want to include it in the repair.
+                //
+                // So in practice we have 2 possible cases:
+                //  1) the source was up-to-date on deletion up to that point: then it won't be from that point
+                //     on unless it's a boundary and the new opened deletion time is also equal to the current
+                //     deletion (note that this implies the boundary has the same closing and opening deletion
+                //     time, which should generally not happen, but can due to legacy reading code not avoiding
+                //     this for a while, see CASSANDRA-13237).
+                //  2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't
+                //     we just have nothing to do for that marker).
+                assert !currentDeletion.isLive() : currentDeletion.toString();
+
+                // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair
+                // nor an "active" partition level deletion (where "active" means that it's greater than or equal
+                // to the current deletion: if the source has a repaired partition deletion lower than the
+                // current deletion, this means the current deletion is due to a previously open range tombstone,
+                // and if the source isn't currently repaired for that RT, then it means it's up to date on it).
+                DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
+                if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
+                {
+                    // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+                    // this source is that it had a range open with the same deletion as current and it's
+                    // closing it.
+                    assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+                    : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+
+                    // and so unless it's a boundary whose opening deletion time is still equal to the current
+                    // deletion (see comment above for why this can actually happen), we have to repair the source
+                    // from that point on.
+                    if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
+                        markerToRepair[i] = marker.closeBound(isReversed).invert();
+                }
+                // In case 2) above, we only have something to do if the source is up-to-date after that point
+                // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
+                // that is equal to the current one).
+                else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+                {
+                    closeOpenMarker(i, marker.openBound(isReversed).invert());
+                }
+            }
+            else
+            {
+                // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+                if (merged.isClose(isReversed))
+                {
+                    // We're closing the merged range. If we've recorded that this should be repaired for the
+                    // source, close and add said range to the repair to send.
+                    if (markerToRepair[i] != null)
+                        closeOpenMarker(i, merged.closeBound(isReversed));
+
+                }
+
+                if (merged.isOpen(isReversed))
+                {
+                    // If we're opening a new merged range (or just switching deletion), then unless the source
+                    // is up to date on that deletion (note that we've updated what the source deletion is
+                    // above), we'll have to send the range to the source.
+                    DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+                    DeletionTime sourceDeletion = sourceDeletionTime[i];
+                    if (!newDeletion.equals(sourceDeletion))
+                        markerToRepair[i] = merged.openBound(isReversed);
+                }
+            }
+        }
+
+        if (merged != null)
+            mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+    }
+
+    private void closeOpenMarker(int i, ClusteringBound close)
+    {
+        ClusteringBound open = markerToRepair[i];
+        update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+        markerToRepair[i] = null;
+    }
+
+    public void close()
+    {
+        RepairListener.PartitionRepair repair = null;
+        for (int i = 0; i < repairs.length; i++)
+        {
+            if (repairs[i] == null)
+                continue;
+
+            if (repair == null)
+            {
+                repair = repairListener.startPartitionRepair();
+            }
+            repair.reportMutation(sources[i], new Mutation(repairs[i].build()));
+        }
+
+        if (repair != null)
+        {
+            repair.finish();
+        }
+    }
+}
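
The listener diffs each source's rows and markers against the merged result and turns the
differences into per-source repair mutations. Below is a deliberately tiny model of that
per-source diffing idea, with a long timestamp standing in for a whole Cell (illustrative only;
the real listener also handles row deletions, complex columns, and the range tombstone
bookkeeping above).

    final class DiffRepairModel
    {
        /**
         * For each source's version of a value, reports whether that source
         * needs the merged value pushed back to it (i.e. its version differs).
         */
        static boolean[] needsRepair(long mergedTimestamp, long[] versionTimestamps)
        {
            boolean[] repair = new boolean[versionTimestamps.length];
            for (int i = 0; i < versionTimestamps.length; i++)
                repair[i] = versionTimestamps[i] != mergedTimestamp;
            return repair;
        }
    }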

