You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/12/13 09:27:32 UTC
[08/26] cassandra git commit: Thrift removal
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index e898c4f..dfcd772 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -63,12 +62,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public class CompositeType extends AbstractCompositeType
{
- public static final int STATIC_MARKER = 0xFFFF;
+ private static final int STATIC_MARKER = 0xFFFF;
public final List<AbstractType<?>> types;
// interning instances
- private static final ConcurrentMap<List<AbstractType<?>>, CompositeType> instances = new ConcurrentHashMap<List<AbstractType<?>>, CompositeType>();
+ private static final ConcurrentMap<List<AbstractType<?>>, CompositeType> instances = new ConcurrentHashMap<>();
public static CompositeType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
{
@@ -77,7 +76,7 @@ public class CompositeType extends AbstractCompositeType
public static CompositeType getInstance(AbstractType... types)
{
- return getInstance(Arrays.<AbstractType<?>>asList(types));
+ return getInstance(Arrays.asList(types));
}
protected boolean readIsStatic(ByteBuffer bb)
@@ -128,9 +127,8 @@ public class CompositeType extends AbstractCompositeType
}
catch (IndexOutOfBoundsException e)
{
- // We shouldn't get there in general because 1) we shouldn't construct broken composites
- // from CQL and 2) broken composites coming from thrift should be rejected by validate.
- // There is a few cases however where, if the schema has changed since we created/validated
+ // We shouldn't get there in general because we shouldn't construct broken composites,
+ // but there are a few cases where, if the schema has changed since we created/validated
// the composite, this will be thrown (see #6262). Those cases are a user error but
// throwing a more meaningful error message to make understanding such error easier. .
throw new RuntimeException("Cannot get comparator " + i + " in " + this + ". "
@@ -204,11 +202,6 @@ public class CompositeType extends AbstractCompositeType
return l;
}
- public static byte lastEOC(ByteBuffer name)
- {
- return name.get(name.limit() - 1);
- }
-
// Extract component idx from bb. Return null if there is not enough component.
public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
{
@@ -227,13 +220,6 @@ public class CompositeType extends AbstractCompositeType
return null;
}
- // Extract CQL3 column name from the full column name.
- public ByteBuffer extractLastComponent(ByteBuffer bb)
- {
- int idx = types.get(types.size() - 1) instanceof ColumnToCollectionType ? types.size() - 2 : types.size() - 1;
- return extractComponent(bb, idx);
- }
-
public static boolean isStaticName(ByteBuffer bb)
{
return bb.remaining() >= 2 && (ByteBufferUtil.getShortLength(bb, bb.position()) & 0xFFFF) == STATIC_MARKER;
@@ -334,11 +320,6 @@ public class CompositeType extends AbstractCompositeType
return getClass().getName() + TypeParser.stringifyTypeParameters(types);
}
- public Builder builder()
- {
- return new Builder(this);
- }
-
public static ByteBuffer build(ByteBuffer... buffers)
{
return build(false, buffers);
@@ -366,143 +347,4 @@ public class CompositeType extends AbstractCompositeType
out.flip();
return out;
}
-
- public static class Builder
- {
- private final CompositeType composite;
-
- private final List<ByteBuffer> components;
- private final byte[] endOfComponents;
- private int serializedSize;
- private final boolean isStatic;
-
- public Builder(CompositeType composite)
- {
- this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
- }
-
- public static Builder staticBuilder(CompositeType composite)
- {
- return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
- }
-
- private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
- {
- assert endOfComponents.length == composite.types.size();
-
- this.composite = composite;
- this.components = components;
- this.endOfComponents = endOfComponents;
- this.isStatic = isStatic;
- if (isStatic)
- serializedSize = 2;
- }
-
- private Builder(Builder b)
- {
- this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
- this.serializedSize = b.serializedSize;
- }
-
- public Builder add(ByteBuffer bb)
- {
- if (components.size() >= composite.types.size())
- throw new IllegalStateException("Composite column is already fully constructed");
-
- components.add(bb);
- serializedSize += 3 + bb.remaining(); // 2 bytes lenght + 1 byte eoc
- return this;
- }
-
- public Builder add(ColumnIdentifier name)
- {
- return add(name.bytes);
- }
-
- public int componentCount()
- {
- return components.size();
- }
-
- public int remainingCount()
- {
- return composite.types.size() - components.size();
- }
-
- public ByteBuffer get(int i)
- {
- return components.get(i);
- }
-
- public ByteBuffer build()
- {
- try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize))
- {
- if (isStatic)
- out.writeShort(STATIC_MARKER);
-
- for (int i = 0; i < components.size(); i++)
- {
- ByteBufferUtil.writeWithShortLength(components.get(i), out);
- out.write(endOfComponents[i]);
- }
- return ByteBuffer.wrap(out.getData(), 0, out.getLength());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public ByteBuffer buildAsEndOfRange()
- {
- if (components.isEmpty())
- return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
- ByteBuffer bb = build();
- bb.put(bb.remaining() - 1, (byte)1);
- return bb;
- }
-
- public ByteBuffer buildForRelation(Operator op)
- {
- /*
- * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
- * We can select:
- * - = 'a' by using <'a'><0>
- * - < 'a' by using <'a'><-1>
- * - <= 'a' by using <'a'><1>
- * - > 'a' by using <'a'><1>
- * - >= 'a' by using <'a'><0>
- */
- int current = components.size() - 1;
- switch (op)
- {
- case LT:
- endOfComponents[current] = (byte) -1;
- break;
- case GT:
- case LTE:
- endOfComponents[current] = (byte) 1;
- break;
- default:
- endOfComponents[current] = (byte) 0;
- break;
- }
- return build();
- }
-
- public Builder copy()
- {
- return new Builder(this);
- }
-
- public ByteBuffer getComponent(int i)
- {
- if (i >= components.size())
- throw new IllegalArgumentException();
-
- return components.get(i);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index afe1cc3..90d64f4 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -36,24 +36,17 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
private final int cachedLiveRows;
private final int rowsWithNonExpiringCells;
- private final int nonTombstoneCellCount;
- private final int nonExpiringLiveCells;
-
private CachedBTreePartition(CFMetaData metadata,
DecoratedKey partitionKey,
Holder holder,
int createdAtInSec,
int cachedLiveRows,
- int rowsWithNonExpiringCells,
- int nonTombstoneCellCount,
- int nonExpiringLiveCells)
+ int rowsWithNonExpiringCells)
{
super(metadata, partitionKey, holder);
this.createdAtInSec = createdAtInSec;
this.cachedLiveRows = cachedLiveRows;
this.rowsWithNonExpiringCells = rowsWithNonExpiringCells;
- this.nonTombstoneCellCount = nonTombstoneCellCount;
- this.nonExpiringLiveCells = nonExpiringLiveCells;
}
/**
@@ -89,30 +82,24 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
int cachedLiveRows = 0;
int rowsWithNonExpiringCells = 0;
- int nonTombstoneCellCount = 0;
- int nonExpiringLiveCells = 0;
for (Row row : BTree.<Row>iterable(holder.tree))
{
if (row.hasLiveData(nowInSec))
++cachedLiveRows;
- int nonExpiringLiveCellsThisRow = 0;
+ boolean hasNonExpiringLiveCell = false;
for (Cell cell : row.cells())
{
- if (!cell.isTombstone())
+ if (!cell.isTombstone() && !cell.isExpiring())
{
- ++nonTombstoneCellCount;
- if (!cell.isExpiring())
- ++nonExpiringLiveCellsThisRow;
+ hasNonExpiringLiveCell = true;
+ break;
}
}
- if (nonExpiringLiveCellsThisRow > 0)
- {
+ if (hasNonExpiringLiveCell)
++rowsWithNonExpiringCells;
- nonExpiringLiveCells += nonExpiringLiveCellsThisRow;
- }
}
return new CachedBTreePartition(iterator.metadata(),
@@ -120,9 +107,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
holder,
nowInSec,
cachedLiveRows,
- rowsWithNonExpiringCells,
- nonTombstoneCellCount,
- nonExpiringLiveCells);
+ rowsWithNonExpiringCells);
}
/**
@@ -153,16 +138,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
return rowsWithNonExpiringCells;
}
- public int nonTombstoneCellCount()
- {
- return nonTombstoneCellCount;
- }
-
- public int nonExpiringLiveCells()
- {
- return nonExpiringLiveCells;
- }
-
static class Serializer implements ISerializer<CachedPartition>
{
public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
@@ -175,8 +150,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
out.writeInt(p.createdAtInSec);
out.writeInt(p.cachedLiveRows);
out.writeInt(p.rowsWithNonExpiringCells);
- out.writeInt(p.nonTombstoneCellCount);
- out.writeInt(p.nonExpiringLiveCells);
CFMetaData.serializer.serialize(partition.metadata(), out, version);
try (UnfilteredRowIterator iter = p.unfilteredIterator())
{
@@ -198,8 +171,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
int createdAtInSec = in.readInt();
int cachedLiveRows = in.readInt();
int rowsWithNonExpiringCells = in.readInt();
- int nonTombstoneCellCount = in.readInt();
- int nonExpiringLiveCells = in.readInt();
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
@@ -217,9 +188,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
holder,
createdAtInSec,
cachedLiveRows,
- rowsWithNonExpiringCells,
- nonTombstoneCellCount,
- nonExpiringLiveCells);
+ rowsWithNonExpiringCells);
}
@@ -235,8 +204,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
return TypeSizes.sizeof(p.createdAtInSec)
+ TypeSizes.sizeof(p.cachedLiveRows)
+ TypeSizes.sizeof(p.rowsWithNonExpiringCells)
- + TypeSizes.sizeof(p.nonTombstoneCellCount)
- + TypeSizes.sizeof(p.nonExpiringLiveCells)
+ CFMetaData.serializer.serializedSize(partition.metadata(), version)
+ UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
index 0cbaba0..6c781f5 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
@@ -71,24 +71,4 @@ public interface CachedPartition extends Partition, IRowCacheEntry
* @return the last row of the partition, or {@code null} if the partition is empty.
*/
public Row lastRow();
-
- /**
- * The number of {@code cell} objects that are not tombstone in this cached partition.
- *
- * Please note that this is <b>not</b> the number of <em>live</em> cells since
- * some of the cells might be expired.
- *
- * @return the number of non tombstone cells in the partition.
- */
- public int nonTombstoneCellCount();
-
- /**
- * The number of cells in this cached partition that are neither tombstone nor expiring.
- *
- * Note that this is generally not a very meaningful number, but this is used by
- * {@link org.apache.cassandra.db.filter.DataLimits#hasEnoughLiveData} as an optimization.
- *
- * @return the number of cells that are neither tombstones nor expiring.
- */
- public int nonExpiringLiveCells();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 6679bdf..b5580c6 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -25,14 +25,12 @@ import org.apache.cassandra.db.transform.Transformation;
public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
{
- private final boolean isForThrift;
private final DeletionPurger purger;
private final int nowInSec;
private boolean isReverseOrder;
- public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
+ public PurgeFunction(int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
{
- this.isForThrift = isForThrift;
this.nowInSec = nowInSec;
this.purger = (timestamp, localDeletionTime) ->
!(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
@@ -64,7 +62,7 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
isReverseOrder = partition.isReverseOrder();
UnfilteredRowIterator purged = Transformation.apply(partition, this);
- if (!isForThrift && purged.isEmpty())
+ if (purged.isEmpty())
{
onEmptyPartitionPostPurge(purged.partitionKey());
purged.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index 1f966db..3b968e6 100644
--- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -25,18 +25,11 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
{
private final UnfilteredRowIterator iter;
- private final boolean isForThrift;
private boolean returned;
- public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter, boolean isForThrift)
+ public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter)
{
this.iter = iter;
- this.isForThrift = isForThrift;
- }
-
- public boolean isForThrift()
- {
- return isForThrift;
}
public CFMetaData metadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 201c934..872225f 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -30,16 +30,5 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
*/
public interface UnfilteredPartitionIterator extends BasePartitionIterator<UnfilteredRowIterator>
{
- /**
- * Whether that partition iterator is for a thrift queries.
- * <p>
- * If this is true, the partition iterator may return some empty UnfilteredRowIterator and those
- * should be preserved as thrift include partitions that "exists" (have some cells even
- * if this are actually deleted) but have nothing matching the query.
- *
- * @return whether the iterator is for a thrift query.
- */
- public boolean isForThrift();
-
public CFMetaData metadata();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index bcc8d4d..d86e8b1 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -113,7 +113,6 @@ public abstract class UnfilteredPartitionIterators
assert listener != null;
assert !iterators.isEmpty();
- final boolean isForThrift = iterators.get(0).isForThrift();
final CFMetaData metadata = iterators.get(0).metadata();
final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
@@ -155,11 +154,6 @@ public abstract class UnfilteredPartitionIterators
return new AbstractUnfilteredPartitionIterator()
{
- public boolean isForThrift()
- {
- return isForThrift;
- }
-
public CFMetaData metadata()
{
return metadata;
@@ -190,7 +184,6 @@ public abstract class UnfilteredPartitionIterators
if (iterators.size() == 1)
return iterators.get(0);
- final boolean isForThrift = iterators.get(0).isForThrift();
final CFMetaData metadata = iterators.get(0).metadata();
final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
@@ -221,11 +214,6 @@ public abstract class UnfilteredPartitionIterators
return new AbstractUnfilteredPartitionIterator()
{
- public boolean isForThrift()
- {
- return isForThrift;
- }
-
public CFMetaData metadata()
{
return metadata;
@@ -301,7 +289,9 @@ public abstract class UnfilteredPartitionIterators
{
public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
{
- out.writeBoolean(iter.isForThrift());
+ // Previously, a boolean indicating if this was for a thrift query.
+ // Unused since 4.0 but kept on wire for compatibility.
+ out.writeBoolean(false);
while (iter.hasNext())
{
out.writeBoolean(true);
@@ -315,7 +305,8 @@ public abstract class UnfilteredPartitionIterators
public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
{
- final boolean isForThrift = in.readBoolean();
+ // Skip now unused isForThrift boolean
+ in.readBoolean();
return new AbstractUnfilteredPartitionIterator()
{
@@ -323,11 +314,6 @@ public abstract class UnfilteredPartitionIterators
private boolean hasNext;
private boolean nextReturned = true;
- public boolean isForThrift()
- {
- return isForThrift;
- }
-
public CFMetaData metadata()
{
return metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 30a0a37..8dbc606 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.thrift.ThriftResultsMerger;
import org.apache.cassandra.utils.IteratorWithLowerBound;
/**
@@ -48,27 +47,18 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
private final SSTableReader sstable;
private final ClusteringIndexFilter filter;
private final ColumnFilter selectedColumns;
- private final boolean isForThrift;
- private final int nowInSec;
- private final boolean applyThriftTransformation;
private ClusteringBound lowerBound;
private boolean firstItemRetrieved;
public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
SSTableReader sstable,
ClusteringIndexFilter filter,
- ColumnFilter selectedColumns,
- boolean isForThrift,
- int nowInSec,
- boolean applyThriftTransformation)
+ ColumnFilter selectedColumns)
{
super(partitionKey);
this.sstable = sstable;
this.filter = filter;
this.selectedColumns = selectedColumns;
- this.isForThrift = isForThrift;
- this.nowInSec = nowInSec;
- this.applyThriftTransformation = applyThriftTransformation;
this.lowerBound = null;
this.firstItemRetrieved = false;
}
@@ -102,10 +92,8 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
sstable.incrementReadCount();
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
- UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift);
- return isForThrift && applyThriftTransformation
- ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
- : iter;
+ UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed());
+ return iter;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..729f06b 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.rows.*;
final class Filter extends Transformation
{
- private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
+ private final boolean filterEmpty; // generally true except for direct row filtration
private final int nowInSec;
public Filter(boolean filterEmpty, int nowInSec)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ed643bb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -52,7 +52,7 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
*/
public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
+ Filter filter = new Filter(true, nowInSecs);
if (iterator instanceof UnfilteredPartitions)
return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
return new FilteredPartitions(iterator, filter);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
index bad14ad..de0a51b 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@ -26,18 +26,10 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator
{
- final boolean isForThrift;
-
// wrap an iterator for transformation
public UnfilteredPartitions(UnfilteredPartitionIterator input)
{
super(input);
- this.isForThrift = input.isForThrift();
- }
-
- public boolean isForThrift()
- {
- return isForThrift;
}
public CFMetaData metadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index a4deb4a..078d712 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -28,16 +28,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TTransport;
public class ConfigHelper
{
@@ -59,16 +53,13 @@ public class ConfigHelper
private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
- private static final String INPUT_THRIFT_PORT = "cassandra.input.thrift.port";
- private static final String OUTPUT_THRIFT_PORT = "cassandra.output.thrift.port";
- private static final String INPUT_INITIAL_THRIFT_ADDRESS = "cassandra.input.thrift.address";
- private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address";
+ private static final String INPUT_INITIAL_ADDRESS = "cassandra.input.address";
+ private static final String OUTPUT_INITIAL_ADDRESS = "cassandra.output.address";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
private static final String OUTPUT_LOCAL_DC_ONLY = "cassandra.output.local.dc.only";
- private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
@@ -213,104 +204,28 @@ public class ConfigHelper
}
/**
- * Set the predicate that determines what columns will be selected from each row.
- *
- * @param conf Job configuration you are about to run
- * @param predicate
- */
- public static void setInputSlicePredicate(Configuration conf, SlicePredicate predicate)
- {
- conf.set(INPUT_PREDICATE_CONFIG, thriftToString(predicate));
- }
-
- public static SlicePredicate getInputSlicePredicate(Configuration conf)
- {
- String s = conf.get(INPUT_PREDICATE_CONFIG);
- return s == null ? null : predicateFromString(s);
- }
-
- private static String thriftToString(TBase object)
- {
- assert object != null;
- // this is so awful it's kind of cool!
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try
- {
- return Hex.bytesToHex(serializer.serialize(object));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private static SlicePredicate predicateFromString(String st)
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- SlicePredicate predicate = new SlicePredicate();
- try
- {
- deserializer.deserialize(predicate, Hex.hexToBytes(st));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- return predicate;
- }
-
- /**
* Set the KeyRange to limit the rows.
* @param conf Job configuration you are about to run
*/
public static void setInputRange(Configuration conf, String startToken, String endToken)
{
- KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken);
- conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+ conf.set(INPUT_KEYRANGE_CONFIG, startToken + "," + endToken);
}
/**
- * Set the KeyRange to limit the rows.
- * @param conf Job configuration you are about to run
- */
- public static void setInputRange(Configuration conf, String startToken, String endToken, List<IndexExpression> filter)
- {
- KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken).setRow_filter(filter);
- conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
- }
-
- /**
- * Set the KeyRange to limit the rows.
- * @param conf Job configuration you are about to run
+ * The start and end token of the input key range as a pair.
+ *
+ * Returns {@code null} if no input key range has been set.
*/
- public static void setInputRange(Configuration conf, List<IndexExpression> filter)
- {
- KeyRange range = new KeyRange().setRow_filter(filter);
- conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
- }
-
- /** may be null if unset */
- public static KeyRange getInputKeyRange(Configuration conf)
+ public static Pair<String, String> getInputKeyRange(Configuration conf)
{
String str = conf.get(INPUT_KEYRANGE_CONFIG);
- return str == null ? null : keyRangeFromString(str);
- }
+ if (str == null)
+ return null;
- private static KeyRange keyRangeFromString(String st)
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- KeyRange keyRange = new KeyRange();
- try
- {
- deserializer.deserialize(keyRange, Hex.hexToBytes(st));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- return keyRange;
+ String[] parts = str.split(",");
+ assert parts.length == 2;
+ return Pair.create(parts[0], parts[1]);
}
public static String getInputKeyspace(Configuration conf)
@@ -413,26 +328,15 @@ public class ConfigHelper
conf.set(WRITE_CONSISTENCY_LEVEL, consistencyLevel);
}
- public static int getInputRpcPort(Configuration conf)
- {
- return Integer.parseInt(conf.get(INPUT_THRIFT_PORT, "9160"));
- }
-
- public static void setInputRpcPort(Configuration conf, String port)
- {
- conf.set(INPUT_THRIFT_PORT, port);
- }
-
public static String getInputInitialAddress(Configuration conf)
{
- return conf.get(INPUT_INITIAL_THRIFT_ADDRESS);
+ return conf.get(INPUT_INITIAL_ADDRESS);
}
public static void setInputInitialAddress(Configuration conf, String address)
{
- conf.set(INPUT_INITIAL_THRIFT_ADDRESS, address);
+ conf.set(INPUT_INITIAL_ADDRESS, address);
}
-
public static void setInputPartitioner(Configuration conf, String classname)
{
conf.set(INPUT_PARTITIONER_CONFIG, classname);
@@ -443,24 +347,14 @@ public class ConfigHelper
return FBUtilities.newPartitioner(conf.get(INPUT_PARTITIONER_CONFIG));
}
- public static int getOutputRpcPort(Configuration conf)
- {
- return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT, "9160"));
- }
-
- public static void setOutputRpcPort(Configuration conf, String port)
- {
- conf.set(OUTPUT_THRIFT_PORT, port);
- }
-
public static String getOutputInitialAddress(Configuration conf)
{
- return conf.get(OUTPUT_INITIAL_THRIFT_ADDRESS);
+ return conf.get(OUTPUT_INITIAL_ADDRESS);
}
public static void setOutputInitialAddress(Configuration conf, String address)
{
- conf.set(OUTPUT_INITIAL_THRIFT_ADDRESS, address);
+ conf.set(OUTPUT_INITIAL_ADDRESS, address);
}
public static void setOutputPartitioner(Configuration conf, String classname)
@@ -493,20 +387,6 @@ public class ConfigHelper
conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
}
- public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB)
- {
- conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB);
- }
-
- /**
- * @param conf The configuration to use.
- * @return Value (converts MBs to Bytes) set by {@link #setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB
- */
- public static int getThriftFramedTransportSize(Configuration conf)
- {
- return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
- }
-
public static boolean getOutputLocalDCOnly(Configuration conf)
{
return Boolean.parseBoolean(conf.get(OUTPUT_LOCAL_DC_ONLY, "false"));
@@ -517,79 +397,6 @@ public class ConfigHelper
conf.set(OUTPUT_LOCAL_DC_ONLY, Boolean.toString(localDCOnly));
}
- public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
- {
- return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf));
- }
-
- public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException
- {
- return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf));
- }
-
- private static Cassandra.Client getClientFromAddressList(Configuration conf, String[] addresses, int port) throws IOException
- {
- Cassandra.Client client = null;
- List<IOException> exceptions = new ArrayList<IOException>();
- for (String address : addresses)
- {
- try
- {
- client = createConnection(conf, address, port);
- break;
- }
- catch (IOException ioe)
- {
- exceptions.add(ioe);
- }
- }
- if (client == null)
- {
- logger.error("failed to connect to any initial addresses");
- for (IOException ioe : exceptions)
- {
- logger.error("", ioe);
- }
- throw exceptions.get(exceptions.size() - 1);
- }
- return client;
- }
-
- @SuppressWarnings("resource")
- public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
- {
- try
- {
- TTransport transport = getClientTransportFactory(conf).openTransport(host, port);
- return new Cassandra.Client(new TBinaryProtocol(transport, true, true));
- }
- catch (Exception e)
- {
- throw new IOException("Unable to connect to server " + host + ":" + port, e);
- }
- }
-
- public static ITransportFactory getClientTransportFactory(Configuration conf)
- {
- String factoryClassName = conf.get(ITransportFactory.PROPERTY_KEY, TFramedTransportFactory.class.getName());
- ITransportFactory factory = getClientTransportFactory(factoryClassName);
- Map<String, String> options = getOptions(conf, factory.supportedOptions());
- factory.setOptions(options);
- return factory;
- }
-
- private static ITransportFactory getClientTransportFactory(String factoryClassName)
- {
- try
- {
- return (ITransportFactory) Class.forName(factoryClassName).newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e);
- }
- }
-
private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions)
{
Map<String, String> options = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 5e47ed5..45a227b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -42,8 +42,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
import static java.util.stream.Collectors.toMap;
@@ -133,30 +133,12 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
Session session = cluster.connect())
{
List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
- KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+ Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
Range<Token> jobRange = null;
if (jobKeyRange != null)
{
- if (jobKeyRange.start_key != null)
- {
- if (!partitioner.preservesOrder())
- throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
- if (jobKeyRange.start_token != null)
- throw new IllegalArgumentException("only start_key supported");
- if (jobKeyRange.end_token != null)
- throw new IllegalArgumentException("only start_key supported");
- jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
- partitioner.getToken(jobKeyRange.end_key));
- }
- else if (jobKeyRange.start_token != null)
- {
- jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
- partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
- }
- else
- {
- logger.warn("ignoring jobKeyRange specified without start_key or start_token");
- }
+ jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.left),
+ partitioner.getTokenFactory().fromString(jobKeyRange.right));
}
Metadata metadata = cluster.getMetadata();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
index 89eebdf..8047e1d 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
@@ -59,7 +59,7 @@ public interface CassandraIndexFunctions
* *other* clustering values in the index - the indexed value being the index table's partition key
* * When the indexed value is a collection value, in which case we also need to capture the cell path from the base
* table
- * * In a KEYS index (for thrift/compact storage/static column indexes), where only the base partition key is
+ * * In a KEYS index (for compact storage/static column indexes), where only the base partition key is
* held in the index table.
*
* Called from indexCfsMetadata
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/IndexEntry.java b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
index 97525d6..3e4e41b 100644
--- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.DecoratedKey;
/**
* Entries in indexes on non-compact tables (tables with composite comparators)
* can be encapsulated as IndexedEntry instances. These are not used when dealing
- * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
+ * with indexes on static/compact tables (i.e. KEYS indexes).
*/
public final class IndexEntry
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 324d939..b5e4a78 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -69,11 +69,6 @@ public class CompositesSearcher extends CassandraIndexSearcher
private UnfilteredRowIterator next;
- public boolean isForThrift()
- {
- return command.isForThrift();
- }
-
public CFMetaData metadata()
{
return command.metadata();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 0169d3f..febb09f 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -23,12 +23,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.index.internal.CassandraIndex;
@@ -57,11 +55,6 @@ public class KeysSearcher extends CassandraIndexSearcher
{
private UnfilteredRowIterator next;
- public boolean isForThrift()
- {
- return command.isForThrift();
- }
-
public CFMetaData metadata()
{
return command.metadata();
@@ -92,8 +85,7 @@ public class KeysSearcher extends CassandraIndexSearcher
continue;
ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter());
- SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
- index.baseCfs.metadata,
+ SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
command.nowInSec(),
extendedFilter,
command.rowFilter(),
@@ -108,7 +100,6 @@ public class KeysSearcher extends CassandraIndexSearcher
hit,
indexKey.getKey(),
executionController.writeOpOrderGroup(),
- isForThrift(),
command.nowInSec());
if (dataIter != null)
@@ -151,66 +142,23 @@ public class KeysSearcher extends CassandraIndexSearcher
Row indexHit,
ByteBuffer indexedValue,
OpOrder.Group writeOp,
- boolean isForThrift,
int nowInSec)
{
- if (isForThrift)
+ assert iterator.metadata().isCompactTable();
+ Row data = iterator.staticRow();
+ if (index.isStale(data, indexedValue, nowInSec))
{
- // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
- // is the indexed name and so we need to materialize the partition.
- ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
+ // Index is stale, remove the index entry and ignore
+ index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+ makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
+ new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+ writeOp);
iterator.close();
- Row data = result.getRow(Clustering.make(index.getIndexedColumn().name.bytes));
- if (data == null)
- return null;
-
- // for thrift tables, we need to compare the index entry against the compact value column,
- // not the column actually designated as the indexed column so we don't use the index function
- // lib for the staleness check like we do in every other case
- Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn());
- if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0)
- {
- // Index is stale, remove the index entry and ignore
- index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
- Clustering.make(index.getIndexedColumn().name.bytes),
- new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
- writeOp);
- return null;
- }
- else
- {
- if (command.columnFilter().fetches(index.getIndexedColumn()))
- return result.unfilteredIterator();
-
- // The query on the base table used an extended column filter to ensure that the
- // indexed column was actually read for use in the staleness check, before
- // returning the results we must filter the base table partition so that it
- // contains only the originally requested columns. See CASSANDRA-11523
- ClusteringComparator comparator = result.metadata().comparator;
- Slices.Builder slices = new Slices.Builder(comparator);
- for (ColumnDefinition selected : command.columnFilter().fetchedColumns())
- slices.add(Slice.make(comparator, selected.name.bytes));
- return result.unfilteredIterator(ColumnFilter.all(command.metadata()), slices.build(), false);
- }
+ return null;
}
else
{
- assert iterator.metadata().isCompactTable();
- Row data = iterator.staticRow();
- if (index.isStale(data, indexedValue, nowInSec))
- {
- // Index is stale, remove the index entry and ignore
- index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
- makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
- new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
- writeOp);
- iterator.close();
- return null;
- }
- else
- {
- return iterator;
- }
+ return iterator;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index 155cd4f..336a740 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -63,11 +63,6 @@ public class QueryController
this.executionStart = System.nanoTime();
}
- public boolean isForThrift()
- {
- return command.isForThrift();
- }
-
public CFMetaData metadata()
{
return command.metadata();
@@ -101,8 +96,7 @@ public class QueryController
throw new NullPointerException();
try
{
- SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
- cfs.metadata,
+ SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata,
command.nowInSec(),
command.columnFilter(),
command.rowFilter().withoutExpressions(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index 4410756..8a25f79 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -153,11 +153,6 @@ public class QueryPlan
}
}
- public boolean isForThrift()
- {
- return controller.isForThrift();
- }
-
public CFMetaData metadata()
{
return controller.metadata();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index add8ddc..da37069 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1532,8 +1532,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
- public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
- public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
+ public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed);
+ public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed);
public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
@@ -1693,7 +1693,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift);
+ public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange);
public FileDataInput getFileDataInput(long position)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index ac7801c..02b685b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ChecksumType;
/**
* Legacy bigtable format
@@ -74,7 +73,7 @@ public class BigFormat implements SSTableFormat
@Override
public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header)
{
- return new RowIndexEntry.Serializer(metadata, version, header);
+ return new RowIndexEntry.Serializer(version, header);
}
static class WriterFactory extends SSTableWriter.Factory
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 8deb685..613961a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -55,19 +55,19 @@ public class BigTableReader extends SSTableReader
super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
- public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+ public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed)
{
RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ);
- return iterator(null, key, rie, slices, selectedColumns, reversed, isForThrift);
+ return iterator(null, key, rie, slices, selectedColumns, reversed);
}
- public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+ public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed)
{
if (indexEntry == null)
return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
return reversed
- ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile)
- : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile);
+ ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, ifile)
+ : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, ifile);
}
/**
@@ -75,9 +75,9 @@ public class BigTableReader extends SSTableReader
* @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift)
+ public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange)
{
- return BigTableScanner.getScanner(this, columns, dataRange, isForThrift);
+ return BigTableScanner.getScanner(this, columns, dataRange);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 5260b3a..1b33f5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -60,7 +60,6 @@ public class BigTableScanner implements ISSTableScanner
private final ColumnFilter columns;
private final DataRange dataRange;
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
- private final boolean isForThrift;
private long startScan = -1;
private long bytesScanned = 0;
@@ -69,12 +68,12 @@ public class BigTableScanner implements ISSTableScanner
// Full scan of the sstables
public static ISSTableScanner getScanner(SSTableReader sstable)
{
- return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, Iterators.singletonIterator(fullRange(sstable)));
+ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, Iterators.singletonIterator(fullRange(sstable)));
}
- public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, boolean isForThrift)
+ public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange)
{
- return new BigTableScanner(sstable, columns, dataRange, isForThrift, makeBounds(sstable, dataRange).iterator());
+ return new BigTableScanner(sstable, columns, dataRange, makeBounds(sstable, dataRange).iterator());
}
public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
@@ -84,15 +83,15 @@ public class BigTableScanner implements ISSTableScanner
if (positions.isEmpty())
return new EmptySSTableScanner(sstable);
- return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, makeBounds(sstable, tokenRanges).iterator());
+ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, makeBounds(sstable, tokenRanges).iterator());
}
public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
- return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, rangeIterator);
+ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator);
}
- private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+ private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
assert sstable != null;
@@ -104,7 +103,6 @@ public class BigTableScanner implements ISSTableScanner
this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
sstable.descriptor.version,
sstable.header);
- this.isForThrift = isForThrift;
this.rangeIterator = rangeIterator;
}
@@ -182,7 +180,7 @@ public class BigTableScanner implements ISSTableScanner
if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
{
// Found, just read the dataPosition and seek into index and data files
- long dataPosition = RowIndexEntry.Serializer.readPosition(ifile, sstable.descriptor.version);
+ long dataPosition = RowIndexEntry.Serializer.readPosition(ifile);
ifile.seek(indexPosition);
dfile.seek(dataPosition);
break;
@@ -239,11 +237,6 @@ public class BigTableScanner implements ISSTableScanner
return sstable.toString();
}
- public boolean isForThrift()
- {
- return isForThrift;
- }
-
public CFMetaData metadata()
{
return sstable.metadata;
@@ -359,7 +352,7 @@ public class BigTableScanner implements ISSTableScanner
}
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
- return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed(), isForThrift);
+ return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed());
}
catch (CorruptSSTableException | IOException e)
{
@@ -421,11 +414,6 @@ public class BigTableScanner implements ISSTableScanner
return sstable.getFilename();
}
- public boolean isForThrift()
- {
- return false;
- }
-
public CFMetaData metadata()
{
return sstable.metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java b/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
deleted file mode 100644
index 798f09e..0000000
--- a/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object.
- */
-public interface IRequestScheduler
-{
- /**
- * Queue incoming request threads
- *
- * @param t Thread handing the request
- * @param id Scheduling parameter, an id to distinguish profiles (users/keyspace)
- * @param timeoutMS The max time in milliseconds to spend blocking for a slot
- */
- public void queue(Thread t, String id, long timeoutMS) throws TimeoutException;
-
- /**
- * A convenience method for indicating when a particular request has completed
- * processing, and before a return to the client
- */
- public void release();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/NoScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/NoScheduler.java b/src/java/org/apache/cassandra/scheduler/NoScheduler.java
deleted file mode 100644
index d3f4ce8..0000000
--- a/src/java/org/apache/cassandra/scheduler/NoScheduler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import org.apache.cassandra.config.RequestSchedulerOptions;
-
-
-/**
- * This is basically not having a scheduler, the requests are
- * processed as normally would be handled by the JVM.
- */
-public class NoScheduler implements IRequestScheduler
-{
-
- public NoScheduler(RequestSchedulerOptions options) {}
-
- public NoScheduler() {}
-
- public void queue(Thread t, String id, long timeoutMS) {}
-
- public void release() {}
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
deleted file mode 100644
index 904deb3..0000000
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.RequestSchedulerOptions;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * A very basic Round Robin implementation of the RequestScheduler. It handles
- * request groups identified on user/keyspace by placing them in separate
- * queues and servicing a request from each queue in a RoundRobin fashion.
- * It optionally adds weights for each round.
- */
-public class RoundRobinScheduler implements IRequestScheduler
-{
- private static final Logger logger = LoggerFactory.getLogger(RoundRobinScheduler.class);
-
- //Map of queue id to weighted queue
- private final NonBlockingHashMap<String, WeightedQueue> queues;
-
- private final Semaphore taskCount;
-
- // Tracks the count of threads available in all queues
- // Used by the the scheduler thread so we don't need to busy-wait until there is a request to process
- private final Semaphore queueSize = new Semaphore(0, false);
-
- private final int defaultWeight;
- private final Map<String, Integer> weights;
-
- public RoundRobinScheduler(RequestSchedulerOptions options)
- {
- defaultWeight = options.default_weight;
- weights = options.weights;
-
- // the task count is acquired for the first time _after_ releasing a thread, so we pre-decrement
- taskCount = new Semaphore(options.throttle_limit - 1);
-
- queues = new NonBlockingHashMap<String, WeightedQueue>();
- Runnable runnable = () ->
- {
- while (true)
- {
- schedule();
- }
- };
- Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
- scheduler.start();
- logger.info("Started the RoundRobin Request Scheduler");
- }
-
- public void queue(Thread t, String id, long timeoutMS) throws TimeoutException
- {
- WeightedQueue weightedQueue = getWeightedQueue(id);
-
- try
- {
- queueSize.release();
- try
- {
- weightedQueue.put(t, timeoutMS);
- // the scheduler will release us when a slot is available
- }
- catch (TimeoutException | InterruptedException e)
- {
- queueSize.acquireUninterruptibly();
- throw e;
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException("Interrupted while queueing requests", e);
- }
- }
-
- public void release()
- {
- taskCount.release();
- }
-
- private void schedule()
- {
- queueSize.acquireUninterruptibly();
- for (Map.Entry<String,WeightedQueue> request : queues.entrySet())
- {
- WeightedQueue queue = request.getValue();
- //Using the weight, process that many requests at a time (for that scheduler id)
- for (int i=0; i<queue.weight; i++)
- {
- Thread t = queue.poll();
- if (t == null)
- break;
- else
- {
- taskCount.acquireUninterruptibly();
- queueSize.acquireUninterruptibly();
- }
- }
- }
- queueSize.release();
- }
-
- /*
- * Get the Queue for the respective id, if one is not available
- * create a new queue for that corresponding id and return it
- */
- private WeightedQueue getWeightedQueue(String id)
- {
- WeightedQueue weightedQueue = queues.get(id);
- if (weightedQueue != null)
- // queue existed
- return weightedQueue;
-
- WeightedQueue maybenew = new WeightedQueue(id, getWeight(id));
- weightedQueue = queues.putIfAbsent(id, maybenew);
- if (weightedQueue == null)
- {
- return maybenew;
- }
-
- // another thread created the queue
- return weightedQueue;
- }
-
- Semaphore getTaskCount()
- {
- return taskCount;
- }
-
- private int getWeight(String weightingVar)
- {
- return (weights != null && weights.containsKey(weightingVar))
- ? weights.get(weightingVar)
- : defaultWeight;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
deleted file mode 100644
index 76c7e9d..0000000
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.metrics.LatencyMetrics;
-
-class WeightedQueue
-{
- private final LatencyMetrics metric;
-
- public final String key;
- public final int weight;
- private final SynchronousQueue<Entry> queue;
- public WeightedQueue(String key, int weight)
- {
- this.key = key;
- this.weight = weight;
- this.queue = new SynchronousQueue<Entry>(true);
- this.metric = new LatencyMetrics("scheduler", "WeightedQueue", key);
- }
-
- public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException
- {
- if (!queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS))
- throw new TimeoutException("Failed to acquire request scheduler slot for '" + key + "'");
- }
-
- public Thread poll()
- {
- Entry e = queue.poll();
- if (e == null)
- return null;
- metric.addNano(System.nanoTime() - e.creationTime);
- return e.thread;
- }
-
- @Override
- public String toString()
- {
- return "RoundRobinScheduler.WeightedQueue(key=" + key + " weight=" + weight + ")";
- }
-
- private final static class Entry
- {
- public final long creationTime = System.nanoTime();
- public final Thread thread;
- public Entry(Thread thread)
- {
- this.thread = thread;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 54fa7e2..26489c6 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.cache.*;
import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
@@ -50,7 +49,6 @@ import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -357,58 +355,25 @@ public class CacheService implements CacheServiceMBean
{
assert(cfs.metadata.isCounter());
out.write(cfs.metadata.ksAndCFBytes);
- ByteBufferUtil.writeWithLength(key.partitionKey, out);
- ByteBufferUtil.writeWithLength(key.cellName, out);
+ key.write(out);
}
+ /**
+ * Reads one saved counter cache key from {@code in} and schedules a task on the READ stage that
+ * re-reads the current counter value for that key.
+ *
+ * @param in stream positioned just after the keyspace/table names of the entry
+ * @param cfs store the entry belongs to; may be null
+ * @return a future yielding the key with its clock and count (or yielding null when no value could be
+ * read), or null when {@code cfs} is null, is not a counter table, or has its counter cache disabled
+ * @throws IOException if the key cannot be read from {@code in}
+ */
public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
{
//Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
//parameter so they aren't deserialized here, even though they are serialized by this serializer
- final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
- final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
- if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+ // cfs may be null (the check this replaces allowed for it), so it must be tested before
+ // cfs.metadata is dereferenced to read the key.
+ // NOTE(review): this path returns without consuming the key bytes from 'in', whereas the
+ // pre-patch code read them before checking cfs -- confirm the caller copes with the unread entry.
+ if (cfs == null)
+ return null;
+
+ final CounterCacheKey cacheKey = CounterCacheKey.read(cfs.metadata.ksAndCFName, in);
+ if (!cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
return null;
- assert(cfs.metadata.isCounter());
+
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
{
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
- DecoratedKey key = cfs.decorateKey(partitionKey);
- LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
- ColumnDefinition column = name.column;
- CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
-
- int nowInSec = FBUtilities.nowInSeconds();
- ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
- if (path == null)
- builder.add(column);
- else
- builder.select(column, path);
-
- ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
- SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
- try (ReadExecutionController controller = cmd.executionController();
- RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, controller), nowInSec))
- {
- Cell cell;
- if (column.isStatic())
- {
- cell = iter.staticRow().getCell(column);
- }
- else
- {
- if (!iter.hasNext())
- return null;
- cell = iter.next().getCell(column);
- }
-
- if (cell == null)
- return null;
-
- ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
- return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, name.clustering, column, path), clockAndCount);
- }
+ ByteBuffer value = cacheKey.readCounterValue(cfs);
+ return value == null
+ ? null
+ : Pair.create(cacheKey, CounterContext.instance().getLocalClockAndCount(value));
}
});
}
@@ -487,7 +452,7 @@ public class CacheService implements CacheServiceMBean
// wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
// this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
// part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
- RowIndexEntry.Serializer.skipForCache(input, BigFormat.instance.getLatestVersion());
+ RowIndexEntry.Serializer.skipForCache(input);
return null;
}
RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,