You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/12/11 18:35:39 UTC

[cassandra] branch cassandra-3.11 updated (d8a317f -> ae89812)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from d8a317f  Merge branch 'cassandra-3.0' into cassandra-3.11
     new fa77676  Extend the exclusion of replica filtering protection to other indices instead of just SASI
     new ae89812  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                         |  1 +
 src/java/org/apache/cassandra/index/Index.java      | 17 +++++++++++++++++
 .../org/apache/cassandra/index/sasi/SASIIndex.java  |  6 ++++++
 .../org/apache/cassandra/service/DataResolver.java  | 21 ++++++++++++++-------
 4 files changed, 38 insertions(+), 7 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ae8981236ae06a5053775b7f55c8aeb77f8b9318
Merge: d8a317f fa77676
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Dec 11 18:17:37 2020 +0000

    Merge branch 'cassandra-3.0' into cassandra-3.11
    
    # Conflicts:
    #	CHANGES.txt
    #	src/java/org/apache/cassandra/service/DataResolver.java

 CHANGES.txt                                         |  1 +
 src/java/org/apache/cassandra/index/Index.java      | 17 +++++++++++++++++
 .../org/apache/cassandra/index/sasi/SASIIndex.java  |  6 ++++++
 .../org/apache/cassandra/service/DataResolver.java  | 21 ++++++++++++++-------
 4 files changed, 38 insertions(+), 7 deletions(-)

diff --cc CHANGES.txt
index fb9921b,d560d91..a9a0cee
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.24:
 +3.11.10
 + * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
 + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
 +Merged from 3.0:
+  * Extend the exclusion of replica filtering protection to other indices instead of just SASI (CASSANDRA-16311)
   * Synchronize transaction logs for JBOD (CASSANDRA-16225)
   * Fix the counting of cells per partition (CASSANDRA-16259)
   * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
diff --cc src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 4bf94ef,0000000..5ea7cec
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@@ -1,352 -1,0 +1,358 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index.sasi;
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.function.BiFunction;
 +
 +import com.googlecode.concurrenttrees.common.Iterables;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.Tracker;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.TargetParser;
 +import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 +import org.apache.cassandra.index.sasi.conf.IndexMode;
 +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
 +import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
 +import org.apache.cassandra.index.sasi.plan.QueryPlan;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +public class SASIIndex implements Index, INotificationConsumer
 +{
 +    public final static String USAGE_WARNING = "SASI indexes are experimental and are not recommended for production use.";
 +
 +    private static class SASIIndexBuildingSupport implements IndexBuildingSupport // creates SASIIndexBuilder tasks for a set of indexes
 +    {
 +        public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs,
 +                                                       Set<Index> indexes,
 +                                                       Collection<SSTableReader> sstablesToRebuild)
 +        {
 +            NavigableMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables = new TreeMap<>((a, b) -> { // per sstable: column -> index to build
 +                return Integer.compare(a.descriptor.generation, b.descriptor.generation); // sstables are ordered by ascending generation
 +            });
 +
 +            indexes.stream()
 +                   .filter((i) -> i instanceof SASIIndex) // indexes of any other implementation are ignored
 +                   .forEach((i) -> {
 +                       SASIIndex sasi = (SASIIndex) i;
 +                       sasi.index.dropData(sstablesToRebuild); // NOTE(review): presumably discards existing index data for these sstables -- confirm in ColumnIndex
 +                       sstablesToRebuild.stream()
 +                                        .filter((sstable) -> !sasi.index.hasSSTable(sstable)) // keep only sstables the index does not report having
 +                                        .forEach((sstable) -> {
 +                                            Map<ColumnDefinition, ColumnIndex> toBuild = sstables.get(sstable);
 +                                            if (toBuild == null) // first index seen for this sstable: create its entry
 +                                                sstables.put(sstable, (toBuild = new HashMap<>()));
 +
 +                                            toBuild.put(sasi.index.getDefinition(), sasi.index);
 +                                        });
 +                   });
 +
 +            return new SASIIndexBuilder(cfs, sstables);
 +        }
 +    }
 +
 +    private static final SASIIndexBuildingSupport INDEX_BUILDER_SUPPORT = new SASIIndexBuildingSupport();
 +
 +    private final ColumnFamilyStore baseCfs;
 +    private final IndexMetadata config;
 +    private final ColumnIndex index;
 +
 +    public SASIIndex(ColumnFamilyStore baseCfs, IndexMetadata config)
 +    {
 +        this.baseCfs = baseCfs;
 +        this.config = config;
 +
 +        ColumnDefinition column = TargetParser.parse(baseCfs.metadata, config).left; // .left of the parsed pair is the target ColumnDefinition
 +        this.index = new ColumnIndex(baseCfs.metadata.getKeyValidator(), column, config);
 +
 +        Tracker tracker = baseCfs.getTracker();
 +        tracker.subscribe(this); // this class implements INotificationConsumer; note 'this' is published before the constructor returns
 +
 +        SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> toRebuild = new TreeMap<>((a, b)
 +                                                -> Integer.compare(a.descriptor.generation, b.descriptor.generation)); // ordered by ascending sstable generation
 +
 +        for (SSTableReader sstable : index.init(tracker.getView().liveSSTables())) // NOTE(review): init() presumably returns the live sstables that still need indexing -- confirm
 +        {
 +            Map<ColumnDefinition, ColumnIndex> perSSTable = toRebuild.get(sstable);
 +            if (perSSTable == null) // first entry for this sstable
 +                toRebuild.put(sstable, (perSSTable = new HashMap<>()));
 +
 +            perSSTable.put(index.getDefinition(), index);
 +        }
 +
 +        CompactionManager.instance.submitIndexBuild(new SASIIndexBuilder(baseCfs, toRebuild)); // submitted on every construction, even when toRebuild is empty
 +    }
 +
 +    /**
 +     * Validates the options of a SASI index; called via reflection at {@link IndexMetadata#validateCustomIndexOptions}
 +     */
 +    public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm)
 +    {
 +        if (!(cfm.partitioner instanceof Murmur3Partitioner))
 +            throw new ConfigurationException("SASI only supports Murmur3Partitioner.");
 +
 +        String targetColumn = options.get("target"); // the "target" option is mandatory
 +        if (targetColumn == null)
 +            throw new ConfigurationException("unknown target column");
 +
 +        Pair<ColumnDefinition, IndexTarget.Type> target = TargetParser.parse(cfm, targetColumn);
 +        if (target == null)
 +            throw new ConfigurationException("failed to retrieve target column for: " + targetColumn);
 +
 +        if (target.left.isComplex())
 +            throw new ConfigurationException("complex columns are not yet supported by SASI");
 +
 +        if (target.left.isPartitionKey())
 +            throw new ConfigurationException("partition key columns are not yet supported by SASI");
 +
 +        IndexMode.validateAnalyzer(options, target.left);
 +
 +        IndexMode mode = IndexMode.getMode(target.left, options);
 +        if (mode.mode == Mode.SPARSE) // SPARSE carries two extra restrictions
 +        {
 +            if (mode.isLiteral)
 +                throw new ConfigurationException("SPARSE mode is only supported on non-literal columns.");
 +
 +            if (mode.isAnalyzed)
 +                throw new ConfigurationException("SPARSE mode doesn't support analyzers.");
 +        }
 +
 +        return Collections.emptyMap(); // every successful validation returns an empty map
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this); // registers this SASIIndex instance itself with the given registry
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return config; // the IndexMetadata handed to the constructor
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        return null; // no task is supplied here; the constructor already submits a SASIIndexBuilder
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata)
 +    {
 +        return null; // no task is supplied; indexMetadata is not used
 +    }
 +
 +    public Callable<?> getBlockingFlushTask()
 +    {
 +        return null; // SASI indexes are flushed alongside the memtable, so no separate flush task is supplied
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return getTruncateTask(FBUtilities.timestampMicros()); // invalidation reuses the truncate task, with FBUtilities.timestampMicros() as truncatedAt
 +    }
 +
 +    public Callable<?> getTruncateTask(long truncatedAt)
 +    {
 +        return () -> { // nothing is dropped until the returned Callable is invoked
 +            index.dropData(truncatedAt);
 +            return null; // the Callable yields no result
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        return true; // constant for every SASI index
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return Optional.empty(); // no backing ColumnFamilyStore is ever exposed
 +    }
 +
 +    public boolean indexes(PartitionColumns columns)
 +    {
 +        return columns.contains(index.getDefinition()); // true when the indexed column is among the given columns
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return index.getDefinition().compareTo(column) == 0; // matches only the single indexed column, using compareTo equality
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return dependsOn(column) && index.supports(operator); // both must hold: it is the indexed column and the ColumnIndex supports the operator
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null; // no custom expression value type is provided
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return filter.withoutExpressions(); // NOTE(review): presumably nothing is left to post-filter because the SASI search applied every expression -- confirm
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        // this is temporary (until a proper QueryPlan is integrated into Cassandra)
 +        // and allows us to prioritize SASI indexes, if any are in the query, since they
 +        // are going to be more efficient to query and intersect than built-in indexes.
 +        return Long.MIN_VALUE;
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {} // intentionally empty: no per-update validation is performed
 +
++    @Override
++    public boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
++    {
++        return false; // SASI reports no support unconditionally; rowFilter is not consulted
++    }
++
 +    public Indexer indexerFor(DecoratedKey key, PartitionColumns columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type transactionType)
 +    {
 +        return new Indexer() // only insertRow/updateRow do any work; every other callback is a no-op
 +        {
 +            public void begin()
 +            {}
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {}
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {}
 +
 +            public void insertRow(Row row)
 +            {
 +                if (isNewData())
 +                    adjustMemtableSize(index.index(key, row), opGroup); // the value returned by index.index() is charged to the memtable allocator
 +            }
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                insertRow(newRow); // oldRow is ignored: an update is indexed as an insert of the new row
 +            }
 +
 +            public void removeRow(Row row)
 +            {}
 +
 +            public void finish()
 +            {}
 +
 +            // we are only interested in the data from Memtable
 +            // everything else is going to be handled by SSTableWriter observers
 +            private boolean isNewData()
 +            {
 +                return transactionType == IndexTransaction.Type.UPDATE; // any other transaction type is skipped by insertRow
 +            }
 +
 +            public void adjustMemtableSize(long additionalSpace, OpOrder.Group opGroup)
 +            {
 +                baseCfs.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().allocate(additionalSpace, opGroup); // allocates against the on-heap side of the current memtable's allocator
 +            }
 +        };
 +    }
 +
 +    public Searcher searcherFor(ReadCommand command) throws InvalidRequestException
 +    {
 +        CFMetaData config = command.metadata(); // NOTE: this local shadows the IndexMetadata field named 'config'
 +        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(config.cfId); // looked up from the schema by cfId rather than taken from baseCfs
 +        return controller -> new QueryPlan(cfs, command, DatabaseDescriptor.getRangeRpcTimeout()).execute(controller); // a new QueryPlan is built on each invocation of the searcher
 +    }
 +
 +    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationType opType)
 +    {
 +        return newWriter(baseCfs.metadata.getKeyValidator(), descriptor, Collections.singletonMap(index.getDefinition(), index), opType); // the writer covers only this index's single column
 +    }
 +
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator; // identity: the iterator is returned unchanged and readCommand is unused
 +    }
 +
 +    public IndexBuildingSupport getBuildTaskSupport()
 +    {
 +        return INDEX_BUILDER_SUPPORT; // a single static instance is returned by every SASIIndex
 +    }
 +
 +    public void handleNotification(INotification notification, Object sender)
 +    {
 +        // unfortunately, we can only check the type of notification via instanceof :(
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification notice = (SSTableAddedNotification) notification;
 +            index.update(Collections.<SSTableReader>emptyList(), Iterables.toList(notice.added)); // nothing removed, only additions
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification notice = (SSTableListChangedNotification) notification;
 +            index.update(notice.removed, notice.added); // removed sstables first, then added ones
 +        }
 +        else if (notification instanceof MemtableRenewedNotification)
 +        {
 +            index.switchMemtable(); // no-argument overload: the renewed memtable is not passed along
 +        }
 +        else if (notification instanceof MemtableSwitchedNotification)
 +        {
 +            index.switchMemtable(((MemtableSwitchedNotification) notification).memtable);
 +        }
 +        else if (notification instanceof MemtableDiscardedNotification)
 +        {
 +            index.discardMemtable(((MemtableDiscardedNotification) notification).memtable);
 +        } // any other notification type is ignored; 'sender' is never used
 +    }
 +
 +    public ColumnIndex getIndex()
 +    {
 +        return index; // the ColumnIndex created in the constructor
 +    }
 +
 +    protected static PerSSTableIndexWriter newWriter(AbstractType<?> keyValidator,
 +                                                     Descriptor descriptor,
 +                                                     Map<ColumnDefinition, ColumnIndex> indexes,
 +                                                     OperationType opType)
 +    {
 +        return new PerSSTableIndexWriter(keyValidator, descriptor, opType, indexes); // note the argument order: opType is passed before indexes
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index eb2f7b0,cc17267..26dabe9
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -29,10 -29,9 +29,9 @@@ import com.google.common.collect.Iterab
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.*;
- import org.apache.cassandra.cql3.statements.IndexTarget;
  import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 -import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.transform.*;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org