You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/02 01:55:25 UTC
[3/5] cassandra git commit: Refactor read executor and response
resolver, abstract read repair
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
new file mode 100644
index 0000000..6b1da0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.function.Function;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tracing.Tracing;
+
+class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+{
+ private final ReadCommand command;
+ private final InetAddressAndPort source;
+ private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+ private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+ private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
+ private final TableMetadata metadata;
+ private final DecoratedKey partitionKey;
+
+ private Clustering lastClustering; // clustering of the last observed row
+
+ private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+ private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+ private int lastQueried = 0; // # extra rows requested from the replica last time
+
+ ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, InetAddressAndPort source,
+ Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
+ DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+ {
+ this.command = command;
+ this.source = source;
+ this.commandExecutor = commandExecutor;
+ this.singleResultCounter = singleResultCounter;
+ this.mergedResultCounter = mergedResultCounter;
+ this.metadata = command.metadata();
+ this.partitionKey = partitionKey;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ lastClustering = row.clustering();
+ return row;
+ }
+
+ /*
+ * We only get here once all the rows in this iterator have been iterated over, and so if the node
+ * had returned the requested number of rows but we still get here, then some results were skipped
+ * during reconciliation.
+ */
+ public UnfilteredRowIterator moreContents()
+ {
+ // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+ assert !mergedResultCounter.isDoneForPartition();
+
+ // we do not apply short read protection when we have no limits at all
+ assert !command.limits().isUnlimited();
+
+ /*
+ * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
+ *
+ * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
+ * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
+ */
+ if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+ return null;
+
+ /*
+ * If the replica has no live rows in the partition, don't try to fetch more.
+ *
+ * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+ * always cover this scenario:
+ * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+ * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+ *
+ * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+ * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+ * have tombstones in the current partition.
+ *
+ * One other way we can hit this condition is when the partition only has a live static row and no regular
+ * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+ * the moreContents() call.
+ */
+ if (countedInCurrentPartition(singleResultCounter) == 0)
+ return null;
+
+ /*
+ * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+ * We already have the row, so there is no point in asking for more from the partition.
+ */
+ if (Clustering.EMPTY == lastClustering)
+ return null;
+
+ lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
+ lastCounted = countedInCurrentPartition(singleResultCounter);
+
+ // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+ if (lastQueried > 0 && lastFetched < lastQueried)
+ return null;
+
+ /*
+ * At this point we know that:
+ * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+ * rows in the partition
+ * 2. at least one of those returned rows was shadowed by a tombstone returned from another
+ * replica
+ * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+ * avoid a short read
+ *
+ * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+ * are defined as follows:
+ * [a] limits.count() - mergedResultCounter.counted()
+ * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+ *
+ * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+ * that some of the returned rows would also be shadowed by tombstones from other hosts.
+ *
+ * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+ * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+ *
+ * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+ * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+ * marginal cost for each extra row requested.
+ *
+ * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+ * time we want to respect paging limits and not blow up spectacularly.
+ *
+ * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only
+ * counts.
+ *
+ * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
+ *
+ * See CASSANDRA-13794 for more details.
+ */
+ lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
+
+ ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+ SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+ return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
+ }
+
+ // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+ private int countedInCurrentPartition(DataLimits.Counter counter)
+ {
+ return command.limits().isGroupByLimit()
+ ? counter.rowCountedInCurrentPartition()
+ : counter.countedInCurrentPartition();
+ }
+
+ private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+ {
+ ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+ if (null != lastClustering)
+ filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+ return SinglePartitionReadCommand.create(command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ command.limits().forShortReadRetry(toQuery),
+ partitionKey,
+ filter,
+ command.indexMetadata());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
new file mode 100644
index 0000000..07b6e2c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.AsyncRepairCallback;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+/**
+ * 'Classic' read repair. Doesn't allow the client read to return until
+ * updates have been written to nodes needing correction.
+ */
+public class BlockingReadRepair implements ReadRepair, RepairListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
+
+    private final ReadCommand command;
+    private final List<InetAddressAndPort> endpoints;
+    private final long queryStartNanoTime;
+    private final ConsistencyLevel consistency;
+
+    // every partition repair handed out by startPartitionRepair(); awaited together by awaitRepairs()
+    private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
+
+    // set by startForegroundRepair(), consumed by awaitForegroundRepairFinish()
+    private volatile DigestRepair digestRepair = null;
+
+    /**
+     * State of a foreground repair: the full-data resolver, the callback collecting the data responses,
+     * and where to send the resolved result.
+     */
+    private static class DigestRepair
+    {
+        private final DataResolver dataResolver;
+        private final ReadCallback readCallback;
+        private final Consumer<PartitionIterator> resultConsumer;
+
+        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
+        {
+            this.dataResolver = dataResolver;
+            this.readCallback = readCallback;
+            this.resultConsumer = resultConsumer;
+        }
+    }
+
+    public BlockingReadRepair(ReadCommand command,
+                              List<InetAddressAndPort> endpoints,
+                              long queryStartNanoTime,
+                              ConsistencyLevel consistency)
+    {
+        this.command = command;
+        this.endpoints = endpoints;
+        this.queryStartNanoTime = queryStartNanoTime;
+        this.consistency = consistency;
+    }
+
+    @Override
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    {
+        return new PartitionIteratorMergeListener(endpoints, command, this);
+    }
+
+    /**
+     * Repair of a single partition: sends each reported mutation as a READ_REPAIR message. This future completes
+     * once, after {@link #finish()}, all of those sends have been answered (or fails if any of them fails).
+     */
+    public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair
+    {
+
+        final List<AsyncOneResponse<?>> responses;
+
+        public BlockingPartitionRepair(int expectedResponses)
+        {
+            this.responses = new ArrayList<>(expectedResponses);
+        }
+
+        protected AsyncOneResponse sendMutation(InetAddressAndPort endpoint, Mutation mutation)
+        {
+            // use a separate verb here because we don't want these to get the white-glove hint-
+            // on-timeout behavior that a "real" mutation gets
+            Tracing.trace("Sending read-repair-mutation to {}", endpoint);
+            MessageOut<Mutation> msg = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+            return MessagingService.instance().sendRR(msg, endpoint);
+        }
+
+        @Override
+        public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
+        {
+            responses.add(sendMutation(endpoint, mutation));
+        }
+
+        /**
+         * Completes this future when all responses collected so far have completed.
+         * Mutations reported after this call are not waited for.
+         */
+        @Override
+        public void finish()
+        {
+            Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>()
+            {
+                @Override
+                public void onSuccess(@Nullable List<Object> result)
+                {
+                    set(result);
+                }
+
+                @Override
+                public void onFailure(Throwable t)
+                {
+                    setException(t);
+                }
+            });
+        }
+    }
+
+    /**
+     * Waits for every partition repair started through {@link #startPartitionRepair()}.
+     *
+     * @param timeout maximum time to wait, in milliseconds
+     * @throws ReadTimeoutException if the repairs do not complete within {@code timeout}
+     */
+    @Override
+    public void awaitRepairs(long timeout)
+    {
+        try
+        {
+            Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (TimeoutException ex)
+        {
+            // We got all responses, but timed out while repairing
+            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+            int blockFor = consistency.blockFor(keyspace);
+            if (Tracing.isTracing())
+                Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+            else
+                logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt status so code further up the stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a partition repair and registers it so that {@link #awaitRepairs(long)} waits for it.
+     */
+    @Override
+    public PartitionRepair startPartitionRepair()
+    {
+        BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size());
+        repairs.add(repair);
+        return repair;
+    }
+
+    /**
+     * Sends a full data read to every contacted endpoint. The result is resolved at ConsistencyLevel.ALL
+     * and delivered to {@code resultConsumer} by {@link #awaitForegroundRepairFinish()}.
+     */
+    @Override
+    public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    {
+        ReadRepairMetrics.repairedBlocking.mark();
+
+        // Do a full data read to resolve the correct response (and repair nodes that need it)
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
+        ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command,
+                                                     keyspace, allEndpoints, queryStartNanoTime, this);
+
+        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+
+        for (InetAddressAndPort endpoint : contactedEndpoints)
+        {
+            Tracing.trace("Enqueuing full data read to {}", endpoint);
+            MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback);
+        }
+    }
+
+    /**
+     * If a foreground repair was started, waits for its data responses and passes the resolved result to
+     * its consumer; otherwise does nothing.
+     */
+    @Override
+    public void awaitForegroundRepairFinish() throws ReadTimeoutException
+    {
+        if (digestRepair != null)
+        {
+            digestRepair.readCallback.awaitResults();
+            digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve());
+        }
+    }
+
+    /**
+     * Asks {@code resolver} to evaluate all responses on the READ_REPAIR stage, without blocking the caller.
+     */
+    @Override
+    public void maybeStartBackgroundRepair(ResponseResolver resolver)
+    {
+        TraceState traceState = Tracing.instance.get();
+        if (traceState != null)
+            traceState.trace("Initiating read-repair");
+        StageManager.getStage(Stage.READ_REPAIR).execute(() -> resolver.evaluateAllResponses(traceState));
+    }
+
+    /**
+     * Re-reads full data from all endpoints and resolves it through a DataResolver wired to this repair listener.
+     */
+    @Override
+    public void backgroundDigestRepair(TraceState traceState)
+    {
+        if (traceState != null)
+            traceState.trace("Digest mismatch");
+        if (logger.isDebugEnabled())
+            logger.debug("Digest mismatch");
+
+        ReadRepairMetrics.repairedBackground.mark();
+
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        final DataResolver repairResolver = new DataResolver(keyspace, command, consistency, endpoints.size(), queryStartNanoTime, this);
+        AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+        for (InetAddressAndPort endpoint : endpoints)
+            MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
new file mode 100644
index 0000000..ff65dbb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+/**
+ * A {@link ReadRepair} that never repairs anything.
+ * <p>
+ * A foreground "repair" just hands the digest resolver's data to the result consumer. Every other hook does
+ * nothing, and the merge listener is {@link UnfilteredPartitionIterators.MergeListener#NOOP}.
+ */
+public class NoopReadRepair implements ReadRepair
+{
+    /** Shared singleton; the class holds no state of its own. */
+    public static final NoopReadRepair instance = new NoopReadRepair();
+
+    private NoopReadRepair()
+    {
+    }
+
+    @Override
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    {
+        return UnfilteredPartitionIterators.MergeListener.NOOP;
+    }
+
+    @Override
+    public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    {
+        // no repair round-trip: the data the digest resolver already holds is the result
+        PartitionIterator data = digestResolver.getData();
+        resultConsumer.accept(data);
+    }
+
+    @Override
+    public void awaitForegroundRepairFinish() throws ReadTimeoutException
+    {
+        // nothing was started in the foreground, so there is nothing to wait for
+    }
+
+    @Override
+    public void maybeStartBackgroundRepair(ResponseResolver resolver)
+    {
+        // background repair is intentionally disabled
+    }
+
+    @Override
+    public void backgroundDigestRepair(TraceState traceState)
+    {
+        // background repair is intentionally disabled
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
new file mode 100644
index 0000000..3ad57cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(PartitionIteratorMergeListener.class);
+
+ private final InetAddressAndPort[] sources;
+ private final ReadCommand command;
+ private final RepairListener repairListener;
+
+ public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+ {
+ this.sources = sources;
+ this.command = command;
+ this.repairListener = repairListener;
+ }
+
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, repairListener);
+ }
+
+ private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+ {
+ Columns statics = Columns.NONE;
+ Columns regulars = Columns.NONE;
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter == null)
+ continue;
+
+ RegularAndStaticColumns cols = iter.columns();
+ statics = statics.mergeTo(cols.statics);
+ regulars = regulars.mergeTo(cols.regulars);
+ }
+ return new RegularAndStaticColumns(statics, regulars);
+ }
+
+ private boolean isReversed(List<UnfilteredRowIterator> versions)
+ {
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter == null)
+ continue;
+
+ // Everything will be in the same order
+ return iter.isReverseOrder();
+ }
+
+ assert false : "Expected at least one iterator";
+ return false;
+ }
+
+ public void close()
+ {
+ repairListener.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
new file mode 100644
index 0000000..bdd730c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.reads.DigestResolver;
+import org.apache.cassandra.service.reads.ResponseResolver;
+import org.apache.cassandra.tracing.TraceState;
+
+/**
+ * Strategy for repairing replicas whose responses to a read disagree.
+ */
+public interface ReadRepair
+{
+    /**
+     * Used by DataResolver to generate corrections as the partition iterator is consumed.
+     *
+     * @param endpoints endpoints of the replicas whose responses are being merged
+     * @return the listener to attach to the merge
+     */
+    UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
+
+    /**
+     * Called when the digests from the initial read don't match. Reads may block on the repair started here.
+     *
+     * @param digestResolver     resolver holding the responses of the initial (digest) read
+     * @param allEndpoints       all candidate endpoints for the read
+     * @param contactedEndpoints endpoints that were contacted for the initial read
+     * @param resultConsumer     receives the data that should be returned to the client
+     */
+    void startForegroundRepair(DigestResolver digestResolver,
+                               List<InetAddressAndPort> allEndpoints,
+                               List<InetAddressAndPort> contactedEndpoints,
+                               Consumer<PartitionIterator> resultConsumer);
+
+    /**
+     * Waits for any operations started by {@link ReadRepair#startForegroundRepair} to complete.
+     *
+     * @throws ReadTimeoutException if the foreground repair does not complete in time
+     */
+    void awaitForegroundRepairFinish() throws ReadTimeoutException;
+
+    /**
+     * Called when responses from all replicas have been received. The read does not block on this.
+     *
+     * @param resolver resolver holding the received responses
+     */
+    void maybeStartBackgroundRepair(ResponseResolver resolver);
+
+    /**
+     * If {@link ReadRepair#maybeStartBackgroundRepair} was called with a {@link DigestResolver}, this is
+     * called to perform a repair when there was a digest mismatch.
+     *
+     * @param traceState trace state to record progress on; may be {@code null}
+     */
+    void backgroundDigestRepair(TraceState traceState);
+
+    /**
+     * Creates the read repair implementation for a read; currently always a {@link BlockingReadRepair}.
+     */
+    static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency)
+    {
+        return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
new file mode 100644
index 0000000..174c0e7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Receives the mutations needed to bring replicas up to date, and lets the caller wait for them to be applied.
+ */
+public interface RepairListener
+{
+    /**
+     * Starts tracking the repair of a single partition.
+     *
+     * @return the handle that mutations for that partition are reported to
+     */
+    PartitionRepair startPartitionRepair();
+
+    /**
+     * Waits for the partition repairs started through {@link #startPartitionRepair()}.
+     *
+     * @param timeoutMillis maximum time to wait, in milliseconds
+     */
+    void awaitRepairs(long timeoutMillis);
+
+    /**
+     * Collects the repair mutations for one partition.
+     */
+    interface PartitionRepair
+    {
+        /** Reports a mutation that should be applied to {@code endpoint}. */
+        void reportMutation(InetAddressAndPort endpoint, Mutation mutation);
+
+        /** Called once all mutations for the partition have been reported. */
+        void finish();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
new file mode 100644
index 0000000..63bd3ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.Arrays;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowDiffListener;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Merge listener that, while the versions of a single partition returned by several sources are merged,
+ * works out for each source what it is missing compared to the merged result and accumulates it as a
+ * per-source repair.
+ * <p>
+ * The version at index {@code i} in the {@code onMerged*} callbacks is attributed to {@code sources[i]}:
+ * the repair accumulated for index {@code i} is sent to {@code sources[i]} by {@link #close()}.
+ * <p>
+ * Repairs cover the partition level deletion, row content (primary key liveness, row and complex
+ * deletions, queried cells) and range tombstones. Sources for which no difference was recorded get no
+ * mutation.
+ */
+public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+{
+ // Key of the partition being merged; every repair update is built for it.
+ private final DecoratedKey partitionKey;
+ // Columns used when creating the per-source repair PartitionUpdate builders.
+ private final RegularAndStaticColumns columns;
+ // Whether the merge iterates in reversed clustering order; passed to every marker open/close accessor
+ // and used to orient the slices of repaired range tombstones.
+ private final boolean isReversed;
+ // The endpoint of each merged version; index i matches repairs[i], currentRows[i], sourceDeletionTime[i]
+ // and markerToRepair[i].
+ private final InetAddressAndPort[] sources;
+ // The read being resolved: provides the table metadata and the column filter (see isQueried).
+ private final ReadCommand command;
+
+ // Per source, the repair accumulated so far. Created lazily by update(i); null if nothing to repair.
+ private final PartitionUpdate.Builder[] repairs;
+
+ // Per source, the repair row being built for the row currently merged. Reset after each merged row.
+ private final Row.Builder[] currentRows;
+ // Adds to currentRows the parts of the merged row that differ from each source's version.
+ private final RowDiffListener diffListener;
+
+ // The merged partition level deletion (set by onMergedPartitionLevelDeletion).
+ private DeletionTime partitionLevelDeletion;
+ // When merged has a currently open marker, its time. null otherwise.
+ private DeletionTime mergedDeletionTime;
+ // For each source, the time of the current deletion as known by the source.
+ private final DeletionTime[] sourceDeletionTime;
+ // For each source, record if there is an open range to send as repair, and from where.
+ private final ClusteringBound[] markerToRepair;
+
+ // Receives the per-source repair mutations when this listener is closed.
+ private final RepairListener repairListener;
+
+ /**
+ * @param partitionKey the key of the partition being merged
+ * @param columns the columns of the repair updates built for each source
+ * @param isReversed whether the merge iterates in reversed clustering order
+ * @param sources the endpoint of each merged version, in the same order as the versions
+ * @param command the read command being resolved
+ * @param repairListener receives the repair mutations when this listener is closed
+ */
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
+ {
+ this.partitionKey = partitionKey;
+ this.columns = columns;
+ this.isReversed = isReversed;
+ this.sources = sources;
+ repairs = new PartitionUpdate.Builder[sources.length];
+ currentRows = new Row.Builder[sources.length];
+ sourceDeletionTime = new DeletionTime[sources.length];
+ markerToRepair = new ClusteringBound[sources.length];
+ this.command = command;
+ this.repairListener = repairListener;
+
+ // For each source i, copy into its repair row every non-null merged value that differs from what
+ // the source returned (cells are only copied when they are actually queried, see isQueried).
+ this.diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addRowDeletion(merged);
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
+ {
+ if (merged != null && !merged.equals(original))
+ currentRow(i, clustering).addComplexDeletion(column, merged);
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ if (merged != null && !merged.equals(original) && isQueried(merged))
+ currentRow(i, clustering).addCell(merged);
+ }
+
+ private boolean isQueried(Cell cell)
+ {
+ // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may
+ // have empty values as optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair.
+ // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
+ // semantic (making sure we can always distinguish between a row that doesn't exist from one that does exist but has
+ // no value for the column requested by the user) and so it won't be unexpected by the user that those columns are
+ // not repaired.
+ ColumnMetadata column = cell.column();
+ ColumnFilter filter = RowIteratorMergeListener.this.command.columnFilter();
+ return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
+ }
+ };
+ }
+
+ /**
+ * Returns the repair builder of source {@code i}, creating it on first use (so that sources needing no
+ * repair never get one).
+ */
+ private PartitionUpdate.Builder update(int i)
+ {
+ if (repairs[i] == null)
+ repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+ return repairs[i];
+ }
+
+ /**
+ * The partition level deletion with which source {@code i} is currently repaired, or
+ * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
+ * up to date on it). The output of this method is only valid after the call to
+ * {@link #onMergedPartitionLevelDeletion}.
+ */
+ private DeletionTime partitionLevelRepairDeletion(int i)
+ {
+ return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
+ }
+
+ /**
+ * Returns the repair row builder of source {@code i} for the row currently merged, creating it (and
+ * starting a row for {@code clustering}) on first use.
+ */
+ private Row.Builder currentRow(int i, Clustering clustering)
+ {
+ if (currentRows[i] == null)
+ {
+ currentRows[i] = BTreeRow.sortedBuilder();
+ currentRows[i].newRow(clustering);
+ }
+ return currentRows[i];
+ }
+
+ /**
+ * Records the merged partition level deletion, and adds it to the repair of every source whose own
+ * partition level deletion it supersedes.
+ */
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ this.partitionLevelDeletion = mergedDeletion;
+ for (int i = 0; i < versions.length; i++)
+ {
+ if (mergedDeletion.supersedes(versions[i]))
+ update(i).addPartitionDeletion(mergedDeletion);
+ }
+ }
+
+ /**
+ * Diffs the merged row against each source's version and adds the resulting repair row, if any, to that
+ * source's repair.
+ */
+ public void onMergedRows(Row merged, Row[] versions)
+ {
+ // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle
+ // those cases directly in their respective methods (in other words, it would be inefficient to send a row
+ // deletion as repair when we know we've already sent a partition level or range tombstone that covers it).
+ if (merged.isEmpty())
+ return;
+
+ Rows.diff(diffListener, merged, versions);
+ for (int i = 0; i < currentRows.length; i++)
+ {
+ if (currentRows[i] != null)
+ update(i).add(currentRows[i].build());
+ }
+ // Start the next merged row with fresh per-source builders.
+ Arrays.fill(currentRows, null);
+ }
+
+ /**
+ * The deletion currently in effect in the merged result: the deletion of the currently open merged
+ * range tombstone if there is one, the partition level deletion otherwise.
+ */
+ private DeletionTime currentDeletion()
+ {
+ return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
+ }
+
+ /**
+ * Delegates to {@link #internalOnMergedRangeTombstoneMarkers}, rethrowing any {@link AssertionError} with
+ * the merged marker, every version and the sources included in its message.
+ */
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ try
+ {
+ // The code for merging range tombstones is a tad complex and we had the assertions there triggered
+ // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
+ // when that happens without more context than what the assertion errors give us however, hence the
+ // catch here that basically gathers as much context as reasonable.
+ internalOnMergedRangeTombstoneMarkers(merged, versions);
+ }
+ catch (AssertionError e)
+ {
+ // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
+ // rather get more info to debug than not.
+ TableMetadata table = command.metadata();
+ String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}",
+ table,
+ merged == null ? "null" : merged.toString(table),
+ '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ Arrays.toString(sources));
+ throw new AssertionError(details, e);
+ }
+ }
+
+ /**
+ * Updates, for each source, what deletion the source currently knows about and which range tombstone
+ * (if any) must be sent to it as repair, given the markers of this merge step.
+ * <p>
+ * Statement order matters: {@code currentDeletion} is captured before the loop, each source's
+ * {@code sourceDeletionTime} is updated before the merged-marker branch reads it, and
+ * {@code mergedDeletionTime} is only updated once all sources have been handled.
+ */
+ private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ // The current deletion as of dealing with this marker.
+ DeletionTime currentDeletion = currentDeletion();
+
+ for (int i = 0; i < versions.length; i++)
+ {
+ RangeTombstoneMarker marker = versions[i];
+
+ // Update what the source now thinks is the current deletion
+ if (marker != null)
+ sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
+
+ // If merged == null, some of the sources are opening or closing a marker
+ if (merged == null)
+ {
+ // but if it's not this source, move to the next one
+ if (marker == null)
+ continue;
+
+ // We have a close and/or open marker for a source, with nothing corresponding in merged.
+ // Because merged is a superset, this implies that we have a current deletion (be it due to an
+ // early opening in merged or a partition level deletion) and that this deletion will still be
+ // active after that point. Further, whatever deletion was open or is opened by this marker on the
+ // source, that deletion cannot supersede the current one.
+ //
+ // But while the marker deletion (before and/or after this point) cannot supersede the current
+ // deletion, we want to know if it's equal to it (both before and after), because in that case
+ // the source is up to date and we don't want to include repair.
+ //
+ // So in practice we have 2 possible cases:
+ // 1) the source was up-to-date on deletion up to that point: then it won't be from that point
+ // on unless it's a boundary and the new opened deletion time is also equal to the current
+ // deletion (note that this implies the boundary has the same closing and opening deletion
+ // time, which should generally not happen, but can due to legacy reading code not avoiding
+ // this for a while, see CASSANDRA-13237).
+ // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't
+ // we just have nothing to do for that marker).
+ assert !currentDeletion.isLive() : currentDeletion.toString();
+
+ // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair
+ // nor an "active" partition level deletion (where "active" means that it's greater or equal
+ // to the current deletion: if the source has a repaired partition deletion lower than the
+ // current deletion, this means the current deletion is due to a previously open range tombstone,
+ // and if the source isn't currently repaired for that RT, then it means it's up to date on it).
+ DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i);
+ if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
+ {
+ // Since there is an ongoing merged deletion, the only way we don't have an open repair for
+ // this source is that it had a range open with the same deletion as current and it's
+ // closing it.
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+
+ // and so unless it's a boundary whose opening deletion time is still equal to the current
+ // deletion (see comment above for why this can actually happen), we have to repair the source
+ // from that point on.
+ if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
+ markerToRepair[i] = marker.closeBound(isReversed).invert();
+ }
+ // In case 2) above, we only have something to do if the source is up-to-date after that point
+ // (which, since the source isn't up-to-date before that point, means we're opening a new deletion
+ // that is equal to the current one).
+ else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ {
+ closeOpenMarker(i, marker.openBound(isReversed).invert());
+ }
+ }
+ else
+ {
+ // We have a change of current deletion in merged (potentially to/from no deletion at all).
+
+ if (merged.isClose(isReversed))
+ {
+ // We're closing the merged range. If we've recorded that this should be repaired for the
+ // source, close and add said range to the repair to send.
+ if (markerToRepair[i] != null)
+ closeOpenMarker(i, merged.closeBound(isReversed));
+
+ }
+
+ if (merged.isOpen(isReversed))
+ {
+ // If we're opening a new merged range (or just switching deletion), then unless the source
+ // is up to date on that deletion (note that we've updated what the source deletion is
+ // above), we'll have to send the range to the source.
+ DeletionTime newDeletion = merged.openDeletionTime(isReversed);
+ DeletionTime sourceDeletion = sourceDeletionTime[i];
+ if (!newDeletion.equals(sourceDeletion))
+ markerToRepair[i] = merged.openBound(isReversed);
+ }
+ }
+ }
+
+ // Only now, after every source has been compared against the previous current deletion, record the
+ // deletion opened (or cleared) by the merged marker.
+ if (merged != null)
+ mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
+ }
+
+ /**
+ * Adds to source {@code i}'s repair a range tombstone running from its recorded open bound
+ * ({@code markerToRepair[i]}) to {@code close}, with the current deletion, then clears the recorded bound.
+ * The bounds are swapped when iterating in reverse so the slice is always built start-to-end.
+ */
+ private void closeOpenMarker(int i, ClusteringBound close)
+ {
+ ClusteringBound open = markerToRepair[i];
+ update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+ markerToRepair[i] = null;
+ }
+
+ /**
+ * Hands the accumulated repairs to the {@link RepairListener}. A partition repair is started only if at
+ * least one source has a repair. Each such source's repair is then reported as a {@link Mutation} for its
+ * endpoint, and the partition repair is finished once all of them have been reported.
+ * NOTE(review): if {@code reportMutation} throws, {@code finish()} is not called; confirm that is acceptable.
+ */
+ public void close()
+ {
+ RepairListener.PartitionRepair repair = null;
+ for (int i = 0; i < repairs.length; i++)
+ {
+ if (repairs[i] == null)
+ continue;
+
+ if (repair == null)
+ {
+ repair = repairListener.startPartitionRepair();
+ }
+ repair.reportMutation(sources[i], new Mutation(repairs[i].build()));
+ }
+
+ if (repair != null)
+ {
+ repair.finish();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org