Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:45 UTC

[12/20] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java
index 364070e,0000000..3deed96
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@@ -1,536 -1,0 +1,556 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.tools;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.io.OutputStreamWriter;
 +import java.nio.ByteBuffer;
 +import java.time.Instant;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Stream;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.ClusteringPrefix;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.LivenessInfo;
 +import org.apache.cassandra.db.RangeTombstone;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CollectionType;
 +import org.apache.cassandra.db.marshal.CompositeType;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.ColumnData;
 +import org.apache.cassandra.db.rows.ComplexColumnData;
 +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
 +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.Unfiltered;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.codehaus.jackson.JsonFactory;
 +import org.codehaus.jackson.JsonGenerator;
 +import org.codehaus.jackson.impl.Indenter;
 +import org.codehaus.jackson.util.DefaultPrettyPrinter;
 +import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public final class JsonTransformer
 +{
 +
 +    private static final Logger logger = LoggerFactory.getLogger(JsonTransformer.class);
 +
 +    private static final JsonFactory jsonFactory = new JsonFactory();
 +
 +    private final JsonGenerator json;
 +
 +    private final CompactIndenter objectIndenter = new CompactIndenter();
 +
 +    private final CompactIndenter arrayIndenter = new CompactIndenter();
 +
 +    private final CFMetaData metadata;
 +
 +    private final ISSTableScanner currentScanner;
 +
 +    private boolean rawTime = false;
 +
 +    private long currentPosition = 0;
 +
 +    private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata)
 +    {
 +        this.json = json;
 +        this.metadata = metadata;
 +        this.currentScanner = currentScanner;
 +        this.rawTime = rawTime;
 +
 +        DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
 +        prettyPrinter.indentObjectsWith(objectIndenter);
 +        prettyPrinter.indentArraysWith(arrayIndenter);
 +        json.setPrettyPrinter(prettyPrinter);
 +    }
 +
 +    public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out)
 +            throws IOException
 +    {
 +        try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8")))
 +        {
 +            JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata);
 +            json.writeStartArray();
 +            partitions.forEach(transformer::serializePartition);
 +            json.writeEndArray();
 +        }
 +    }
 +
 +    public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException
 +    {
 +        try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8")))
 +        {
 +            JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata);
 +            json.writeStartArray();
 +            keys.forEach(transformer::serializePartitionKey);
 +            json.writeEndArray();
 +        }
 +    }
 +
 +    private void updatePosition()
 +    {
 +        this.currentPosition = currentScanner.getCurrentPosition();
 +    }
 +
 +    private void serializePartitionKey(DecoratedKey key)
 +    {
 +        AbstractType<?> keyValidator = metadata.getKeyValidator();
 +        objectIndenter.setCompact(true);
 +        try
 +        {
 +            arrayIndenter.setCompact(true);
 +            json.writeStartArray();
 +            if (keyValidator instanceof CompositeType)
 +            {
 +                // if a composite type, the partition has multiple keys.
 +                CompositeType compositeType = (CompositeType) keyValidator;
 +                ByteBuffer keyBytes = key.getKey().duplicate();
 +                // Skip static data if it exists.
 +                if (keyBytes.remaining() >= 2)
 +                {
 +                    int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position());
 +                    if ((header & 0xFFFF) == 0xFFFF)
 +                    {
 +                        ByteBufferUtil.readShortLength(keyBytes);
 +                    }
 +                }
 +
 +                int i = 0;
 +                while (keyBytes.remaining() > 0 && i < compositeType.getComponents().size())
 +                {
 +                    AbstractType<?> colType = compositeType.getComponents().get(i);
 +
 +                    ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(keyBytes);
 +                    String colValue = colType.getString(value);
 +
 +                    json.writeString(colValue);
 +
 +                    byte b = keyBytes.get();
 +                    if (b != 0)
 +                    {
 +                        break;
 +                    }
 +                    ++i;
 +                }
 +            }
 +            else
 +            {
 +                // if not a composite type, assume a single column partition key.
 +                assert metadata.partitionKeyColumns().size() == 1;
 +                json.writeString(keyValidator.getString(key.getKey()));
 +            }
 +            json.writeEndArray();
 +            objectIndenter.setCompact(false);
 +            arrayIndenter.setCompact(false);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Failure serializing partition key.", e);
 +        }
 +    }
 +
 +    private void serializePartition(UnfilteredRowIterator partition)
 +    {
 +        String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey());
 +        try
 +        {
 +            json.writeStartObject();
 +
 +            json.writeFieldName("partition");
 +            json.writeStartObject();
 +            json.writeFieldName("key");
 +            serializePartitionKey(partition.partitionKey());
 +            json.writeNumberField("position", this.currentScanner.getCurrentPosition());
 +
 +            if (!partition.partitionLevelDeletion().isLive())
 +            {
 +                serializeDeletion(partition.partitionLevelDeletion());
 +            }
 +            else
 +            {
 +                json.writeEndObject();
 +                json.writeFieldName("rows");
 +                json.writeStartArray();
 +                updatePosition();
 +                if (!partition.staticRow().isEmpty())
 +                {
 +                    serializeRow(partition.staticRow());
 +                }
 +                Unfiltered unfiltered;
 +                updatePosition();
 +                while (partition.hasNext())
 +                {
 +                    unfiltered = partition.next();
 +                    if (unfiltered instanceof Row)
 +                    {
 +                        serializeRow((Row) unfiltered);
 +                    }
 +                    else if (unfiltered instanceof RangeTombstoneMarker)
 +                    {
 +                        serializeTombstone((RangeTombstoneMarker) unfiltered);
 +                    }
 +                    updatePosition();
 +                }
 +                json.writeEndArray();
 +            }
 +
 +            json.writeEndObject();
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Fatal error parsing partition: {}", key, e);
 +        }
 +    }
 +
 +    private void serializeRow(Row row)
 +    {
 +        try
 +        {
 +            json.writeStartObject();
 +            String rowType = row.isStatic() ? "static_block" : "row";
 +            json.writeFieldName("type");
 +            json.writeString(rowType);
 +            json.writeNumberField("position", this.currentPosition);
 +
 +            // Only print clustering information for non-static rows.
 +            if (!row.isStatic())
 +            {
 +                serializeClustering(row.clustering());
 +            }
 +
 +            LivenessInfo liveInfo = row.primaryKeyLivenessInfo();
 +            if (!liveInfo.isEmpty())
 +            {
 +                objectIndenter.setCompact(false);
 +                json.writeFieldName("liveness_info");
 +                objectIndenter.setCompact(true);
 +                json.writeStartObject();
 +                json.writeFieldName("tstamp");
 +                json.writeString(dateString(TimeUnit.MICROSECONDS, liveInfo.timestamp()));
 +                if (liveInfo.isExpiring())
 +                {
 +                    json.writeNumberField("ttl", liveInfo.ttl());
 +                    json.writeFieldName("expires_at");
 +                    json.writeString(dateString(TimeUnit.SECONDS, liveInfo.localExpirationTime()));
 +                    json.writeFieldName("expired");
 +                    json.writeBoolean(liveInfo.localExpirationTime() < (System.currentTimeMillis() / 1000));
 +                }
 +                json.writeEndObject();
 +                objectIndenter.setCompact(false);
 +            }
 +
 +            // If this is a deletion, indicate that, otherwise write cells.
 +            if (!row.deletion().isLive())
 +            {
 +                serializeDeletion(row.deletion().time());
 +            }
 +            json.writeFieldName("cells");
 +            json.writeStartArray();
 +            for (ColumnData cd : row)
 +            {
 +                serializeColumnData(cd, liveInfo);
 +            }
 +            json.writeEndArray();
 +            json.writeEndObject();
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Fatal error parsing row.", e);
 +        }
 +    }
 +
 +    private void serializeTombstone(RangeTombstoneMarker tombstone)
 +    {
 +        try
 +        {
 +            json.writeStartObject();
 +            json.writeFieldName("type");
 +
 +            if (tombstone instanceof RangeTombstoneBoundMarker)
 +            {
 +                json.writeString("range_tombstone_bound");
 +                RangeTombstoneBoundMarker bm = (RangeTombstoneBoundMarker) tombstone;
 +                serializeBound(bm.clustering(), bm.deletionTime());
 +            }
 +            else
 +            {
 +                assert tombstone instanceof RangeTombstoneBoundaryMarker;
 +                json.writeString("range_tombstone_boundary");
 +                RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) tombstone;
 +                serializeBound(bm.openBound(false), bm.openDeletionTime(false));
 +                serializeBound(bm.closeBound(false), bm.closeDeletionTime(false));
 +            }
 +            json.writeEndObject();
 +            objectIndenter.setCompact(false);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Failure parsing tombstone.", e);
 +        }
 +    }
 +
 +    private void serializeBound(RangeTombstone.Bound bound, DeletionTime deletionTime) throws IOException
 +    {
 +        json.writeFieldName(bound.isStart() ? "start" : "end");
 +        json.writeStartObject();
 +        json.writeFieldName("type");
 +        json.writeString(bound.isInclusive() ? "inclusive" : "exclusive");
 +        serializeClustering(bound.clustering());
 +        serializeDeletion(deletionTime);
 +        json.writeEndObject();
 +    }
 +
 +    private void serializeClustering(ClusteringPrefix clustering) throws IOException
 +    {
 +        if (clustering.size() > 0)
 +        {
 +            json.writeFieldName("clustering");
 +            objectIndenter.setCompact(true);
 +            json.writeStartArray();
 +            arrayIndenter.setCompact(true);
 +            List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns();
 +            for (int i = 0; i < clusteringColumns.size(); i++)
 +            {
 +                ColumnDefinition column = clusteringColumns.get(i);
 +                if (i >= clustering.size())
 +                {
 +                    json.writeString("*");
 +                }
 +                else
 +                {
 +                    json.writeString(column.cellValueType().getString(clustering.get(i)));
 +                }
 +            }
 +            json.writeEndArray();
 +            objectIndenter.setCompact(false);
 +            arrayIndenter.setCompact(false);
 +        }
 +    }
 +
 +    private void serializeDeletion(DeletionTime deletion) throws IOException
 +    {
 +        json.writeFieldName("deletion_info");
 +        objectIndenter.setCompact(true);
 +        json.writeStartObject();
 +        json.writeFieldName("marked_deleted");
 +        json.writeString(dateString(TimeUnit.MICROSECONDS, deletion.markedForDeleteAt()));
 +        json.writeFieldName("local_delete_time");
 +        json.writeString(dateString(TimeUnit.SECONDS, deletion.localDeletionTime()));
 +        json.writeEndObject();
 +        objectIndenter.setCompact(false);
 +    }
 +
 +    private void serializeColumnData(ColumnData cd, LivenessInfo liveInfo)
 +    {
 +        if (cd.column().isSimple())
 +        {
 +            serializeCell((Cell) cd, liveInfo);
 +        }
 +        else
 +        {
 +            ComplexColumnData complexData = (ComplexColumnData) cd;
 +            if (!complexData.complexDeletion().isLive())
 +            {
 +                try
 +                {
 +                    objectIndenter.setCompact(true);
 +                    json.writeStartObject();
 +                    json.writeFieldName("name");
 +                    AbstractType<?> type = cd.column().type;
 +                    json.writeString(cd.column().name.toCQLString());
 +                    serializeDeletion(complexData.complexDeletion());
 +                    objectIndenter.setCompact(true);
 +                    json.writeEndObject();
 +                    objectIndenter.setCompact(false);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.error("Failure parsing ColumnData.", e);
 +                }
 +            }
 +            for (Cell cell : complexData){
 +                serializeCell(cell, liveInfo);
 +            }
 +        }
 +    }
 +
 +    private void serializeCell(Cell cell, LivenessInfo liveInfo)
 +    {
 +        try
 +        {
 +            json.writeStartObject();
 +            objectIndenter.setCompact(true);
 +            json.writeFieldName("name");
 +            AbstractType<?> type = cell.column().type;
 +            json.writeString(cell.column().name.toCQLString());
 +
 +            if (cell.path() != null && cell.path().size() > 0)
 +            {
 +                CollectionType ct = (CollectionType) type;
 +                json.writeFieldName("path");
 +                arrayIndenter.setCompact(true);
 +                json.writeStartArray();
 +                for (int i = 0; i < cell.path().size(); i++)
 +                {
 +                    json.writeString(ct.nameComparator().getString(cell.path().get(i)));
 +                }
 +                json.writeEndArray();
 +                arrayIndenter.setCompact(false);
 +            }
 +            if (cell.isTombstone())
 +            {
 +                json.writeFieldName("deletion_info");
 +                objectIndenter.setCompact(true);
 +                json.writeStartObject();
 +                json.writeFieldName("local_delete_time");
 +                json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime()));
 +                json.writeEndObject();
 +                objectIndenter.setCompact(false);
 +            }
 +            else
 +            {
 +                json.writeFieldName("value");
 +                json.writeString(cell.column().cellValueType().getString(cell.value()));
 +            }
 +            if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
 +            {
 +                json.writeFieldName("tstamp");
 +                json.writeString(dateString(TimeUnit.MICROSECONDS, cell.timestamp()));
 +            }
 +            if (cell.isExpiring() && (liveInfo.isEmpty() || cell.ttl() != liveInfo.ttl()))
 +            {
 +                json.writeFieldName("ttl");
 +                json.writeNumber(cell.ttl());
 +                json.writeFieldName("expires_at");
 +                json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime()));
 +                json.writeFieldName("expired");
 +                json.writeBoolean(!cell.isLive((int) (System.currentTimeMillis() / 1000)));
 +            }
 +            json.writeEndObject();
 +            objectIndenter.setCompact(false);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Failure parsing cell.", e);
 +        }
 +    }
 +
 +    private String dateString(TimeUnit from, long time)
 +    {
 +        long secs = from.toSeconds(time);
 +        long offset = Math.floorMod(from.toNanos(time), 1000_000_000L); // nanos per sec
 +        return rawTime? Long.toString(time) : Instant.ofEpochSecond(secs, offset).toString();
 +    }
 +
 +    /**
 +     * A specialized {@link Indenter} that enables a 'compact' mode which puts all subsequent json values on the same
 +     * line. This is manipulated via {@link CompactIndenter#setCompact(boolean)}
 +     */
 +    private static final class CompactIndenter extends NopIndenter
 +    {
 +
 +        private static final int INDENT_LEVELS = 16;
 +        private final char[] indents;
 +        private final int charsPerLevel;
 +        private final String eol;
 +        private static final String space = " ";
 +
 +        private boolean compact = false;
 +
 +        CompactIndenter()
 +        {
 +            this("  ", System.lineSeparator());
 +        }
 +
 +        CompactIndenter(String indent, String eol)
 +        {
 +            this.eol = eol;
 +
 +            charsPerLevel = indent.length();
 +
 +            indents = new char[indent.length() * INDENT_LEVELS];
 +            int offset = 0;
 +            for (int i = 0; i < INDENT_LEVELS; i++)
 +            {
 +                indent.getChars(0, indent.length(), indents, offset);
 +                offset += indent.length();
 +            }
 +        }
 +
 +        @Override
 +        public boolean isInline()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Configures whether subsequent json values should be written on the same line, separated by a single space.
 +         *
 +         * @param compact
 +         *            Whether or not to compact.
 +         */
 +        public void setCompact(boolean compact)
 +        {
 +            this.compact = compact;
 +        }
 +
 +        @Override
 +        public void writeIndentation(JsonGenerator jg, int level)
 +        {
 +            try
 +            {
 +                if (!compact)
 +                {
 +                    jg.writeRaw(eol);
 +                    if (level > 0)
 +                    { // should we err on negative values (as there's some flaw?)
 +                        level *= charsPerLevel;
 +                        while (level > indents.length)
 +                        { // unlikely to happen, but just in case
 +                            jg.writeRaw(indents, 0, indents.length);
 +                            level -= indents.length;
 +                        }
 +                        jg.writeRaw(indents, 0, level);
 +                    }
 +                }
 +                else
 +                {
 +                    jg.writeRaw(space);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                e.printStackTrace();
 +                System.exit(1);
 +            }
 +        }
 +    }
- }
++}
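
For reference, JsonTransformer emits one JSON object per partition, and the field names below ("partition", "key", "position", "rows", "type", "clustering", "liveness_info", "cells", "deletion_info") are taken directly from the serialize* methods in the diff above; timestamps are rendered as ISO-8601 instants by dateString() unless rawTime is set. The following is only an illustrative sketch of the output for a live partition with a single row; the key, clustering and cell values are invented:

    {
      "partition" : {
        "key" : [ "k1" ],
        "position" : 0
      },
      "rows" : [
        {
          "type" : "row",
          "position" : 32,
          "clustering" : [ "c1" ],
          "liveness_info" : { "tstamp" : "2016-06-15T14:59:45Z" },
          "cells" : [
            { "name" : "v1", "value" : "some value" }
          ]
        }
      ]
    }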

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/OverlapIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/OverlapIterator.java
index 131a749,7c1544a..b346a62
--- a/src/java/org/apache/cassandra/utils/OverlapIterator.java
+++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java
@@@ -1,3 -1,23 +1,23 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
 - */
++*/
  package org.apache.cassandra.utils;
  
  import java.util.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/RMIServerSocketFactoryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/SyncUtil.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/SyncUtil.java
index b217e29,0d293aa..4c0d89d
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@@ -1,6 -1,30 +1,26 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
  package org.apache.cassandra.utils;
  
 -import java.io.FileDescriptor;
 -import java.io.FileOutputStream;
 -import java.io.IOException;
 -import java.io.RandomAccessFile;
 -import java.io.SyncFailedException;
 +import java.io.*;
  import java.lang.reflect.Field;
  import java.nio.MappedByteBuffer;
  import java.nio.channels.ClosedChannelException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index afca512,0000000..238a58d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@@ -1,85 -1,0 +1,105 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.cql3;
 +
 +import org.junit.Test;
 +
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.SimpleStatement;
 +import com.datastax.driver.core.Statement;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class IndexQueryPagingTest extends CQLTester
 +{
 +    /*
 +     * Some simple tests to verify the behaviour of paging during
 +     * 2i queries. We only use a single index type (CompositesIndexOnRegular)
 +     * as the code we want to exercise here is in their abstract
 +     * base class.
 +     */
 +
 +    @Test
 +    public void pagingOnRegularColumn() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (" +
 +                    " k1 int," +
 +                    " v1 int," +
 +                    "PRIMARY KEY (k1))");
 +        createIndex("CREATE INDEX ON %s(v1)");
 +
 +        int rowCount = 3;
 +        for (int i=0; i<rowCount; i++)
 +            execute("INSERT INTO %s (k1, v1) VALUES (?, ?)", i, 0);
 +
 +        executePagingQuery("SELECT * FROM %s WHERE v1=0", rowCount);
 +    }
 +
 +    @Test
 +    public void pagingOnRegularColumnWithPartitionRestriction() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (" +
 +                    " k1 int," +
 +                    " c1 int," +
 +                    " v1 int," +
 +                    "PRIMARY KEY (k1, c1))");
 +        createIndex("CREATE INDEX ON %s(v1)");
 +
 +        int partitions = 3;
 +        int rowCount = 3;
 +        for (int i=0; i<partitions; i++)
 +            for (int j=0; j<rowCount; j++)
 +                execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0);
 +
 +        executePagingQuery("SELECT * FROM %s WHERE k1=0 AND v1=0", rowCount);
 +    }
 +
 +    @Test
 +    public void pagingOnRegularColumnWithClusteringRestrictions() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (" +
 +                    " k1 int," +
 +                    " c1 int," +
 +                    " v1 int," +
 +                    "PRIMARY KEY (k1, c1))");
 +        createIndex("CREATE INDEX ON %s(v1)");
 +
 +        int partitions = 3;
 +        int rowCount = 3;
 +        for (int i=0; i<partitions; i++)
 +            for (int j=0; j<rowCount; j++)
 +                execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0);
 +
 +        executePagingQuery("SELECT * FROM %s WHERE k1=0 AND c1>=0 AND c1<=3 AND v1=0", rowCount);
 +    }
 +
 +    private void executePagingQuery(String cql, int rowCount)
 +    {
 +        // Execute an index query which should return all rows,
 +        // setting the fetch size smaller than the row count. Assert
 +        // that all rows are returned, so we know that paging
 +        // of the results was involved.
 +        Session session = sessionNet();
 +        Statement stmt = new SimpleStatement(String.format(cql, KEYSPACE + '.' + currentTable()));
 +        stmt.setFetchSize(rowCount - 1);
 +        assertEquals(rowCount, session.execute(stmt).all().size());
 +    }
 +}
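
The test above leans on the DataStax Java driver fetching further pages transparently once the first page (the fetch size) is exhausted. A minimal sketch of that behaviour outside CQLTester, assuming an already-connected Session; the class and method names and the ks.tbl keyspace/table are placeholders, not part of the patch:

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.SimpleStatement;
    import com.datastax.driver.core.Statement;

    public final class PagingSketch
    {
        // Counts the rows of an index query; with a fetch size smaller than the
        // result size, iterating past the first page triggers further fetches.
        static int countWithPaging(Session session)
        {
            Statement stmt = new SimpleStatement("SELECT * FROM ks.tbl WHERE v1 = 0");
            stmt.setFetchSize(2); // smaller than the expected row count, so paging is required
            ResultSet rs = session.execute(stmt);
            int rows = 0;
            for (Row row : rs)
                rows++;
            return rows;
        }
    }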

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 9af6028,0000000..b5d8159
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@@ -1,198 -1,0 +1,218 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.marshal.IntegerType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class SinglePartitionSliceCommandTest
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class);
 +
 +    private static final String KEYSPACE = "ks";
 +    private static final String TABLE = "tbl";
 +
 +    private static CFMetaData cfm;
 +    private static ColumnDefinition v;
 +    private static ColumnDefinition s;
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        cfm = CFMetaData.Builder.create(KEYSPACE, TABLE)
 +                                .addPartitionKey("k", UTF8Type.instance)
 +                                .addStaticColumn("s", UTF8Type.instance)
 +                                .addClusteringColumn("i", IntegerType.instance)
 +                                .addRegularColumn("v", UTF8Type.instance)
 +                                .build();
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
 +        cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
 +        v = cfm.getColumnDefinition(new ColumnIdentifier("v", true));
 +        s = cfm.getColumnDefinition(new ColumnIdentifier("s", true));
 +    }
 +
 +    @Before
 +    public void truncate()
 +    {
 +        Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking();
 +    }
 +
 +    @Test
 +    public void staticColumnsAreFiltered() throws IOException
 +    {
 +        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k"));
 +
 +        UntypedResultSet rows;
 +
 +        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')");
 +        QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0");
 +        QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0");
 +        rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0");
 +
 +        for (UntypedResultSet.Row row: rows)
 +        {
 +            logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null));
 +        }
 +
 +        assert rows.isEmpty();
 +
 +        ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v));
 +        ByteBuffer zero = ByteBufferUtil.bytes(0);
 +        Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
 +        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
 +        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
 +                                                          FBUtilities.nowInSeconds(),
 +                                                          columnFilter,
 +                                                          RowFilter.NONE,
 +                                                          DataLimits.NONE,
 +                                                          key,
 +                                                          sliceFilter);
 +
 +        DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
 +        ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
 +        DataInputPlus in = new DataInputBuffer(out.buffer(), true);
 +        cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21);
 +
 +        logger.debug("ReadCommand: {}", cmd);
 +        UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
 +        ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd);
 +
 +        logger.debug("creating response: {}", response);
 +        partitionIterator = response.makeIterator(cmd);
 +        assert partitionIterator.hasNext();
 +        UnfilteredRowIterator partition = partitionIterator.next();
 +
 +        LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition);
 +        Assert.assertEquals(Collections.emptyList(), rowIter.cells);
 +    }
 +
 +    private void checkForS(UnfilteredPartitionIterator pi)
 +    {
 +        Assert.assertTrue(pi.toString(), pi.hasNext());
 +        UnfilteredRowIterator ri = pi.next();
 +        Assert.assertTrue(ri.columns().contains(s));
 +        Row staticRow = ri.staticRow();
 +        Iterator<Cell> cellIterator = staticRow.cells().iterator();
 +        Assert.assertTrue(staticRow.toString(cfm, true), cellIterator.hasNext());
 +        Cell cell = cellIterator.next();
 +        Assert.assertEquals(s, cell.column());
 +        Assert.assertEquals(ByteBufferUtil.bytesToHex(cell.value()), ByteBufferUtil.bytes("s"), cell.value());
 +        Assert.assertFalse(cellIterator.hasNext());
 +    }
 +
 +    @Test
 +    public void staticColumnsAreReturned() throws IOException
 +    {
 +        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
 +
 +        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')");
 +        Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty());
 +
 +        ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
 +        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
 +        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
 +                                                         FBUtilities.nowInSeconds(),
 +                                                         columnFilter,
 +                                                         RowFilter.NONE,
 +                                                         DataLimits.NONE,
 +                                                         key,
 +                                                         sliceFilter);
 +
 +        // check raw iterator for static cell
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
 +        {
 +            checkForS(pi);
 +        }
 +
 +        ReadResponse response;
 +        DataOutputBuffer out;
 +        DataInputPlus in;
 +        ReadResponse dst;
 +
 +        // check (de)serialized iterator for memtable static cell
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
 +        {
 +            response = ReadResponse.createDataResponse(pi, cmd);
 +        }
 +
 +        out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
 +        ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
 +        in = new DataInputBuffer(out.buffer(), true);
 +        dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
 +        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
 +        {
 +            checkForS(pi);
 +        }
 +
 +        // check (de)serialized iterator for sstable static cell
 +        Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
 +        {
 +            response = ReadResponse.createDataResponse(pi, cmd);
 +        }
 +        out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
 +        ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
 +        in = new DataInputBuffer(out.buffer(), true);
 +        dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
 +        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
 +        {
 +            checkForS(pi);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
index b5c2f41,0000000..6a4aace
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@@ -1,100 -1,0 +1,120 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Random;
 +import java.util.concurrent.Semaphore;
 +
 +import javax.naming.ConfigurationException;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.Config.CommitLogSync;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.jboss.byteman.contrib.bmunit.BMRule;
 +import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +
 +import com.google.common.collect.ImmutableMap;
 +
 +@RunWith(BMUnitRunner.class)
 +public class CommitLogSegmentManagerTest
 +{
 +    //Block commit log service from syncing
 +    private static final Semaphore allowSync = new Semaphore(0);
 +
 +    private static final String KEYSPACE1 = "CommitLogTest";
 +    private static final String STANDARD1 = "Standard1";
 +    private static final String STANDARD2 = "Standard2";
 +
 +    private final static byte[] entropy = new byte[1024 * 256];
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        new Random().nextBytes(entropy);
 +        DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
 +        DatabaseDescriptor.setCommitLogSegmentSize(1);
 +        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
 +        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
 +
 +        CompactionManager.instance.disableAutoCompaction();
 +    }
 +
 +    @Test
 +    @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
 +            targetClass = "AbstractCommitLogService$1",
 +            targetMethod = "run",
 +            targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
 +            action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
 +    public void testCompressedCommitLogBackpressure() throws Throwable
 +    {
 +        CommitLog.instance.resetUnsafe(true);
 +        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
 +                     .clustering("bytes")
 +                     .add("val", ByteBuffer.wrap(entropy))
 +                     .build();
 +
 +        Thread dummyThread = new Thread( () ->
 +        {
 +            for (int i = 0; i < 20; i++)
 +                CommitLog.instance.add(m);
 +        });
 +        dummyThread.start();
 +
 +        CommitLogSegmentManager clsm = CommitLog.instance.allocator;
 +
 +        //Protect against delay, but still break out as fast as possible
 +        long start = System.currentTimeMillis();
 +        while (System.currentTimeMillis() - start < 5000)
 +        {
 +            if (clsm.getActiveSegments().size() >= 3)
 +                break;
 +        }
 +        Thread.sleep(1000);
 +
 +        //Should only be able to create 3 segments, not 7, because it blocks waiting for truncation that never comes
 +        Assert.assertEquals(3, clsm.getActiveSegments().size());
 +
 +        clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
 +
 +        Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index 98ad2bc,0000000..400d65a
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@@ -1,428 -1,0 +1,448 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.rows;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.Slice.Bound;
 +import org.apache.cassandra.db.ClusteringPrefix;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.FBUtilities;
 +import static org.junit.Assert.*;
 +
 +public class RowAndDeletionMergeIteratorTest
 +{
 +    private static final String KEYSPACE1 = "RowTest";
 +    private static final String CF_STANDARD1 = "Standard1";
 +
 +    private int nowInSeconds;
 +    private DecoratedKey dk;
 +    private ColumnFamilyStore cfs;
 +    private CFMetaData cfm;
 +    private ColumnDefinition defA;
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
 +                                                  .addPartitionKey("key", AsciiType.instance)
 +                                                  .addClusteringColumn("col1", Int32Type.instance)
 +                                                  .addRegularColumn("a", Int32Type.instance)
 +                                                  .build();
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    cfMetadata);
 +
 +    }
 +
 +    @Before
 +    public void setup()
 +    {
 +        nowInSeconds = FBUtilities.nowInSeconds();
 +        dk = Util.dk("key0");
 +        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
 +        cfm = cfs.metadata;
 +        defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
 +    }
 +
 +    @Test
 +    public void testWithNoRangeTombstones()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, Collections.emptyIterator(), false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 3);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 4);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
 +    @Test
 +    public void testWithOnlyRangeTombstones()
 +    {
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(rt(1, false, 3, false, timestamp, delTime),
 +                                                                                       atLeast(4, timestamp, delTime));
 +        UnfilteredRowIterator iterator = createMergeIterator(Collections.emptyIterator(), rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_BOUND, 3);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_START_BOUND, 4);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.TOP);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
 +    @Test
 +    public void testWithAtMostRangeTombstone()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.BOTTOM);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 3);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 4);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
 +    @Test
 +    public void testWithGreaterThanRangeTombstone()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(greaterThan(2, timestamp, delTime));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.TOP);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
 +    @Test
 +    public void testWithAtMostAndGreaterThanRangeTombstone()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
 +                                                                                       greaterThan(2, timestamp, delTime));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.BOTTOM);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.TOP);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
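 +    /** Asserts that the unfiltered is a range tombstone marker with the given bound kind and clustering value. */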
 +    private void assertRtMarker(Unfiltered unfiltered, ClusteringPrefix.Kind kind, int col1)
 +    {
 +        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
 +        assertEquals(kind, unfiltered.clustering().kind());
 +        assertEquals(bb(col1), unfiltered.clustering().get(0));
 +    }
 +
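 +    /**
 +     * Adjacent tombstones meeting at clustering 2 ("at most 2" followed by "greater than 2"): instead of a
 +     * separate close and open marker, a single inclusive-end / exclusive-start boundary is emitted at 2, and
 +     * every row is shadowed.
 +     */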
 +    @Test
 +    public void testWithIncludingEndExcludingStartMarker()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(2, timestamp, delTime),
 +                                                                                       greaterThan(2, timestamp, delTime));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.BOTTOM);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.TOP);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
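 +    /**
 +     * The mirror of the previous test ("less than 2" followed by "at least 2"): the two tombstones meet at
 +     * clustering 2 and collapse into an exclusive-end / inclusive-start boundary; every row is shadowed.
 +     */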
 +    @Test
 +    public void testWithExcludingEndIncludingStartMarker()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds + 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(lessThan(2, timestamp, delTime),
 +                                                                                       atLeast(2, timestamp, delTime));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.BOTTOM);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.TOP);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
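 +    /**
 +     * The range tombstone's timestamp (-1) is older than the rows' write timestamp (0), so row 0 is emitted
 +     * even though it sits inside the deleted range; the tombstone's markers are still produced around it.
 +     */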
 +    @Test
 +    public void testNonShadowingTombstone()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, -1L, 0));
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), Bound.BOTTOM);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 1);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 2);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 3);
 +
 +        assertTrue(iterator.hasNext());
 +        assertRow(iterator.next(), 4);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
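 +    /**
 +     * A partition-level deletion newer than both the rows and the range tombstones shadows everything, so the
 +     * merged iterator must be empty.
 +     */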
 +    @Test
 +    public void testWithPartitionLevelTombstone()
 +    {
 +        Iterator<Row> rowIterator = createRowIterator();
 +
 +        int delTime = nowInSeconds - 1;
 +        long timestamp = toMillis(delTime);
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
 +                                                                                       greaterThan(2, timestamp, delTime));
 +
 +        int partitionDelTime = nowInSeconds + 1;
 +        long partitionTimestamp = toMillis(partitionDelTime);
 +
 +        UnfilteredRowIterator iterator = createMergeIterator(rowIterator,
 +                                                             rangeTombstoneIterator,
 +                                                             new DeletionTime(partitionTimestamp, partitionDelTime),
 +                                                             false);
 +
 +        assertFalse(iterator.hasNext());
 +    }
 +
 +
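 +    /** Asserts that the unfiltered is a range tombstone marker sitting exactly at the given bound (BOTTOM or TOP). */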
 +    private void assertRtMarker(Unfiltered unfiltered, Bound bound)
 +    {
 +        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
 +        assertEquals(bound, unfiltered.clustering());
 +    }
 +
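 +    /** Asserts that the unfiltered is a row with the given clustering value. */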
 +    private void assertRow(Unfiltered unfiltered, int col1)
 +    {
 +        assertEquals(Unfiltered.Kind.ROW, unfiltered.kind());
 +        assertEquals(cfm.comparator.make(col1), unfiltered.clustering());
 +    }
 +
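 +    /** Collects the given tombstones into a RangeTombstoneList and returns a forward iterator over the whole slice. */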
 +    private Iterator<RangeTombstone> createRangeTombstoneIterator(RangeTombstone... tombstones)
 +    {
 +        RangeTombstoneList list = new RangeTombstoneList(cfm.comparator, 10);
 +
 +        for (RangeTombstone tombstone : tombstones)
 +            list.add(tombstone);
 +
 +        return list.iterator(Slice.ALL, false);
 +    }
 +
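 +    /** Builds a partition update containing five rows with clusterings 0 through 4, all written at timestamp 0. */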
 +    private Iterator<Row> createRowIterator()
 +    {
 +        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
 +        for (int i = 0; i < 5; i++)
 +            addRow(update, i, i);
 +
 +        return update.iterator();
 +    }
 +
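 +    /**
 +     * Wraps the row and tombstone iterators in the RowAndDeletionMergeIterator under test; the overload
 +     * without an explicit DeletionTime delegates with DeletionTime.LIVE, i.e. no partition-level deletion.
 +     */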
 +    private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed)
 +    {
 +        return createMergeIterator(rows, tombstones, DeletionTime.LIVE, reversed);
 +    }
 +
 +    private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows,
 +                                                      Iterator<RangeTombstone> tombstones,
 +                                                      DeletionTime deletionTime,
 +                                                      boolean reversed)
 +    {
 +        return new RowAndDeletionMergeIterator(cfm,
 +                                               Util.dk("k"),
 +                                               deletionTime,
 +                                               ColumnFilter.all(cfm),
 +                                               Rows.EMPTY_STATIC_ROW,
 +                                               reversed,
 +                                               EncodingStats.NO_STATS,
 +                                               rows,
 +                                               tombstones,
 +                                               true);
 +    }
 +
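 +    /** Adds a single-cell row with the given clustering and a value for column defA, written at timestamp 0. */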
 +    private void addRow(PartitionUpdate update, int col1, int a)
 +    {
 +        update.add(BTreeRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
 +    }
 +
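 +    /** Builds a live cell for the given column, int value and write timestamp. */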
 +    private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp)
 +    {
 +        return BufferCell.live(cfm, columnDefinition, timestamp, ((AbstractType)columnDefinition.cellValueType()).decompose(value));
 +    }
 +
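 +    /** Convenience factories for range tombstones over closed, open and half-open slices with the given deletion time. */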
 +    private static RangeTombstone atLeast(int start, long tstamp, int delTime)
 +    {
 +        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
 +    }
 +
 +    private static RangeTombstone atMost(int end, long tstamp, int delTime)
 +    {
 +        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
 +    }
 +
 +    private static RangeTombstone lessThan(int end, long tstamp, int delTime)
 +    {
 +        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.exclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
 +    }
 +
 +    private static RangeTombstone greaterThan(int start, long tstamp, int delTime)
 +    {
 +        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
 +    }
 +
 +    private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp, int delTime)
 +    {
 +        Slice.Bound startBound = startInclusive ? Slice.Bound.inclusiveStartOf(bb(start)) : Slice.Bound.exclusiveStartOf(bb(start));
 +        Slice.Bound endBound = endInclusive ? Slice.Bound.inclusiveEndOf(bb(end)) : Slice.Bound.exclusiveEndOf(bb(end));
 +
 +        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(tstamp, delTime));
 +    }
 +
 +    private static ByteBuffer bb(int i)
 +    {
 +        return ByteBufferUtil.bytes(i);
 +    }
 +
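 +    /** Converts a time in seconds to milliseconds; used as the deletion timestamp for the tombstones in these tests. */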
 +    private long toMillis(int timeInSeconds)
 +    {
 +        return timeInSeconds * 1000L;
 +    }
 +}