Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/24 13:19:56 UTC

[1/4] cassandra git commit: Merge branch 'cassandra-2.1' of https://git-wip-us.apache.org/repos/asf/cassandra into cassandra-2.1

Repository: cassandra
Updated Branches:
  refs/heads/trunk 065aeeb4a -> 41435ef6c


Merge branch 'cassandra-2.1' of https://git-wip-us.apache.org/repos/asf/cassandra into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35f173a0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35f173a0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35f173a0

Branch: refs/heads/trunk
Commit: 35f173a0e25f6ac01d79d57ed5992836238295db
Parents: 0d01c36 6ae1b42
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:38:30 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:38:30 2014 +0100

----------------------------------------------------------------------
 lib/jamm-0.3.0.jar | Bin 21149 -> 21033 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------



[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41435ef6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41435ef6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41435ef6

Branch: refs/heads/trunk
Commit: 41435ef6c1fec1cadf6606eb6eb66fe15bd8c46d
Parents: 065aeeb cab2b25
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Nov 24 15:19:33 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Nov 24 15:19:33 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/UpdateStatement.java        | 17 +++-
 .../io/sstable/format/big/BigTableWriter.java   |  9 ++
 .../cql3/IndexedValuesValidationTest.java       | 86 ++++++++++++++++++++
 4 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41435ef6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41435ef6/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41435ef6/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index ec53b4e,0000000..5221509
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,541 -1,0 +1,550 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.Closeable;
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        iwriter = new IndexWriter(keyCount);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Performs sanity checks on the given decorated key and returns the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
 +        iwriter.append(decoratedKey, index);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long currentPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(currentPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
 +        afterAppend(row.key, currentPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
++        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
++        {
++            logger.error("Key size {} exceeds maximum of {}, skipping row",
++                         decoratedKey.getKey().remaining(),
++                         FBUtilities.MAX_UNSIGNED_SHORT);
++            return;
++        }
++
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This writes the atom to disk too
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
 +        if (iwriter != null)
 +        {
 +            FileUtils.closeQuietly(iwriter.indexFile);
 +            if (closeBf)
 +            {
 +                iwriter.bf.close();
 +            }
 +        }
 +        if (dataFile != null)
 +            FileUtils.closeQuietly(dataFile);
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure any managed data we may have retained references to during the write are no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
 +        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
 +        if (exclusiveUpperBoundOfReadableIndex == null)
 +            return null;
 +
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
 +        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
 +        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
 +                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
 +        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
 +        if (inclusiveUpperBoundOfReadableData == null)
 +            return null;
 +        int offset = 2;
 +        while (true)
 +        {
 +            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
 +            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
 +                break;
 +            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
 +            if (inclusiveUpperBoundOfReadableData == null)
 +                return null;
 +        }
 +        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
 +        return sstable;
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
 +    {
 +        Pair<Descriptor, StatsMetadata> p = close(repairedAt);
 +        Descriptor newdesc = p.left;
 +        StatsMetadata sstableMetadata = p.right;
 +
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
 +        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
 +        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
 +                                                           components,
 +                                                           metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf,
 +                                                           maxDataAge,
 +                                                           sstableMetadata,
 +                                                           SSTableReader.OpenReason.NORMAL);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +        // try to save the summaries to disk
 +        sstable.saveSummary(iwriter.builder, dbuilder);
 +        iwriter = null;
 +        dbuilder = null;
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor to the new sstable and its metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
 +        return close(this.repairedAt);
 +    }
 +
 +    private Pair<Descriptor, StatsMetadata> close(long repairedAt)
 +    {
 +
 +        // index and filter
 +        iwriter.close();
 +        // main data, close will truncate if necessary
 +        dataFile.close();
 +        dataFile.writeFullChecksum(descriptor);
 +        // write sstable statistics
 +        Map<MetadataType, MetadataComponent> metadataComponents = metadataCollector.finalizeMetadata(
 +                                                                                    partitioner.getClass().getCanonicalName(),
 +                                                                                    metadata.getBloomFilterFpChance(),
 +                                                                                    repairedAt);
 +        writeMetadata(descriptor, metadataComponents);
 +
 +        // save the table of components
 +        SSTable.appendTOC(descriptor, components);
 +
 +        // remove the 'tmp' marker from all components
 +        return Pair.create(SSTableWriter.rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +
 +    }
 +
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter implements Closeable
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
 +        DecoratedKey getMaxReadableKey(int offset)
 +        {
 +            long maxIndexLength = indexFile.getLastFlushOffset();
 +            return summary.getMaxReadableKey(maxIndexLength, offset);
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry)
 +        {
 +            bf.add(key.getKey());
 +            long indexPosition = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
 +
 +            summary.maybeAddEntry(key, indexPosition);
 +            builder.addPotentialBoundary(indexPosition);
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +        // we can't reset dbuilder either, but that is the last thing called in afterAppend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}
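
Aside: the MAX_UNSIGNED_SHORT guard added to append() above exists because keys
are serialized with a two-byte unsigned length prefix (as in
ByteBufferUtil.writeWithShortLength in IndexWriter.append above), so a key
longer than 65535 bytes simply cannot be represented on disk. A minimal,
self-contained sketch of that encoding (illustrative names only, not part of
the commit):

    import java.nio.ByteBuffer;

    public class ShortLengthPrefixSketch
    {
        // Mirrors FBUtilities.MAX_UNSIGNED_SHORT: the largest length that
        // fits in a two-byte unsigned prefix.
        static final int MAX_UNSIGNED_SHORT = 0xFFFF; // 65535

        // Writes value prefixed by its length as an unsigned short, the way
        // the primary index stores keys; larger values cannot be encoded.
        static ByteBuffer writeWithShortLength(ByteBuffer value)
        {
            int length = value.remaining();
            if (length > MAX_UNSIGNED_SHORT)
                throw new IllegalArgumentException("length " + length + " > " + MAX_UNSIGNED_SHORT);
            ByteBuffer out = ByteBuffer.allocate(2 + length);
            out.putShort((short) length); // read back as (s & 0xFFFF)
            out.put(value.duplicate());
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            System.out.println(writeWithShortLength(ByteBuffer.allocate(10)).remaining()); // 12
            writeWithShortLength(ByteBuffer.allocate(MAX_UNSIGNED_SHORT + 1));             // throws
        }
    }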


[2/4] cassandra git commit: Validate size of indexed column values

Posted by al...@apache.org.
Validate size of indexed column values

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-8280
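
In short, the patch moves the size check to request validation time: a write
whose partition key, clustering prefix, or indexed cell value exceeds the
unsigned-short bound is rejected with an InvalidRequestException instead of
being silently dropped at flush time. A hedged sketch of that pattern, with
hypothetical names (the real checks live in ModificationStatement,
UpdateStatement and SSTableWriter below), assuming the indexed-value limit is
the same unsigned-short bound:

    import java.nio.ByteBuffer;
    import java.util.List;

    final class ValueSizeChecks
    {
        static final int MAX_UNSIGNED_SHORT = 0xFFFF; // 65535

        // Fail fast with a client-visible error rather than logging and
        // skipping the row during the sstable write.
        static void validate(ByteBuffer partitionKey, List<ByteBuffer> indexedValues)
        {
            if (partitionKey.remaining() > MAX_UNSIGNED_SHORT)
                throw new IllegalArgumentException(
                    "Partition key size " + partitionKey.remaining() + " exceeds maximum " + MAX_UNSIGNED_SHORT);
            for (ByteBuffer value : indexedValues)
                if (value.remaining() > MAX_UNSIGNED_SHORT)
                    throw new IllegalArgumentException(
                        "Can't index column value of size " + value.remaining());
        }
    }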


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3d9fc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3d9fc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3d9fc1

Branch: refs/heads/trunk
Commit: 0e3d9fc14bfcb38b9f179c0428cf586890c4a8ab
Parents: 2ce1ad8
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Mon Nov 24 14:50:14 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Nov 24 14:50:14 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/cql3/CFDefinition.java |   5 +
 .../cassandra/cql3/ColumnNameBuilder.java       |   7 ++
 .../cql3/statements/ModificationStatement.java  |  20 +--
 .../cql3/statements/UpdateStatement.java        |  22 +++-
 .../db/index/SecondaryIndexManager.java         |   6 +-
 .../cassandra/db/marshal/CompositeType.java     |   9 ++
 .../cassandra/io/sstable/SSTableWriter.java     |   9 ++
 .../cql3/IndexedValuesValidationTest.java       | 124 +++++++++++++++++++
 9 files changed, 192 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a5ac0d..412eb59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Validate size of indexed column values (CASSANDRA-8280)
  * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 23bedaf..e0bb409 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -358,5 +358,10 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
 
             return columnName;
         }
+
+        public int getLength()
+        {
+            return columnName == null ? 0 : columnName.remaining();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
index 3d5eff6..50cdc74 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
@@ -78,4 +78,11 @@ public interface ColumnNameBuilder
      */
     public ByteBuffer getComponent(int i);
 
+    /**
+     * Returns the total length of the ByteBuffer that will
+     * be returned by build().
+     * @return the total length of the column name to be built
+     */
+    public int getLength();
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 61f65c1..db22e7d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,8 +22,6 @@ import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +41,9 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.github.jamm.MemoryMeter;
 
 /*
  * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -328,7 +328,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+        ColumnNameBuilder keyBuilderBase = cfDef.getKeyNameBuilder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         for (CFDefinition.Name name : cfDef.partitionKeys())
         {
@@ -337,14 +337,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
 
             List<ByteBuffer> values = r.values(variables);
-
-            if (keyBuilder.remainingCount() == 1)
+            if (keyBuilderBase.remainingCount() == 1)
             {
                 for (ByteBuffer val : values)
                 {
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                    ByteBuffer key = keyBuilder.copy().add(val).build();
+
+                    ColumnNameBuilder keyBuilder = keyBuilderBase.copy().add(val);
+                    if (keyBuilder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
+                        throw new InvalidRequestException(String.format("Partition key size %s exceeds maximum %s",
+                                                                        keyBuilder.getLength(),
+                                                                        FBUtilities.MAX_UNSIGNED_SHORT));
+                    ByteBuffer key = keyBuilder.build();
                     ThriftValidation.validateKey(cfm, key);
                     keys.add(key);
                 }
@@ -356,7 +361,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 ByteBuffer val = values.get(0);
                 if (val == null)
                     throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                keyBuilder.add(val);
+                keyBuilderBase.add(val);
             }
         }
         return keys;
@@ -727,7 +732,6 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
         {
-            ThriftValidation.validateKey(cfm, key);
             ColumnFamily cf = UnsortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             RowMutation rm = new RowMutation(cfm.ksName, key, cf);
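
Two details worth noting in the hunk above: the base builder is copied for
each IN value, so checking and building one key never mutates the builder used
for the rest, and dropping ThriftValidation.validateKey from getMutations
appears to remove only a redundant call, since validateKey is still invoked as
each key is built in buildPartitionKeyNames. A toy illustration of the
copy-then-check pattern (hypothetical names, lengths stand in for the builder):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    final class KeyBuilderSketch
    {
        static final int MAX_UNSIGNED_SHORT = 0xFFFF;

        // For each IN value, derive a length from the shared base without
        // mutating it, and reject any key over the unsigned-short bound.
        static List<Integer> buildKeyLengths(int baseLength, List<ByteBuffer> lastComponentValues)
        {
            List<Integer> keyLengths = new ArrayList<>();
            for (ByteBuffer val : lastComponentValues)
            {
                int length = baseLength + val.remaining(); // copy() semantics: base stays untouched
                if (length > MAX_UNSIGNED_SHORT)
                    throw new IllegalArgumentException(
                        "Partition key size " + length + " exceeds maximum " + MAX_UNSIGNED_SHORT);
                keyLengths.add(length);
            }
            return keyLengths;
        }
    }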

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index e2da251..9d98c84 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -23,9 +23,10 @@ import java.util.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -51,6 +52,11 @@ public class UpdateStatement extends ModificationStatement
     {
         CFDefinition cfDef = cfm.getCfDef();
 
+        if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
+            throw new InvalidRequestException(String.format("The sum of all clustering columns is too long (%s > %s)",
+                                                            builder.getLength(),
+                                                            FBUtilities.MAX_UNSIGNED_SHORT));
+
         // Inserting the CQL row marker (see #4361)
         // We always need to insert a marker for INSERT, because of the following situation:
         //   CREATE TABLE t ( k int PRIMARY KEY, c text );
@@ -99,6 +105,20 @@ public class UpdateStatement extends ModificationStatement
             for (Operation update : updates)
                 update.execute(key, cf, builder.copy(), params);
         }
+
+        SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
+        if (indexManager.hasIndexes())
+        {
+            for (Column column : cf)
+            {
+                if (!indexManager.validate(column))
+                    throw new InvalidRequestException(String.format("Can't index column value of size %d for index %s on %s.%s",
+                                                                    column.value().remaining(),
+                                                                    cfm.getColumnDefinitionFromColumnName(column.name()).getIndexName(),
+                                                                    cfm.ksName,
+                                                                    cfm.cfName));
+            }
+        }
     }
 
     public static class ParsedInsert extends ModificationStatement.Parsed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 6d9f28a..fda79f8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -574,8 +574,10 @@ public class SecondaryIndexManager
 
     public boolean validate(Column column)
     {
-        SecondaryIndex index = getIndexForColumn(column.name());
-        return index == null || index.validate(column);
+        for (SecondaryIndex index : indexFor(column.name()))
+            if (!index.validate(column))
+                return false;
+        return true;
     }
 
     public static interface Updater
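
The validate() change above reflects that a single column may be covered by
more than one index, so a value is only acceptable if every applicable index
accepts it. A small sketch of the pattern with an illustrative interface (not
the actual SecondaryIndex API):

    import java.util.List;

    final class AllIndexesMustValidate
    {
        interface Index
        {
            boolean validate(byte[] value);
        }

        // Reject the value as soon as any index covering the column does.
        static boolean validate(List<Index> indexesForColumn, byte[] value)
        {
            for (Index index : indexesForColumn)
                if (!index.validate(value))
                    return false;
            return true;
        }
    }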

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 946ba24..f0d9d9b 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -504,5 +504,14 @@ public class CompositeType extends AbstractCompositeType
 
             return components.get(i);
         }
+
+        public int getLength()
+        {
+            int length = 0;
+            for (ByteBuffer component : components)
+            length += component.remaining() + 3; // component bytes + 2-byte length prefix + 1-byte EOC
+
+            return length;
+        }
     }
 }
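
For reference, each component of a composite name is serialized as a two-byte
length prefix, the component bytes, and a one-byte end-of-component (EOC)
marker, which is where the remaining() + 3 above comes from. A self-contained
illustration (hypothetical class, not part of the commit):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    final class CompositeLengthSketch
    {
        // Serialized size of a composite name: per component, 2 bytes of
        // length, the component itself, and 1 EOC byte.
        static int serializedLength(List<ByteBuffer> components)
        {
            int length = 0;
            for (ByteBuffer component : components)
                length += 2 + component.remaining() + 1;
            return length;
        }

        public static void main(String[] args)
        {
            List<ByteBuffer> parts = Arrays.asList(ByteBuffer.allocate(4), ByteBuffer.allocate(7));
            System.out.println(serializedLength(parts)); // (2+4+1) + (2+7+1) = 17
        }
    }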

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4619ddc..afa066d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.Pair;
@@ -181,6 +182,14 @@ public class SSTableWriter extends SSTable
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
     {
+        if (decoratedKey.key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+        {
+            logger.error("Key size {} exceeds maximum of {}, skipping row",
+                         decoratedKey.key.remaining(),
+                         FBUtilities.MAX_UNSIGNED_SHORT);
+            return;
+        }
+
         long startPosition = beforeAppend(decoratedKey);
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
new file mode 100644
index 0000000..9c2bc0f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.MD5Digest;
+
+import static org.junit.Assert.fail;
+import static org.apache.cassandra.cql3.QueryProcessor.process;
+
+public class IndexedValuesValidationTest
+{
+    static ClientState clientState;
+    static String keyspace = "indexed_value_validation_test";
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        executeSchemaChange("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+        clientState = ClientState.forInternalCalls();
+    }
+
+    // CASSANDRA-8280/8081
+    // reject updates with indexed values where value > 64k
+    @Test
+    public void testIndexOnCompositeValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.composite_index_table (a int, b int, c blob, PRIMARY KEY (a))");
+        executeSchemaChange("CREATE INDEX ON %s.composite_index_table(c)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.composite_index_table (a, b, c) VALUES (0, 0, ?)");
+    }
+
+    @Test
+    public void testIndexOnClusteringValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.ck_index_table (a int, b blob, c int, PRIMARY KEY (a, b))");
+        executeSchemaChange("CREATE INDEX ON %s.ck_index_table(b)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.ck_index_table (a, b, c) VALUES (0, ?, 0)");
+    }
+
+    @Test
+    public void testIndexOnPartitionKeyOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.pk_index_table (a blob, b int, c int, PRIMARY KEY ((a, b)))");
+        executeSchemaChange("CREATE INDEX ON %s.pk_index_table(a)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.pk_index_table (a, b, c) VALUES (?, 0, 0)");
+    }
+
+    @Test
+    public void testCompactTableWithValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.compact_table (a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE");
+        executeSchemaChange("CREATE INDEX ON %s.compact_table(b)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.compact_table (a, b) VALUES (0, ?)");
+    }
+
+    private static void performInsertWithIndexedValueOver64k(String insertCQL) throws Exception
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024 * 65);
+        buf.clear();
+        for (int i=0; i<1024 + 1; i++)
+            buf.put((byte)0);
+
+        try
+        {
+            execute(String.format(insertCQL, keyspace), buf);
+            fail("Expected statement to fail validation");
+        }
+        catch (InvalidRequestException e)
+        {
+            // as expected
+        }
+    }
+
+    private static void execute(String query, ByteBuffer value) throws RequestValidationException, RequestExecutionException
+    {
+        MD5Digest statementId = QueryProcessor.prepare(String.format(query, keyspace), clientState, false).statementId;
+        CQLStatement statement = QueryProcessor.instance.getPrepared(statementId);
+        statement.executeInternal(QueryState.forInternalCalls(),
+                                  new QueryOptions(ConsistencyLevel.ONE, Collections.singletonList(value)));
+    }
+
+    private static void executeSchemaChange(String query) throws Throwable
+    {
+        try
+        {
+            process(String.format(query, keyspace), ConsistencyLevel.ONE);
+        }
+        catch (RuntimeException exc)
+        {
+            throw exc.getCause();
+        }
+    }
+}
+


[3/4] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/cql3/CFDefinition.java
	src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
	src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
	src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
	src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cab2b25b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cab2b25b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cab2b25b

Branch: refs/heads/trunk
Commit: cab2b25b0a5b2029a9c3e1324d080b4982fbdc50
Parents: 35f173a 0e3d9fc
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Nov 24 15:14:14 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Nov 24 15:14:14 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/UpdateStatement.java        | 17 +++-
 .../cassandra/io/sstable/SSTableWriter.java     |  9 ++
 .../cql3/IndexedValuesValidationTest.java       | 86 ++++++++++++++++++++
 4 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cab2b25b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 313000a,412eb59..9db65e9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.0.12:
 +2.1.3
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 +Merged from 2.0:
+  * Validate size of indexed column values (CASSANDRA-8280)
   * Make LCS split compaction results over all data directories (CASSANDRA-8329)
   * Fix some failing queries that use multi-column relations
     on COMPACT STORAGE tables (CASSANDRA-8264)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cab2b25b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 2c87173,9d98c84..09f26d6
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@@ -22,12 -22,11 +22,12 @@@ import java.util.*
  
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
- import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.index.SecondaryIndexManager;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
  /**
@@@ -95,8 -103,22 +95,23 @@@ public class UpdateStatement extends Mo
          else
          {
              for (Operation update : updates)
 -                update.execute(key, cf, builder.copy(), params);
 +                update.execute(key, cf, prefix, params);
          }
+ 
+         SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
+         if (indexManager.hasIndexes())
+         {
 -            for (Column column : cf)
++            for (Cell cell : cf)
+             {
 -                if (!indexManager.validate(column))
++                // Indexed values must be validated by any applicable index. See CASSANDRA-3057/4240/8081 for more details
++                if (!indexManager.validate(cell))
+                     throw new InvalidRequestException(String.format("Can't index column value of size %d for index %s on %s.%s",
 -                                                                    column.value().remaining(),
 -                                                                    cfm.getColumnDefinitionFromColumnName(column.name()).getIndexName(),
++                                                                    cell.value().remaining(),
++                                                                    cfm.getColumnDefinition(cell.name()).getIndexName(),
+                                                                     cfm.ksName,
+                                                                     cfm.cfName));
+             }
+         }
      }
  
      public static class ParsedInsert extends ModificationStatement.Parsed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cab2b25b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 0f95a9b,afa066d..53176e3
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -52,18 -33,10 +52,19 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 -import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.FilterFactory;
  import org.apache.cassandra.utils.IFilter;
  import org.apache.cassandra.utils.Pair;
@@@ -212,6 -182,14 +213,14 @@@ public class SSTableWriter extends SSTa
  
      public void append(DecoratedKey decoratedKey, ColumnFamily cf)
      {
 -        if (decoratedKey.key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
++        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+         {
+             logger.error("Key size {} exceeds maximum of {}, skipping row",
 -                         decoratedKey.key.remaining(),
++                         decoratedKey.getKey().remaining(),
+                          FBUtilities.MAX_UNSIGNED_SHORT);
+             return;
+         }
+ 
          long startPosition = beforeAppend(decoratedKey);
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cab2b25b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
index 0000000,9c2bc0f..05acf86
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
@@@ -1,0 -1,124 +1,86 @@@
+ /*
+  *
+  *  * Licensed to the Apache Software Foundation (ASF) under one
+  *  * or more contributor license agreements.  See the NOTICE file
+  *  * distributed with this work for additional information
+  *  * regarding copyright ownership.  The ASF licenses this file
+  *  * to you under the Apache License, Version 2.0 (the
+  *  * "License"); you may not use this file except in compliance
+  *  * with the License.  You may obtain a copy of the License at
+  *  *
+  *  *     http://www.apache.org/licenses/LICENSE-2.0
+  *  *
+  *  * Unless required by applicable law or agreed to in writing, software
+  *  * distributed under the License is distributed on an "AS IS" BASIS,
+  *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *  * See the License for the specific language governing permissions and
+  *  * limitations under the License.
+  *
+  */
++
+ package org.apache.cassandra.cql3;
+ 
+ import java.nio.ByteBuffer;
 -import java.util.Collections;
 -import org.junit.BeforeClass;
++
+ import org.junit.Test;
+ 
 -import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.exceptions.RequestExecutionException;
 -import org.apache.cassandra.exceptions.RequestValidationException;
 -import org.apache.cassandra.service.ClientState;
 -import org.apache.cassandra.service.QueryState;
 -import org.apache.cassandra.utils.MD5Digest;
+ 
+ import static org.junit.Assert.fail;
 -import static org.apache.cassandra.cql3.QueryProcessor.process;
+ 
 -public class IndexedValuesValidationTest
++public class IndexedValuesValidationTest extends CQLTester
+ {
 -    static ClientState clientState;
 -    static String keyspace = "indexed_value_validation_test";
 -
 -    @BeforeClass
 -    public static void setUpClass() throws Throwable
 -    {
 -        SchemaLoader.loadSchema();
 -        executeSchemaChange("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
 -        clientState = ClientState.forInternalCalls();
 -    }
 -
+     // CASSANDRA-8280/8081
+     // reject updates with indexed values where value > 64k
+     @Test
+     public void testIndexOnCompositeValueOver64k() throws Throwable
+     {
 -        executeSchemaChange("CREATE TABLE %s.composite_index_table (a int, b int, c blob, PRIMARY KEY (a))");
 -        executeSchemaChange("CREATE INDEX ON %s.composite_index_table(c)");
 -        performInsertWithIndexedValueOver64k("INSERT INTO %s.composite_index_table (a, b, c) VALUES (0, 0, ?)");
++        createTable("CREATE TABLE %s(a int, b int, c blob, PRIMARY KEY (a))");
++        createIndex("CREATE INDEX ON %s(c)");
++        performInsertWithIndexedValueOver64k("INSERT INTO %s (a, b, c) VALUES (0, 0, ?)");
+     }
+ 
+     @Test
+     public void testIndexOnClusteringValueOver64k() throws Throwable
+     {
 -        executeSchemaChange("CREATE TABLE %s.ck_index_table (a int, b blob, c int, PRIMARY KEY (a, b))");
 -        executeSchemaChange("CREATE INDEX ON %s.ck_index_table(b)");
 -        performInsertWithIndexedValueOver64k("INSERT INTO %s.ck_index_table (a, b, c) VALUES (0, ?, 0)");
++        createTable("CREATE TABLE %s(a int, b blob, c int, PRIMARY KEY (a, b))");
++        createIndex("CREATE INDEX ON %s(b)");
++        performInsertWithIndexedValueOver64k("INSERT INTO %s (a, b, c) VALUES (0, ?, 0)");
+     }
+ 
+     @Test
+     public void testIndexOnPartitionKeyOver64k() throws Throwable
+     {
 -        executeSchemaChange("CREATE TABLE %s.pk_index_table (a blob, b int, c int, PRIMARY KEY ((a, b)))");
 -        executeSchemaChange("CREATE INDEX ON %s.pk_index_table(a)");
 -        performInsertWithIndexedValueOver64k("INSERT INTO %s.pk_index_table (a, b, c) VALUES (?, 0, 0)");
++        createTable("CREATE TABLE %s(a blob, b int, c int, PRIMARY KEY ((a, b)))");
++        createIndex("CREATE INDEX ON %s(a)");
++        performInsertWithIndexedValueOver64k("INSERT INTO %s (a, b, c) VALUES (?, 0, 0)");
+     }
+ 
+     @Test
+     public void testCompactTableWithValueOver64k() throws Throwable
+     {
 -        executeSchemaChange("CREATE TABLE %s.compact_table (a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE");
 -        executeSchemaChange("CREATE INDEX ON %s.compact_table(b)");
 -        performInsertWithIndexedValueOver64k("INSERT INTO %s.compact_table (a, b) VALUES (0, ?)");
++        createTable("CREATE TABLE %s(a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE");
++        createIndex("CREATE INDEX ON %s(b)");
++        performInsertWithIndexedValueOver64k("INSERT INTO %s (a, b) VALUES (0, ?)");
+     }
+ 
 -    private static void performInsertWithIndexedValueOver64k(String insertCQL) throws Exception
++    public void performInsertWithIndexedValueOver64k(String insertCQL) throws Throwable
+     {
+         ByteBuffer buf = ByteBuffer.allocate(1024 * 65);
+         buf.clear();
++
++        // make the value more than 64k
+         for (int i=0; i<1024 + 1; i++)
+             buf.put((byte)0);
+ 
+         try
+         {
 -            execute(String.format(insertCQL, keyspace), buf);
++            execute(insertCQL, buf);
+             fail("Expected statement to fail validation");
+         }
+         catch (InvalidRequestException e)
+         {
+             // as expected
+         }
+     }
 -
 -    private static void execute(String query, ByteBuffer value) throws RequestValidationException, RequestExecutionException
 -    {
 -        MD5Digest statementId = QueryProcessor.prepare(String.format(query, keyspace), clientState, false).statementId;
 -        CQLStatement statement = QueryProcessor.instance.getPrepared(statementId);
 -        statement.executeInternal(QueryState.forInternalCalls(),
 -                                  new QueryOptions(ConsistencyLevel.ONE, Collections.singletonList(value)));
 -    }
 -
 -    private static void executeSchemaChange(String query) throws Throwable
 -    {
 -        try
 -        {
 -            process(String.format(query, keyspace), ConsistencyLevel.ONE);
 -        }
 -        catch (RuntimeException exc)
 -        {
 -            throw exc.getCause();
 -        }
 -    }
+ }
 -