Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/12/11 18:35:41 UTC

[cassandra] branch trunk updated (1193180 -> 8e61e21)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 1193180  Merge branch 'cassandra-3.11' into trunk
     new fa77676  Extend the exclusion of replica filtering protection to other indices instead of just SASI
     new ae89812  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 8e61e21  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/index/Index.java     | 17 +++++++++++++++++
 .../org/apache/cassandra/index/sasi/SASIIndex.java |  6 ++++++
 .../cassandra/service/reads/DataResolver.java      | 22 +++++++++++++++-------
 4 files changed, 39 insertions(+), 7 deletions(-)
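
For context, the substance of CASSANDRA-16311 is that DataResolver no longer hard-codes a check for SASIIndex when deciding whether to apply replica filtering protection; it now asks the index itself through a new hook, Index#supportsReplicaFilteringProtection(RowFilter). The Index.java hunk is not reproduced in this merge email (that file merged cleanly), so the fragment below is only a sketch of what the default presumably looks like, inferred from how DataResolver and SASIIndex use the method in the diff that follows:

    // Sketch only: the exact Index.java addition is not shown in this email.
    // The assumed default keeps replica filtering protection enabled unless an
    // index implementation explicitly overrides the hook.
    import org.apache.cassandra.db.filter.RowFilter;

    public interface Index
    {
        default boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
        {
            return true;
        }
    }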




[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8e61e216a67304482f4373fb8b53012a25404026
Merge: 1193180 ae89812
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Dec 11 18:34:25 2020 +0000

    Merge branch 'cassandra-3.11' into trunk
    
    # Conflicts:
    #	src/java/org/apache/cassandra/index/sasi/SASIIndex.java
    #	src/java/org/apache/cassandra/service/DataResolver.java

 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/index/Index.java     | 17 +++++++++++++++++
 .../org/apache/cassandra/index/sasi/SASIIndex.java |  6 ++++++
 .../cassandra/service/reads/DataResolver.java      | 22 +++++++++++++++-------
 4 files changed, 39 insertions(+), 7 deletions(-)

diff --cc CHANGES.txt
index 520af90,a9a0cee..3b7c96d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,28 -1,8 +1,29 @@@
 -3.11.10
 - * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
 +4.0-beta4
 + * Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143)
 + * Add generatetokens script for offline token allocation strategy generation (CASSANDRA-16205)
 + * Remove Windows scripts (CASSANDRA-16171)
 + * Improve checksumming and compression in protocol V5 (CASSANDRA-15299)
 + * Optimised repair streaming improvements (CASSANDRA-16274)
 + * Update jctools dependency to 3.1.0 (CASSANDRA-16255)
 + * 'SSLEngine closed already' exception on failed outbound connection (CASSANDRA-16277)
 + * Drain and/or shutdown might throw because of slow messaging service shutdown (CASSANDRA-16276)
 + * Upgrade JNA to 5.6.0, dropping support for <=glibc-2.6 systems (CASSANDRA-16212)
 + * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246)
 + * Ensure that CacheMetrics.requests is picked up by the metric reporter (CASSANDRA-16228)
 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
 + * Produce consistent tombstone for reads to avoid digest mismatch (CASSANDRA-15369)
 + * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235)
 + * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
 + * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
 + * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144)
 + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
 + * When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095)
 + * Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325)
 + * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097)
 +Merged from 3.11:
   * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
  Merged from 3.0:
+  * Extend the exclusion of replica filtering protection to other indices instead of just SASI (CASSANDRA-16311)
   * Synchronize transaction logs for JBOD (CASSANDRA-16225)
   * Fix the counting of cells per partition (CASSANDRA-16259)
   * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
diff --cc src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 592499e,5ea7cec..b1998bc
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@@ -249,7 -244,13 +249,13 @@@ public class SASIIndex implements Index
      public void validate(PartitionUpdate update) throws InvalidRequestException
      {}
  
+     @Override
+     public boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
+     {
+         return false;
+     }
+ 
 -    public Indexer indexerFor(DecoratedKey key, PartitionColumns columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type transactionType)
 +    public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, WriteContext context, IndexTransaction.Type transactionType)
      {
          return new Indexer()
          {
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index eeabb4b,0000000..7c76336
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,404 -1,0 +1,412 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.function.UnaryOperator;
 +
 +import com.google.common.base.Joiner;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.cql3.statements.schema.IndexTarget;
++import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionIterators;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
 +import org.apache.cassandra.db.transform.Filter;
 +import org.apache.cassandra.db.transform.FilteredPartitions;
 +import org.apache.cassandra.db.transform.Transformation;
- import org.apache.cassandra.index.sasi.SASIIndex;
++import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +
 +import static com.google.common.collect.Iterables.*;
 +
 +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 +{
 +    private final boolean enforceStrictLiveness;
 +    private final ReadRepair<E, P> readRepair;
 +
 +    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
 +    {
 +        super(command, replicaPlan, queryStartNanoTime);
 +        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
 +        this.readRepair = readRepair;
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.get(0).payload;
 +        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
 +        // at the beginning of this method), so grab the response count once and use that throughout the method.
 +        Collection<Message<ReadResponse>> messages = responses.snapshot();
 +        assert !any(messages, msg -> msg.payload.isDigestResponse());
 +
 +        E replicas = replicaPlan().candidates().select(transform(messages, Message::from), false);
 +
 +        // If requested, inspect each response for a digest of the replica's repaired data set
 +        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
 +                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
 +                                                  : null;
 +        if (repairedDataTracker != null)
 +        {
 +            messages.forEach(msg -> {
 +                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
 +                {
 +                    repairedDataTracker.recordDigest(msg.from(),
 +                                                     msg.payload.repairedDataDigest(),
 +                                                     msg.payload.isRepairedDigestConclusive());
 +                }
 +            });
 +        }
 +
 +        if (!needsReplicaFilteringProtection())
 +        {
 +            ResolveContext context = new ResolveContext(replicas);
 +            return resolveWithReadRepair(context,
 +                                         i -> shortReadProtectedResponse(i, context),
 +                                         UnaryOperator.identity(),
 +                                         repairedDataTracker);
 +        }
 +
 +        return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
 +    }
 +
 +    private boolean needsReplicaFilteringProtection()
 +    {
 +        if (command.rowFilter().isEmpty())
 +            return false;
 +
-         IndexMetadata indexDef = command.indexMetadata();
-         if (indexDef != null && indexDef.isCustom())
++        IndexMetadata indexMetadata = command.indexMetadata();
++
++        if (indexMetadata == null || !indexMetadata.isCustom())
 +        {
-             String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
-             return !SASIIndex.class.getName().equals(className);
++            return true;
 +        }
 +
-         return true;
++        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(command.metadata().id);
++
++        assert cfs != null;
++
++        Index index = command.getIndex(cfs);
++
++        assert index != null;
++
++        return index.supportsReplicaFilteringProtection(command.rowFilter());
 +    }
 +
 +    private class ResolveContext
 +    {
 +        private final E replicas;
 +        private final DataLimits.Counter mergedResultCounter;
 +
 +        private ResolveContext(E replicas)
 +        {
 +            this.replicas = replicas;
 +            this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
 +                                                                   true,
 +                                                                   command.selectsFullPartition(),
 +                                                                   enforceStrictLiveness);
 +        }
 +
 +        private boolean needsReadRepair()
 +        {
 +            return replicas.size() > 1;
 +        }
 +
 +        private boolean needShortReadProtection()
 +        {
 +            // If we have only one result, there is no read repair to do and we can't get short reads
 +            // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
 +            // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have a limit,
 +            // don't bother protecting against short reads.
 +            return replicas.size() > 1 && !command.limits().isUnlimited();
 +        }
 +    }
 +
 +    @FunctionalInterface
 +    private interface ResponseProvider
 +    {
 +        UnfilteredPartitionIterator getResponse(int i);
 +    }
 +
 +    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
 +    {
 +        UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
 +
 +        return context.needShortReadProtection()
 +               ? ShortReadProtection.extend(context.replicas.get(i),
 +                                            () -> responses.clearUnsafe(i),
 +                                            originalResponse,
 +                                            command,
 +                                            context.mergedResultCounter,
 +                                            queryStartNanoTime,
 +                                            enforceStrictLiveness)
 +               : originalResponse;
 +    }
 +
 +    private PartitionIterator resolveWithReadRepair(ResolveContext context,
 +                                                    ResponseProvider responseProvider,
 +                                                    UnaryOperator<PartitionIterator> preCountFilter,
 +                                                    RepairedDataTracker repairedDataTracker)
 +    {
 +        UnfilteredPartitionIterators.MergeListener listener = null;
 +        if (context.needsReadRepair())
 +        {
 +            P sources = replicaPlan.getWithContacts(context.replicas);
 +            listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
 +        }
 +
 +        return resolveInternal(context, listener, responseProvider, preCountFilter);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
 +    {
 +        // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
 +        // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version
 +        // of that row) involves 3 main elements:
 +        //   1) We combine short-read protection and a merge listener that identifies potentially "out-of-date"
 +        //      rows to create an iterator that is guaranteed to produce enough valid row results to satisfy the query
 +        //      limit if enough actually exist. A row is considered out-of-date if its merged form is non-empty and we
 +        //      receive no response from at least one replica. In this case, it is possible that filtering at the
 +        //      "silent" replica has produced a more up-to-date result.
 +        //   2) This iterator is passed to the standard resolution process with read-repair, but is first wrapped in a
 +        //      response provider that lazily "completes" potentially out-of-date rows by directly querying them on the
 +        //      replicas that were previously silent. As this iterator is consumed, it caches valid data for potentially
 +        //      out-of-date rows, and this cached data is merged with the fetched data as rows are requested. If there
 +        //      is no replica divergence, only rows in the partition being evaluated will be cached (then released
 +        //      when the partition is consumed).
 +        //   3) After a "complete" row is materialized, it must pass the row filter supplied by the original query
 +        //      before it counts against the limit.
 +
 +        // We need separate contexts, as each context has its own counter
 +        ResolveContext firstPhaseContext = new ResolveContext(replicas);
 +        ResolveContext secondPhaseContext = new ResolveContext(replicas);
 +        ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
 +                                                                             command,
 +                                                                             replicaPlan().consistencyLevel(),
 +                                                                             queryStartNanoTime,
 +                                                                             firstPhaseContext.replicas,
 +                                                                             DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
 +                                                                             DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
 +
 +        PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
 +                                                                 rfp.mergeController(),
 +                                                                 i -> shortReadProtectedResponse(i, firstPhaseContext),
 +                                                                 UnaryOperator.identity());
 +
 +        PartitionIterator completedPartitions = resolveWithReadRepair(secondPhaseContext,
 +                                                                      i -> rfp.queryProtectedPartitions(firstPhasePartitions, i),
 +                                                                      results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
 +                                                                      repairedDataTracker);
 +
 +        // Ensure that the RFP instance has a chance to record metrics when the iterator closes.
 +        return PartitionIterators.doOnClose(completedPartitions, firstPhasePartitions::close);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private PartitionIterator resolveInternal(ResolveContext context,
 +                                              UnfilteredPartitionIterators.MergeListener mergeListener,
 +                                              ResponseProvider responseProvider,
 +                                              UnaryOperator<PartitionIterator> preCountFilter)
 +    {
 +        int count = context.replicas.size();
 +        List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
 +        for (int i = 0; i < count; i++)
 +            results.add(responseProvider.getResponse(i));
 +
 +        /*
 +         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
 +         * have more rows than the client requested. To make sure that we still conform to the original limit,
 +         * we apply a top-level post-reconciliation counter to the merged partition iterator.
 +         *
 +         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied
 +         * to the current partition in order to work. For this reason we have to apply the counter transformation before
 +         * the empty partition discard logic kicks in, since that logic will eagerly consume the iterator.
 +         *
 +         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
 +         *
 +         * See CASSANDRA-13747 for more details.
 +         */
 +
 +        UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, mergeListener);
 +        Filter filter = new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness());
 +        FilteredPartitions filtered = FilteredPartitions.filter(merged, filter);
 +        PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +    }
 +
 +    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
 +    {
 +        return RepairedDataVerifier.verifier(command);
 +    }
 +
 +    private String makeResponsesDebugString(DecoratedKey partitionKey)
 +    {
 +        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
 +                                                                         P sources,
 +                                                                         RepairedDataTracker repairedDataTracker)
 +    {
 +        // Avoid wrapping the no-op listener, as it doesn't throw, unless we're tracking repaired status
 +        // in which case we need to inject the tracker & verify on close
 +        if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
 +        {
 +            if (repairedDataTracker == null)
 +                return partitionListener;
 +
 +            return new UnfilteredPartitionIterators.MergeListener()
 +            {
 +
 +                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +                {
 +                    return UnfilteredRowIterators.MergeListener.NOOP;
 +                }
 +
 +                public void close()
 +                {
 +                    repairedDataTracker.verify();
 +                }
 +            };
 +        }
 +
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +            {
 +                UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           mergedDeletion == null ? "null" : mergedDeletion.toString(),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public Row onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        try
 +                        {
 +                            return rowListener.onMergedRows(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
 +                    {
 +                        try
 +                        {
 +                            // The code for merging range tombstones is a tad complex, and the assertions there have triggered
 +                            // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insight
 +                            // when that happens without more context than what the assertion errors give us, hence the
 +                            // catch here, which basically gathers as much context as is reasonable.
 +                            rowListener.onMergedRangeTombstoneMarkers(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +
 +                    }
 +
 +                    public void close()
 +                    {
 +                        rowListener.close();
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +                partitionListener.close();
 +                if (repairedDataTracker != null)
 +                    repairedDataTracker.verify();
 +            }
 +        };
 +    }
 +}
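
For third-party index implementors, the practical effect of the change above is that opting out of replica filtering protection is no longer tied to SASIIndex specifically; any custom index can override the new hook. The fragment below is a hypothetical illustration (the class name is invented, and it is declared abstract only so the sketch compiles without restating the rest of the Index interface), mirroring the SASIIndex override shown in the diff:

    import org.apache.cassandra.db.filter.RowFilter;
    import org.apache.cassandra.index.Index;

    // Hypothetical custom index that performs its own exact post-filtering and
    // can therefore skip replica filtering protection, like SASIIndex above.
    public abstract class ExampleCustomIndex implements Index
    {
        @Override
        public boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
        {
            // Returning false makes DataResolver.needsReplicaFilteringProtection()
            // skip the two-phase RFP resolution for queries served by this index.
            return false;
        }
    }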

