You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:45 UTC
[12/20] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java
index 364070e,0000000..3deed96
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@@ -1,536 -1,0 +1,556 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.impl.Indenter;
+import org.codehaus.jackson.util.DefaultPrettyPrinter;
+import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JsonTransformer
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(JsonTransformer.class);
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private final JsonGenerator json;
+
+ private final CompactIndenter objectIndenter = new CompactIndenter();
+
+ private final CompactIndenter arrayIndenter = new CompactIndenter();
+
+ private final CFMetaData metadata;
+
+ private final ISSTableScanner currentScanner;
+
+ private boolean rawTime = false;
+
+ private long currentPosition = 0;
+
+ private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata)
+ {
+ this.json = json;
+ this.metadata = metadata;
+ this.currentScanner = currentScanner;
+ this.rawTime = rawTime;
+
+ DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
+ prettyPrinter.indentObjectsWith(objectIndenter);
+ prettyPrinter.indentArraysWith(arrayIndenter);
+ json.setPrettyPrinter(prettyPrinter);
+ }
+
+ public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out)
+ throws IOException
+ {
+ try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8")))
+ {
+ JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata);
+ json.writeStartArray();
+ partitions.forEach(transformer::serializePartition);
+ json.writeEndArray();
+ }
+ }
+
+ public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException
+ {
+ try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8")))
+ {
+ JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata);
+ json.writeStartArray();
+ keys.forEach(transformer::serializePartitionKey);
+ json.writeEndArray();
+ }
+ }
+
+ private void updatePosition()
+ {
+ this.currentPosition = currentScanner.getCurrentPosition();
+ }
+
+ private void serializePartitionKey(DecoratedKey key)
+ {
+ AbstractType<?> keyValidator = metadata.getKeyValidator();
+ objectIndenter.setCompact(true);
+ try
+ {
+ arrayIndenter.setCompact(true);
+ json.writeStartArray();
+ if (keyValidator instanceof CompositeType)
+ {
+ // if a composite type, the partition key is made up of multiple components.
+ CompositeType compositeType = (CompositeType) keyValidator;
+ ByteBuffer keyBytes = key.getKey().duplicate();
+ // Skip static data if it exists.
+ if (keyBytes.remaining() >= 2)
+ {
+ int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position());
+ if ((header & 0xFFFF) == 0xFFFF)
+ {
+ ByteBufferUtil.readShortLength(keyBytes);
+ }
+ }
+
+ int i = 0;
+ while (keyBytes.remaining() > 0 && i < compositeType.getComponents().size())
+ {
+ AbstractType<?> colType = compositeType.getComponents().get(i);
+
+ ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(keyBytes);
+ String colValue = colType.getString(value);
+
+ json.writeString(colValue);
+
+ byte b = keyBytes.get();
+ if (b != 0)
+ {
+ break;
+ }
+ ++i;
+ }
+ }
+ else
+ {
+ // if not a composite type, assume a single column partition key.
+ assert metadata.partitionKeyColumns().size() == 1;
+ json.writeString(keyValidator.getString(key.getKey()));
+ }
+ json.writeEndArray();
+ objectIndenter.setCompact(false);
+ arrayIndenter.setCompact(false);
+ }
+ catch (IOException e)
+ {
+ logger.error("Failure serializing partition key.", e);
+ }
+ }
+
+ private void serializePartition(UnfilteredRowIterator partition)
+ {
+ String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey());
+ try
+ {
+ json.writeStartObject();
+
+ json.writeFieldName("partition");
+ json.writeStartObject();
+ json.writeFieldName("key");
+ serializePartitionKey(partition.partitionKey());
+ json.writeNumberField("position", this.currentScanner.getCurrentPosition());
+
+ if (!partition.partitionLevelDeletion().isLive())
+ {
+ serializeDeletion(partition.partitionLevelDeletion());
+ }
+ else
+ {
+ json.writeEndObject();
+ json.writeFieldName("rows");
+ json.writeStartArray();
+ updatePosition();
+ if (!partition.staticRow().isEmpty())
+ {
+ serializeRow(partition.staticRow());
+ }
+ Unfiltered unfiltered;
+ updatePosition();
+ while (partition.hasNext())
+ {
+ unfiltered = partition.next();
+ if (unfiltered instanceof Row)
+ {
+ serializeRow((Row) unfiltered);
+ }
+ else if (unfiltered instanceof RangeTombstoneMarker)
+ {
+ serializeTombstone((RangeTombstoneMarker) unfiltered);
+ }
+ updatePosition();
+ }
+ json.writeEndArray();
+ }
+
+ json.writeEndObject();
+ }
+ catch (IOException e)
+ {
+ logger.error("Fatal error parsing partition: {}", key, e);
+ }
+ }
+
+ private void serializeRow(Row row)
+ {
+ try
+ {
+ json.writeStartObject();
+ String rowType = row.isStatic() ? "static_block" : "row";
+ json.writeFieldName("type");
+ json.writeString(rowType);
+ json.writeNumberField("position", this.currentPosition);
+
+ // Only print clustering information for non-static rows.
+ if (!row.isStatic())
+ {
+ serializeClustering(row.clustering());
+ }
+
+ LivenessInfo liveInfo = row.primaryKeyLivenessInfo();
+ if (!liveInfo.isEmpty())
+ {
+ objectIndenter.setCompact(false);
+ json.writeFieldName("liveness_info");
+ objectIndenter.setCompact(true);
+ json.writeStartObject();
+ json.writeFieldName("tstamp");
+ json.writeString(dateString(TimeUnit.MICROSECONDS, liveInfo.timestamp()));
+ if (liveInfo.isExpiring())
+ {
+ json.writeNumberField("ttl", liveInfo.ttl());
+ json.writeFieldName("expires_at");
+ json.writeString(dateString(TimeUnit.SECONDS, liveInfo.localExpirationTime()));
+ json.writeFieldName("expired");
+ json.writeBoolean(liveInfo.localExpirationTime() < (System.currentTimeMillis() / 1000));
+ }
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+
+ // If the row carries a deletion, write it out; the cells array is written in either case.
+ if (!row.deletion().isLive())
+ {
+ serializeDeletion(row.deletion().time());
+ }
+ json.writeFieldName("cells");
+ json.writeStartArray();
+ for (ColumnData cd : row)
+ {
+ serializeColumnData(cd, liveInfo);
+ }
+ json.writeEndArray();
+ json.writeEndObject();
+ }
+ catch (IOException e)
+ {
+ logger.error("Fatal error parsing row.", e);
+ }
+ }
+
+ private void serializeTombstone(RangeTombstoneMarker tombstone)
+ {
+ try
+ {
+ json.writeStartObject();
+ json.writeFieldName("type");
+
+ if (tombstone instanceof RangeTombstoneBoundMarker)
+ {
+ json.writeString("range_tombstone_bound");
+ RangeTombstoneBoundMarker bm = (RangeTombstoneBoundMarker) tombstone;
+ serializeBound(bm.clustering(), bm.deletionTime());
+ }
+ else
+ {
+ assert tombstone instanceof RangeTombstoneBoundaryMarker;
+ json.writeString("range_tombstone_boundary");
+ RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) tombstone;
+ serializeBound(bm.openBound(false), bm.openDeletionTime(false));
+ serializeBound(bm.closeBound(false), bm.closeDeletionTime(false));
+ }
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+ catch (IOException e)
+ {
+ logger.error("Failure parsing tombstone.", e);
+ }
+ }
+
+ private void serializeBound(RangeTombstone.Bound bound, DeletionTime deletionTime) throws IOException
+ {
+ json.writeFieldName(bound.isStart() ? "start" : "end");
+ json.writeStartObject();
+ json.writeFieldName("type");
+ json.writeString(bound.isInclusive() ? "inclusive" : "exclusive");
+ serializeClustering(bound.clustering());
+ serializeDeletion(deletionTime);
+ json.writeEndObject();
+ }
+
+ private void serializeClustering(ClusteringPrefix clustering) throws IOException
+ {
+ if (clustering.size() > 0)
+ {
+ json.writeFieldName("clustering");
+ objectIndenter.setCompact(true);
+ json.writeStartArray();
+ arrayIndenter.setCompact(true);
+ List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns();
+ for (int i = 0; i < clusteringColumns.size(); i++)
+ {
+ ColumnDefinition column = clusteringColumns.get(i);
+ if (i >= clustering.size())
+ {
+ json.writeString("*");
+ }
+ else
+ {
+ json.writeString(column.cellValueType().getString(clustering.get(i)));
+ }
+ }
+ json.writeEndArray();
+ objectIndenter.setCompact(false);
+ arrayIndenter.setCompact(false);
+ }
+ }
+
+ private void serializeDeletion(DeletionTime deletion) throws IOException
+ {
+ json.writeFieldName("deletion_info");
+ objectIndenter.setCompact(true);
+ json.writeStartObject();
+ json.writeFieldName("marked_deleted");
+ json.writeString(dateString(TimeUnit.MICROSECONDS, deletion.markedForDeleteAt()));
+ json.writeFieldName("local_delete_time");
+ json.writeString(dateString(TimeUnit.SECONDS, deletion.localDeletionTime()));
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+
+ private void serializeColumnData(ColumnData cd, LivenessInfo liveInfo)
+ {
+ if (cd.column().isSimple())
+ {
+ serializeCell((Cell) cd, liveInfo);
+ }
+ else
+ {
+ ComplexColumnData complexData = (ComplexColumnData) cd;
+ if (!complexData.complexDeletion().isLive())
+ {
+ try
+ {
+ objectIndenter.setCompact(true);
+ json.writeStartObject();
+ json.writeFieldName("name");
+ AbstractType<?> type = cd.column().type;
+ json.writeString(cd.column().name.toCQLString());
+ serializeDeletion(complexData.complexDeletion());
+ objectIndenter.setCompact(true);
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+ catch (IOException e)
+ {
+ logger.error("Failure parsing ColumnData.", e);
+ }
+ }
+ for (Cell cell : complexData){
+ serializeCell(cell, liveInfo);
+ }
+ }
+ }
+
+ private void serializeCell(Cell cell, LivenessInfo liveInfo)
+ {
+ try
+ {
+ json.writeStartObject();
+ objectIndenter.setCompact(true);
+ json.writeFieldName("name");
+ AbstractType<?> type = cell.column().type;
+ json.writeString(cell.column().name.toCQLString());
+
+ if (cell.path() != null && cell.path().size() > 0)
+ {
+ CollectionType ct = (CollectionType) type;
+ json.writeFieldName("path");
+ arrayIndenter.setCompact(true);
+ json.writeStartArray();
+ for (int i = 0; i < cell.path().size(); i++)
+ {
+ json.writeString(ct.nameComparator().getString(cell.path().get(i)));
+ }
+ json.writeEndArray();
+ arrayIndenter.setCompact(false);
+ }
+ if (cell.isTombstone())
+ {
+ json.writeFieldName("deletion_info");
+ objectIndenter.setCompact(true);
+ json.writeStartObject();
+ json.writeFieldName("local_delete_time");
+ json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime()));
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+ else
+ {
+ json.writeFieldName("value");
+ json.writeString(cell.column().cellValueType().getString(cell.value()));
+ }
+ if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
+ {
+ json.writeFieldName("tstamp");
+ json.writeString(dateString(TimeUnit.MICROSECONDS, cell.timestamp()));
+ }
+ if (cell.isExpiring() && (liveInfo.isEmpty() || cell.ttl() != liveInfo.ttl()))
+ {
+ json.writeFieldName("ttl");
+ json.writeNumber(cell.ttl());
+ json.writeFieldName("expires_at");
+ json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime()));
+ json.writeFieldName("expired");
+ json.writeBoolean(!cell.isLive((int) (System.currentTimeMillis() / 1000)));
+ }
+ json.writeEndObject();
+ objectIndenter.setCompact(false);
+ }
+ catch (IOException e)
+ {
+ logger.error("Failure parsing cell.", e);
+ }
+ }
+
+ private String dateString(TimeUnit from, long time)
+ {
+ long secs = from.toSeconds(time);
+ long offset = Math.floorMod(from.toNanos(time), 1000_000_000L); // nanos per sec
+ return rawTime? Long.toString(time) : Instant.ofEpochSecond(secs, offset).toString();
+ }
+
+ /**
+ * A specialized {@link Indenter} that enables a 'compact' mode which puts all subsequent json values on the same
+ * line. This is manipulated via {@link CompactIndenter#setCompact(boolean)}
+ */
+ private static final class CompactIndenter extends NopIndenter
+ {
+
+ private static final int INDENT_LEVELS = 16;
+ private final char[] indents;
+ private final int charsPerLevel;
+ private final String eol;
+ private static final String space = " ";
+
+ private boolean compact = false;
+
+ CompactIndenter()
+ {
+ this(" ", System.lineSeparator());
+ }
+
+ CompactIndenter(String indent, String eol)
+ {
+ this.eol = eol;
+
+ charsPerLevel = indent.length();
+
+ indents = new char[indent.length() * INDENT_LEVELS];
+ int offset = 0;
+ for (int i = 0; i < INDENT_LEVELS; i++)
+ {
+ indent.getChars(0, indent.length(), indents, offset);
+ offset += indent.length();
+ }
+ }
+
+ @Override
+ public boolean isInline()
+ {
+ return false;
+ }
+
+ /**
+ * Configures whether subsequent json values should be written on the same line, delimited by a space.
+ *
+ * @param compact
+ * Whether or not to compact.
+ */
+ public void setCompact(boolean compact)
+ {
+ this.compact = compact;
+ }
+
+ @Override
+ public void writeIndentation(JsonGenerator jg, int level)
+ {
+ try
+ {
+ if (!compact)
+ {
+ jg.writeRaw(eol);
+ if (level > 0)
+ { // should we err on negative values (as they would indicate some flaw)?
+ level *= charsPerLevel;
+ while (level > indents.length)
+ { // unlikely to happen, but just in case
+ jg.writeRaw(indents, 0, indents.length);
+ level -= indents.length;
+ }
+ jg.writeRaw(indents, 0, level);
+ }
+ }
+ else
+ {
+ jg.writeRaw(space);
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ }
- }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/OverlapIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/OverlapIterator.java
index 131a749,7c1544a..b346a62
--- a/src/java/org/apache/cassandra/utils/OverlapIterator.java
+++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java
@@@ -1,3 -1,23 +1,23 @@@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
- */
++*/
package org.apache.cassandra.utils;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/RMIServerSocketFactoryImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/SyncUtil.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/SyncUtil.java
index b217e29,0d293aa..4c0d89d
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@@ -1,6 -1,30 +1,26 @@@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.cassandra.utils;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.SyncFailedException;
+import java.io.*;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedChannelException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index afca512,0000000..238a58d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@@ -1,85 -1,0 +1,105 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.cql3;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
+import static org.junit.Assert.assertEquals;
+
+public class IndexQueryPagingTest extends CQLTester
+{
+ /*
+ * Some simple tests to verify the behaviour of paging during
+ * 2i queries. We only use a single index type (CompositesIndexOnRegular)
+ * as the code we want to exercise here is in their abstract
+ * base class.
+ */
+
+ @Test
+ public void pagingOnRegularColumn() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ " k1 int," +
+ " v1 int," +
+ "PRIMARY KEY (k1))");
+ createIndex("CREATE INDEX ON %s(v1)");
+
+ int rowCount = 3;
+ for (int i=0; i<rowCount; i++)
+ execute("INSERT INTO %s (k1, v1) VALUES (?, ?)", i, 0);
+
+ executePagingQuery("SELECT * FROM %s WHERE v1=0", rowCount);
+ }
+
+ @Test
+ public void pagingOnRegularColumnWithPartitionRestriction() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ " k1 int," +
+ " c1 int," +
+ " v1 int," +
+ "PRIMARY KEY (k1, c1))");
+ createIndex("CREATE INDEX ON %s(v1)");
+
+ int partitions = 3;
+ int rowCount = 3;
+ for (int i=0; i<partitions; i++)
+ for (int j=0; j<rowCount; j++)
+ execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0);
+
+ executePagingQuery("SELECT * FROM %s WHERE k1=0 AND v1=0", rowCount);
+ }
+
+ @Test
+ public void pagingOnRegularColumnWithClusteringRestrictions() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ " k1 int," +
+ " c1 int," +
+ " v1 int," +
+ "PRIMARY KEY (k1, c1))");
+ createIndex("CREATE INDEX ON %s(v1)");
+
+ int partitions = 3;
+ int rowCount = 3;
+ for (int i=0; i<partitions; i++)
+ for (int j=0; j<rowCount; j++)
+ execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0);
+
+ executePagingQuery("SELECT * FROM %s WHERE k1=0 AND c1>=0 AND c1<=3 AND v1=0", rowCount);
+ }
+
+ private void executePagingQuery(String cql, int rowCount)
+ {
+ // Execute an index query which should return all rows,
+ // setting the fetch size lower than the row count. Assert
+ // that all rows are returned, so we know that paging
+ // of the results was involved.
+ Session session = sessionNet();
+ Statement stmt = new SimpleStatement(String.format(cql, KEYSPACE + '.' + currentTable()));
+ stmt.setFetchSize(rowCount - 1);
+ assertEquals(rowCount, session.execute(stmt).all().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 9af6028,0000000..b5d8159
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@@ -1,198 -1,0 +1,218 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SinglePartitionSliceCommandTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class);
+
+ private static final String KEYSPACE = "ks";
+ private static final String TABLE = "tbl";
+
+ private static CFMetaData cfm;
+ private static ColumnDefinition v;
+ private static ColumnDefinition s;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ cfm = CFMetaData.Builder.create(KEYSPACE, TABLE)
+ .addPartitionKey("k", UTF8Type.instance)
+ .addStaticColumn("s", UTF8Type.instance)
+ .addClusteringColumn("i", IntegerType.instance)
+ .addRegularColumn("v", UTF8Type.instance)
+ .build();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+ cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+ v = cfm.getColumnDefinition(new ColumnIdentifier("v", true));
+ s = cfm.getColumnDefinition(new ColumnIdentifier("s", true));
+ }
+
+ @Before
+ public void truncate()
+ {
+ Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking();
+ }
+
+ @Test
+ public void staticColumnsAreFiltered() throws IOException
+ {
+ DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k"));
+
+ UntypedResultSet rows;
+
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')");
+ QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0");
+ QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0");
+ rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0");
+
+ for (UntypedResultSet.Row row: rows)
+ {
+ logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null));
+ }
+
+ assert rows.isEmpty();
+
+ ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v));
+ ByteBuffer zero = ByteBufferUtil.bytes(0);
+ Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
+ ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
+ ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.NONE,
+ key,
+ sliceFilter);
+
+ DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
+ ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
+ DataInputPlus in = new DataInputBuffer(out.buffer(), true);
+ cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21);
+
+ logger.debug("ReadCommand: {}", cmd);
+ UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
+ ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd);
+
+ logger.debug("creating response: {}", response);
+ partitionIterator = response.makeIterator(cmd);
+ assert partitionIterator.hasNext();
+ UnfilteredRowIterator partition = partitionIterator.next();
+
+ LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition);
+ Assert.assertEquals(Collections.emptyList(), rowIter.cells);
+ }
+
+ private void checkForS(UnfilteredPartitionIterator pi)
+ {
+ Assert.assertTrue(pi.toString(), pi.hasNext());
+ UnfilteredRowIterator ri = pi.next();
+ Assert.assertTrue(ri.columns().contains(s));
+ Row staticRow = ri.staticRow();
+ Iterator<Cell> cellIterator = staticRow.cells().iterator();
+ Assert.assertTrue(staticRow.toString(cfm, true), cellIterator.hasNext());
+ Cell cell = cellIterator.next();
+ Assert.assertEquals(s, cell.column());
+ Assert.assertEquals(ByteBufferUtil.bytesToHex(cell.value()), ByteBufferUtil.bytes("s"), cell.value());
+ Assert.assertFalse(cellIterator.hasNext());
+ }
+
+ @Test
+ public void staticColumnsAreReturned() throws IOException
+ {
+ DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
+
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')");
+ Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty());
+
+ ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
+ ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
+ ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ RowFilter.NONE,
+ DataLimits.NONE,
+ key,
+ sliceFilter);
+
+ // check raw iterator for static cell
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+ {
+ checkForS(pi);
+ }
+
+ ReadResponse response;
+ DataOutputBuffer out;
+ DataInputPlus in;
+ ReadResponse dst;
+
+ // check (de)serialized iterator for memtable static cell
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+ {
+ response = ReadResponse.createDataResponse(pi, cmd);
+ }
+
+ out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
+ ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
+ in = new DataInputBuffer(out.buffer(), true);
+ dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
+ try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
+ {
+ checkForS(pi);
+ }
+
+ // check (de)serialized iterator for sstable static cell
+ Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+ {
+ response = ReadResponse.createDataResponse(pi, cmd);
+ }
+ out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
+ ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
+ in = new DataInputBuffer(out.buffer(), true);
+ dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
+ try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
+ {
+ checkForS(pi);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
index b5c2f41,0000000..6a4aace
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@@ -1,100 -1,0 +1,120 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import javax.naming.ConfigurationException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Exercises commit log segment allocation while the periodic sync is held back by a Byteman rule.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogSegmentManagerTest
+{
+    // The Byteman rule on the test below calls acquire() on this semaphore right before the
+    // commit log service invokes CommitLog.sync. It starts with zero permits and nothing in
+    // this class releases one, so the sync stays blocked.
+    private static final Semaphore allowSync = new Semaphore(0);
+
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "Standard1";
+    private static final String STANDARD2 = "Standard2";
+
+    // Random payload written as the cell value of every mutation in the test.
+    private final static byte[] entropy = new byte[1024 * 256];
+
+    /**
+     * Fills the payload with random bytes, configures the commit log (LZ4 compression,
+     * segment size 1, periodic sync with a period of 10 * 1000) and creates the test keyspace
+     * with two standard tables. Automatic compaction is disabled afterwards.
+     *
+     * NOTE(review): the units of the segment size and sync period are not visible here
+     * (presumably MB and milliseconds) - confirm against DatabaseDescriptor.
+     */
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        new Random().nextBytes(entropy);
+        DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+        DatabaseDescriptor.setCommitLogSegmentSize(1);
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    /**
+     * With syncing blocked, a background writer should stall once three segments are active;
+     * after those segments are recycled the active count is expected to reach three again.
+     */
+    @Test
+    @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
+            targetClass = "AbstractCommitLogService$1",
+            targetMethod = "run",
+            targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+            action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
+    public void testCompressedCommitLogBackpressure() throws Throwable
+    {
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", ByteBuffer.wrap(entropy))
+                           .build();
+
+        // Writes happen on a separate thread because they are expected to block.
+        Thread dummyThread = new Thread( () ->
+        {
+            for (int i = 0; i < 20; i++)
+                CommitLog.instance.add(m);
+        });
+        dummyThread.start();
+
+        CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+
+        //Protect against delay, but still break out as fast as possible
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 5000)
+        {
+            if (clsm.getActiveSegments().size() >= 3)
+                break;
+            // Let the writer thread run instead of monopolising a core while polling.
+            Thread.yield();
+        }
+        Thread.sleep(1000);
+
+        //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
+        Assert.assertEquals(3, clsm.getActiveSegments().size());
+
+        // Recycle a snapshot of the active segments: the writer thread may allocate new segments
+        // while we iterate, and if getActiveSegments() is a live view those fresh segments
+        // would otherwise be recycled as well, making the final count check racy.
+        new java.util.ArrayList<>(clsm.getActiveSegments()).forEach( segment -> clsm.recycleSegment(segment));
+
+        // The writer resumes, so the active count should climb back to the limit.
+        Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
+    }
- }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index 98ad2bc,0000000..400d65a
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@@ -1,428 -1,0 +1,448 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Slice.Bound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link RowAndDeletionMergeIterator}: each test merges five rows (clustering values
+ * 0 to 4, all written at timestamp 0) and/or a set of range tombstones, then checks the exact
+ * sequence of rows and range tombstone markers the merged iterator produces.
+ */
+public class RowAndDeletionMergeIteratorTest
+{
+    private static final String KEYSPACE1 = "RowTest";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    private int nowInSeconds;
+    private DecoratedKey dk;
+    private ColumnFamilyStore cfs;
+    private CFMetaData cfm;
+    private ColumnDefinition defA;
+
+    /**
+     * Creates the test table: an ascii partition key, one int clustering column ("col1")
+     * and one int regular column ("a").
+     */
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
+                                                  .addPartitionKey("key", AsciiType.instance)
+                                                  .addClusteringColumn("col1", Int32Type.instance)
+                                                  .addRegularColumn("a", Int32Type.instance)
+                                                  .build();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    cfMetadata);
+
+    }
+
+    /** Captures the current time and looks up the table metadata and the definition of column "a". */
+    @Before
+    public void setup()
+    {
+        nowInSeconds = FBUtilities.nowInSeconds();
+        dk = Util.dk("key0");
+        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfm = cfs.metadata;
+        defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
+    }
+
+    /** Without tombstones the merged iterator must yield rows 0 through 4 and nothing else. */
+    @Test
+    public void testWithNoRangeTombstones()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, Collections.emptyIterator(), false);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** Without rows, the tombstones (1, 3) and [4, TOP] must surface as four bound markers. */
+    @Test
+    public void testWithOnlyRangeTombstones()
+    {
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(rt(1, false, 3, false, timestamp, delTime),
+                                                                                       atLeast(4, timestamp, delTime));
+        UnfilteredRowIterator iterator = createMergeIterator(Collections.emptyIterator(), rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 1);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_BOUND, 3);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_START_BOUND, 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** A tombstone over [BOTTOM, 0] must replace row 0 with its two markers; rows 1 to 4 remain. */
+    @Test
+    public void testWithAtMostRangeTombstone()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** A tombstone over (2, TOP] must leave rows 0 to 2 and replace rows 3 and 4 with its markers. */
+    @Test
+    public void testWithGreaterThanRangeTombstone()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** Tombstones over [BOTTOM, 0] and (2, TOP] together must leave only rows 1 and 2 between the markers. */
+    @Test
+    public void testWithAtMostAndGreaterThanRangeTombstone()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
+                                                                                       greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /**
+     * Adjacent tombstones [BOTTOM, 2] and (2, TOP] must be reported with a single
+     * INCL_END_EXCL_START boundary at 2, and no row may be returned.
+     */
+    @Test
+    public void testWithIncludingEndExcludingStartMarker()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(2, timestamp, delTime),
+                                                                                       greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /**
+     * Adjacent tombstones [BOTTOM, 2) and [2, TOP] must be reported with a single
+     * EXCL_END_INCL_START boundary at 2, and no row may be returned.
+     */
+    @Test
+    public void testWithExcludingEndIncludingStartMarker()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(lessThan(2, timestamp, delTime),
+                                                                                       atLeast(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /**
+     * A tombstone over [BOTTOM, 0] with timestamp -1 (older than the rows' timestamp 0) must
+     * not hide row 0: the row is expected between the tombstone's two markers.
+     */
+    @Test
+    public void testNonShadowingTombstone()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, -1L, 0));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /**
+     * When the partition-level deletion is newer than both the rows and the range tombstones,
+     * the merged iterator must be empty.
+     */
+    @Test
+    public void testWithPartitionLevelTombstone()
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds - 1;
+        long timestamp = toMillis(delTime);
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
+                                                                                       greaterThan(2, timestamp, delTime));
+
+        int partitionDelTime = nowInSeconds + 1;
+        long partitionTimestamp = toMillis(partitionDelTime);
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator,
+                                                             rangeTombstoneIterator,
+                                                             new DeletionTime(partitionTimestamp, partitionDelTime),
+                                                             false);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    /** Asserts that {@code unfiltered} is a range tombstone marker of the given kind whose first clustering value is {@code col1}. */
+    private void assertRtMarker(Unfiltered unfiltered, ClusteringPrefix.Kind kind, int col1)
+    {
+        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
+        assertEquals(kind, unfiltered.clustering().kind());
+        assertEquals(bb(col1), unfiltered.clustering().get(0));
+    }
+
+    /** Asserts that {@code unfiltered} is a range tombstone marker whose clustering equals {@code bound}. */
+    private void assertRtMarker(Unfiltered unfiltered, Bound bound)
+    {
+        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
+        assertEquals(bound, unfiltered.clustering());
+    }
+
+    /** Asserts that {@code unfiltered} is a row whose clustering is {@code col1}. */
+    private void assertRow(Unfiltered unfiltered, int col1)
+    {
+        assertEquals(Unfiltered.Kind.ROW, unfiltered.kind());
+        assertEquals(cfm.comparator.make(col1), unfiltered.clustering());
+    }
+
+    /** Collects the given tombstones into a {@link RangeTombstoneList} and iterates it over all slices, in forward order. */
+    private Iterator<RangeTombstone> createRangeTombstoneIterator(RangeTombstone... tombstones)
+    {
+        RangeTombstoneList list = new RangeTombstoneList(cfm.comparator, 10);
+
+        for (RangeTombstone tombstone : tombstones)
+            list.add(tombstone);
+
+        return list.iterator(Slice.ALL, false);
+    }
+
+    /** Builds a partition update for {@code dk} with rows 0 to 4 (column "a" equal to the clustering value) and returns its row iterator. */
+    private Iterator<Row> createRowIterator()
+    {
+        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
+        for (int i = 0; i < 5; i++)
+            addRow(update, i, i);
+
+        return update.iterator();
+    }
+
+    /** Merges rows and tombstones with no partition-level deletion ({@link DeletionTime#LIVE}). */
+    private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed)
+    {
+        return createMergeIterator(rows, tombstones, DeletionTime.LIVE, reversed);
+    }
+
+    /**
+     * Builds the iterator under test for partition {@code dk}, selecting all columns, with an
+     * empty static row and no encoding stats.
+     */
+    private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows,
+                                                      Iterator<RangeTombstone> tombstones,
+                                                      DeletionTime deletionTime,
+                                                      boolean reversed)
+    {
+        // Use the same partition key the rows were created with rather than an unrelated one.
+        // NOTE(review): the trailing 'true' is presumably the "remove shadowed data" flag - confirm
+        // against the RowAndDeletionMergeIterator constructor.
+        return new RowAndDeletionMergeIterator(cfm,
+                                               dk,
+                                               deletionTime,
+                                               ColumnFilter.all(cfm),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               reversed,
+                                               EncodingStats.NO_STATS,
+                                               rows,
+                                               tombstones,
+                                               true);
+    }
+
+    /** Adds a single-cell row with clustering {@code col1} and value {@code a} (timestamp 0) to {@code update}. */
+    private void addRow(PartitionUpdate update, int col1, int a)
+    {
+        update.add(BTreeRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
+    }
+
+    /**
+     * Creates a live cell holding {@code value} for an int column. The value is encoded with
+     * {@link Int32Type}, the type column "a" is declared with in {@link #defineSchema()},
+     * which avoids an unchecked call through a raw {@code AbstractType}.
+     */
+    private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp)
+    {
+        return BufferCell.live(cfm, columnDefinition, timestamp, Int32Type.instance.decompose(value));
+    }
+
+    /** Range tombstone covering [start, TOP]. */
+    private static RangeTombstone atLeast(int start, long tstamp, int delTime)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
+    }
+
+    /** Range tombstone covering [BOTTOM, end]. */
+    private static RangeTombstone atMost(int end, long tstamp, int delTime)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
+    }
+
+    /** Range tombstone covering [BOTTOM, end). */
+    private static RangeTombstone lessThan(int end, long tstamp, int delTime)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.exclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
+    }
+
+    /** Range tombstone covering (start, TOP]. */
+    private static RangeTombstone greaterThan(int start, long tstamp, int delTime)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
+    }
+
+    /** Range tombstone between {@code start} and {@code end}, each bound inclusive or exclusive as requested. */
+    private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp, int delTime)
+    {
+        Slice.Bound startBound = startInclusive ? Slice.Bound.inclusiveStartOf(bb(start)) : Slice.Bound.exclusiveStartOf(bb(start));
+        Slice.Bound endBound = endInclusive ? Slice.Bound.inclusiveEndOf(bb(end)) : Slice.Bound.exclusiveEndOf(bb(end));
+
+        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(tstamp, delTime));
+    }
+
+    /** Serializes an int into a {@link ByteBuffer} via {@link ByteBufferUtil#bytes(int)}. */
+    private static ByteBuffer bb(int i)
+    {
+        return ByteBufferUtil.bytes(i);
+    }
+
+    /** Multiplies a time in seconds by 1000, widening to {@code long} to avoid int overflow. */
+    private long toMillis(int timeInSeconds)
+    {
+        return timeInSeconds * 1000L;
+    }
+}