You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:49 UTC
[25/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 0243b0d..add4445 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -18,15 +18,13 @@
package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
/**
@@ -48,67 +46,90 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
*/
public class CompositesIndexOnClusteringKey extends CompositesIndex
{
- public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- // Index cell names are rk ck_0 ... ck_{i-1} ck_{i+1} ck_n, so n
- // components total (where n is the number of clustering keys)
- int ckCount = baseMetadata.clusteringColumns().size();
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount);
- types.add(SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+
+ List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
for (int i = 0; i < columnDef.position(); i++)
- types.add(baseMetadata.clusteringColumns().get(i).type);
- for (int i = columnDef.position() + 1; i < ckCount; i++)
- types.add(baseMetadata.clusteringColumns().get(i).type);
- return new CompoundDenseCellNameType(types);
+ {
+ ColumnDefinition def = cks.get(i);
+ indexMetadata.addClusteringColumn(def.name, def.type);
+ }
+ for (int i = columnDef.position() + 1; i < cks.size(); i++)
+ {
+ ColumnDefinition def = cks.get(i);
+ indexMetadata.addClusteringColumn(def.name, def.type);
+ }
}
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- return cell.name().get(columnDef.position());
+ return clustering.get(columnDef.position());
}
- protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
- CBuilder builder = getIndexComparator().prefixBuilder();
+ CBuilder builder = CBuilder.create(getIndexComparator());
builder.add(rowKey);
- for (int i = 0; i < Math.min(columnDef.position(), count); i++)
- builder.add(columnName.get(i));
- for (int i = columnDef.position() + 1; i < count; i++)
- builder.add(columnName.get(i));
- return builder.build();
+ for (int i = 0; i < Math.min(columnDef.position(), prefix.size()); i++)
+ builder.add(prefix.get(i));
+ for (int i = columnDef.position() + 1; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+ return builder;
}
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
{
int ckCount = baseCfs.metadata.clusteringColumns().size();
- CBuilder builder = baseCfs.getComparator().builder();
+ Clustering clustering = indexEntry.clustering();
+ CBuilder builder = CBuilder.create(baseCfs.getComparator());
for (int i = 0; i < columnDef.position(); i++)
- builder.add(indexEntry.name().get(i + 1));
+ builder.add(clustering.get(i + 1));
builder.add(indexedValue.getKey());
for (int i = columnDef.position() + 1; i < ckCount; i++)
- builder.add(indexEntry.name().get(i));
+ builder.add(clustering.get(i));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+ return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
}
@Override
- public boolean indexes(CellName name)
+ protected boolean indexPrimaryKeyColumn()
{
- // For now, assume this is only used in CQL3 when we know name has enough component.
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ @Override
+ public boolean indexes(ColumnDefinition c)
+ {
+ // Actual indexing for this index type is done through maybeIndex
+ return false;
+ }
+
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+ {
+ return !data.hasLiveData(nowInSec);
+ }
+
+ @Override
+ public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
+ {
+ if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
+ insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+ }
+
+ @Override
+ public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
{
- return data.hasOnlyTombstones(now);
+ if (clustering.get(columnDef.position()) != null && !deletion.isLive())
+ delete(partitionKey, clustering, null, null, deletion, opGroup);
}
@Override
- public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+ public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
// We only know that one column of the CQL row has been updated/deleted, but we don't know if the
// full row has been deleted so we should not do anything. If it ends up that the whole row has
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
index 1e40710..50e81c4 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
/**
@@ -38,22 +38,21 @@ public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingColl
return ((CollectionType)columnDef.type).nameComparator();
}
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- return cell.name().get(columnDef.position() + 1);
+ return path.get(0);
}
@Override
public boolean supportsOperator(Operator operator)
{
return operator == Operator.CONTAINS_KEY ||
- operator == Operator.CONTAINS && columnDef.type instanceof SetType;
+ operator == Operator.CONTAINS && columnDef.type instanceof SetType;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
{
- CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexValue.getKey());
- Cell cell = data.getColumn(name);
- return cell == null || !cell.isLive(now);
+ Cell cell = data.getCell(columnDef, CellPath.create(indexValue));
+ return cell == null || !cell.isLive(nowInSec);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
index 0b7f579..766f803 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
/**
@@ -38,50 +38,22 @@ public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexInclu
return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
}
- @Override
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
- {
- final ByteBuffer key = cell.name().get(columnDef.position() + 1);
- final ByteBuffer value = cell.value();
- return CompositeType.build(key, value);
- }
-
- @Override
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- Cell cell = extractTargetCell(entry, data);
- if (cellIsDead(cell, now))
- return true;
- ByteBuffer indexCollectionValue = extractCollectionValue(entry);
- ByteBuffer targetCollectionValue = cell.value();
- AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
- return valueComparator.compare(indexCollectionValue, targetCollectionValue) != 0;
+ return CompositeType.build(path.get(0), cellValue);
}
- private Cell extractTargetCell(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
{
- ByteBuffer collectionKey = extractCollectionKey(entry);
- CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, collectionKey);
- return data.getColumn(name);
- }
+ ByteBuffer[] components = ((CompositeType)getIndexKeyComparator()).split(indexValue);
+ ByteBuffer mapKey = components[0];
+ ByteBuffer mapValue = components[1];
- private ByteBuffer extractCollectionKey(IndexedEntry entry)
- {
- return extractIndexKeyComponent(entry, 0);
- }
-
- private ByteBuffer extractIndexKeyComponent(IndexedEntry entry, int component)
- {
- return CompositeType.extractComponent(entry.indexValue.getKey(), component);
- }
-
- private ByteBuffer extractCollectionValue(IndexedEntry entry)
- {
- return extractIndexKeyComponent(entry, 1);
- }
+ Cell cell = data.getCell(columnDef, CellPath.create(mapKey));
+ if (cell == null || !cell.isLive(nowInSec))
+ return true;
- private boolean cellIsDead(Cell cell, long now)
- {
- return cell == null || !cell.isLive(now);
+ AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
+ return valueComparator.compare(mapValue, cell.value()) != 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index a11a0d9..5af842c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -18,19 +18,13 @@
package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CBuilder;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
/**
@@ -46,15 +40,12 @@ import org.apache.cassandra.db.marshal.*;
*/
public class CompositesIndexOnCollectionValue extends CompositesIndex
{
- public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- int prefixSize = columnDef.position();
- List<AbstractType<?>> types = new ArrayList<>(prefixSize + 2);
- types.add(SecondaryIndex.keyComparator);
- for (int i = 0; i < prefixSize; i++)
- types.add(baseMetadata.comparator.subtype(i));
- types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
- return new CompoundDenseCellNameType(types);
+ addGenericClusteringColumns(indexMetadata, baseMetadata, columnDef);
+
+ // collection key
+ indexMetadata.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator());
}
@Override
@@ -63,36 +54,32 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
return ((CollectionType)columnDef.type).valueComparator();
}
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- return cell.value();
+ return cellValue;
}
- protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- CBuilder builder = getIndexComparator().prefixBuilder();
+ CBuilder builder = CBuilder.create(getIndexComparator());
builder.add(rowKey);
- for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
- builder.add(cellName.get(i));
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+
+ // When indexing, the cell path will be present, but when searching, it won't (CASSANDRA-7525)
+ if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
+ builder.add(path.get(0));
- // When indexing, cellName is a full name including the collection
- // key. When searching, restricted clustering columns are included
- // but the collection key is not. In this case, don't try to add an
- // element to the builder for it, as it will just end up null and
- // error out when retrieving cells from the index cf (CASSANDRA-7525)
- if (cellName.size() >= columnDef.position() + 1)
- builder.add(cellName.get(columnDef.position() + 1));
- return builder.build();
+ return builder;
}
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
{
- int prefixSize = columnDef.position();
- CellName name = indexEntry.name();
- CBuilder builder = baseCfs.getComparator().builder();
- for (int i = 0; i < prefixSize; i++)
- builder.add(name.get(i + 1));
- return new IndexedEntry(indexedValue, name, indexEntry.timestamp(), name.get(0), builder.build(), name.get(prefixSize + 1));
+ Clustering clustering = indexEntry.clustering();
+ CBuilder builder = CBuilder.create(baseCfs.getComparator());
+ for (int i = 0; i < baseCfs.getComparator().size(); i++)
+ builder.add(clustering.get(i + 1));
+ return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
}
@Override
@@ -101,18 +88,15 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
return operator == Operator.CONTAINS && !(columnDef.type instanceof SetType);
}
- @Override
- public boolean indexes(CellName name)
- {
- AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return name.size() > columnDef.position()
- && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
- }
-
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
{
- CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexedEntryCollectionKey);
- Cell cell = data.getColumn(name);
- return cell == null || !cell.isLive(now) || ((CollectionType) columnDef.type).valueComparator().compare(entry.indexValue.getKey(), cell.value()) != 0;
+ Iterator<Cell> iter = data.getCells(columnDef);
+ while (iter.hasNext())
+ {
+ Cell cell = iter.next();
+ if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
+ return false;
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index df43057..d48e58b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -18,14 +18,10 @@
package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -49,57 +45,59 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
*/
public class CompositesIndexOnPartitionKey extends CompositesIndex
{
- public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
- {
- int ckCount = baseMetadata.clusteringColumns().size();
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1);
- types.add(SecondaryIndex.keyComparator);
- for (int i = 0; i < ckCount; i++)
- types.add(baseMetadata.comparator.subtype(i));
- return new CompoundDenseCellNameType(types);
- }
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
ByteBuffer[] components = keyComparator.split(rowKey);
return components[columnDef.position()];
}
- protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
- CBuilder builder = getIndexComparator().prefixBuilder();
+ CBuilder builder = CBuilder.create(getIndexComparator());
builder.add(rowKey);
- for (int i = 0; i < count; i++)
- builder.add(columnName.get(i));
- return builder.build();
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+ return builder;
}
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
{
int ckCount = baseCfs.metadata.clusteringColumns().size();
- CBuilder builder = baseCfs.getComparator().builder();
+ Clustering clustering = indexEntry.clustering();
+ CBuilder builder = CBuilder.create(baseCfs.getComparator());
for (int i = 0; i < ckCount; i++)
- builder.add(indexEntry.name().get(i + 1));
+ builder.add(clustering.get(i + 1));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+ return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
}
@Override
- public boolean indexes(CellName name)
+ protected boolean indexPrimaryKeyColumn()
{
- // Since a partition key is always full, we always index it
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ @Override
+ public boolean indexes(ColumnDefinition c)
+ {
+ // Actual indexing for this index type is done through maybeIndex
+ return false;
+ }
+
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+ {
+ return !data.hasLiveData(nowInSec);
+ }
+
+ @Override
+ public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
{
- return data.hasOnlyTombstones(now);
+ insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
}
@Override
- public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+ public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
// We only know that one column of the CQL row has been updated/deleted, but we don't know if the
// full row has been deleted so we should not do anything. If it ends up that the whole row has
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index b9dc07f..a88502a 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -18,15 +18,9 @@
package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
/**
* Index on a REGULAR column definition on a composite type.
@@ -47,50 +41,35 @@ import org.apache.cassandra.db.marshal.*;
*/
public class CompositesIndexOnRegular extends CompositesIndex
{
- public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- int prefixSize = columnDef.position();
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
- types.add(SecondaryIndex.keyComparator);
- for (int i = 0; i < prefixSize; i++)
- types.add(baseMetadata.comparator.subtype(i));
- return new CompoundDenseCellNameType(types);
+ return cellValue;
}
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- return cell.value();
- }
-
- protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
- {
- CBuilder builder = getIndexComparator().prefixBuilder();
+ CBuilder builder = CBuilder.create(getIndexComparator());
builder.add(rowKey);
- for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
- builder.add(cellName.get(i));
- return builder.build();
- }
-
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
- {
- CBuilder builder = baseCfs.getComparator().builder();
- for (int i = 0; i < columnDef.position(); i++)
- builder.add(indexEntry.name().get(i + 1));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+ return builder;
}
- @Override
- public boolean indexes(CellName name)
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
{
- AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return name.size() > columnDef.position()
- && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
+ Clustering clustering = indexEntry.clustering();
+ ClusteringComparator baseComparator = baseCfs.getComparator();
+ CBuilder builder = CBuilder.create(baseComparator);
+ for (int i = 0; i < baseComparator.size(); i++)
+ builder.add(clustering.get(i + 1));
+ return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
{
- CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef);
- Cell cell = data.getColumn(name);
- return cell == null || !cell.isLive(now) || columnDef.type.compare(entry.indexValue.getKey(), cell.value()) != 0;
+ Cell cell = data.getCell(columnDef);
+ return cell == null
+ || !cell.isLive(nowInSec)
+ || columnDef.type.compare(indexValue, cell.value()) != 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 88453df..f838ff1 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -17,296 +17,224 @@
*/
package org.apache.cassandra.db.index.composites;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.utils.concurrent.OpOrder;
+
public class CompositesSearcher extends SecondaryIndexSearcher
{
private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
- public CompositesSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+ public CompositesSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
{
super(indexManager, columns);
}
- @Override
- public List<Row> search(ExtendedFilter filter)
+ private boolean isMatchingEntry(DecoratedKey partitionKey, CompositesIndex.IndexedEntry entry, ReadCommand command)
{
- assert filter.getClause() != null && !filter.getClause().isEmpty();
- final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
- final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
- // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
- // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
- try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
- {
- return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
- }
+ return command.selects(partitionKey, entry.indexedEntryClustering);
}
- private Composite makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
+ protected UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex secondaryIdx,
+ final DecoratedKey indexKey,
+ final RowIterator indexHits,
+ final ReadCommand command,
+ final ReadOrderGroup orderGroup)
{
- if (key.remaining() == 0)
- return Composites.EMPTY;
+ assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
- Composite prefix;
- IDiskAtomFilter columnFilter = filter.columnFilter(key);
- if (columnFilter instanceof SliceQueryFilter)
- {
- SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
- Composite columnName = isStart ? sqf.start() : sqf.finish();
- prefix = columnName.isEmpty() ? index.getIndexComparator().make(key) : index.makeIndexColumnPrefix(key, columnName);
- }
- else
- {
- prefix = index.getIndexComparator().make(key);
- }
- return isStart ? prefix.start() : prefix.end();
- }
+ assert secondaryIdx instanceof CompositesIndex;
+ final CompositesIndex index = (CompositesIndex)secondaryIdx;
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final CompositesIndex index)
- {
- // Start with the most-restrictive indexed clause, then apply remaining clauses
- // to each row matching that clause.
- // TODO: allow merge join instead of just one index + loop
- assert index != null;
- assert index.getIndexCfs() != null;
- final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
-
- if (logger.isDebugEnabled())
- logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
-
- /*
- * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
- * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the smallest
- * possible key having a given token. A fix would be to actually store the token along the key in the
- * indexed row.
- */
- final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
- ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
- final CellNameType baseComparator = baseCfs.getComparator();
- final CellNameType indexComparator = index.getIndexCfs().getComparator();
-
- final Composite startPrefix = makePrefix(index, startKey, filter, true);
- final Composite endPrefix = makePrefix(index, endKey, filter, false);
-
- return new ColumnFamilyStore.AbstractScanIterator()
+ return new UnfilteredPartitionIterator()
{
- private Composite lastSeenPrefix = startPrefix;
- private Deque<Cell> indexCells;
- private int columnsRead = Integer.MAX_VALUE;
- private int limit = filter.currentLimit();
- private int columnsCount = 0;
+ private CompositesIndex.IndexedEntry nextEntry;
- // We have to fetch at least two rows to avoid breaking paging if the first row doesn't satisfy all clauses
- private int indexCellsPerQuery = Math.max(2, Math.min(filter.maxColumns(), filter.maxRows()));
+ private UnfilteredRowIterator next;
- public boolean needsFiltering()
+ public boolean isForThrift()
{
- return false;
+ return command.isForThrift();
+ }
+
+ public boolean hasNext()
+ {
+ return prepareNext();
}
- private Row makeReturn(DecoratedKey key, ColumnFamily data)
+ public UnfilteredRowIterator next()
{
- if (data == null)
- return endOfData();
+ if (next == null)
+ prepareNext();
- assert key != null;
- return new Row(key, data);
+ UnfilteredRowIterator toReturn = next;
+ next = null;
+ return toReturn;
}
- protected Row computeNext()
+ private boolean prepareNext()
{
- /*
- * Our internal index code is wired toward internal rows. So we need to accumulate all results for a given
- * row before returning from this method. Which unfortunately means that this method has to do what
- * CFS.filter does for KeysIndex.
- */
- DecoratedKey currentKey = null;
- ColumnFamily data = null;
- Composite previousPrefix = null;
-
- while (true)
- {
- // Did we get more columns that needed to respect the user limit?
- // (but we still need to return what has been fetched already)
- if (columnsCount >= limit)
- return makeReturn(currentKey, data);
+ if (next != null)
+ return true;
- if (indexCells == null || indexCells.isEmpty())
+ if (nextEntry == null)
+ {
+ if (!indexHits.hasNext())
+ return false;
+
+ nextEntry = index.decodeEntry(indexKey, indexHits.next());
+ }
+
+ // Gather all index hits belonging to the same partition and query the data for those hits.
+ // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
+ // 1 read per index hit. However, this basically means materializing all hits for a partition
+ // in memory so we should consider adding some paging mechanism. However, index hits should
+ // be relatively small so it's much better than the previous code that was materializing all
+ // *data* for a given partition.
+ NavigableSet<Clustering> clusterings = new TreeSet<>(baseCfs.getComparator());
+ List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
+ DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
+
+ while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+ {
+ // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
+ if (isMatchingEntry(partitionKey, nextEntry, command))
{
- if (columnsRead < indexCellsPerQuery)
- {
- logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, indexCellsPerQuery);
- return makeReturn(currentKey, data);
- }
-
- if (logger.isTraceEnabled())
- logger.trace("Scanning index {} starting with {}",
- index.expressionString(primary), indexComparator.getString(startPrefix));
-
- QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- index.getIndexCfs().name,
- lastSeenPrefix,
- endPrefix,
- false,
- indexCellsPerQuery,
- filter.timestamp);
- ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
- if (indexRow == null || !indexRow.hasColumns())
- return makeReturn(currentKey, data);
-
- Collection<Cell> sortedCells = indexRow.getSortedColumns();
- columnsRead = sortedCells.size();
- indexCells = new ArrayDeque<>(sortedCells);
- Cell firstCell = sortedCells.iterator().next();
-
- // Paging is racy, so it is possible the first column of a page is not the last seen one.
- if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstCell.name()))
- {
- // skip the row we already saw w/ the last page of results
- indexCells.poll();
- logger.trace("Skipping {}", indexComparator.getString(firstCell.name()));
- }
+ clusterings.add(nextEntry.indexedEntryClustering);
+ entries.add(nextEntry);
}
- while (!indexCells.isEmpty() && columnsCount <= limit)
- {
- Cell cell = indexCells.poll();
- lastSeenPrefix = cell.name();
- if (!cell.isLive(filter.timestamp))
- {
- logger.trace("skipping {}", cell.name());
- continue;
- }
-
- CompositesIndex.IndexedEntry entry = index.decodeEntry(indexKey, cell);
- DecoratedKey dk = baseCfs.partitioner.decorateKey(entry.indexedKey);
-
- // Are we done for this row?
- if (currentKey == null)
- {
- currentKey = dk;
- }
- else if (!currentKey.equals(dk))
- {
- DecoratedKey previousKey = currentKey;
- currentKey = dk;
- previousPrefix = null;
-
- // We're done with the previous row, return it if it had data, continue otherwise
- indexCells.addFirst(cell);
- if (data == null)
- continue;
- else
- return makeReturn(previousKey, data);
- }
-
- if (!range.contains(dk))
- {
- // Either we're not yet in the range cause the range is start excluding, or we're
- // past it.
- if (!range.right.isMinimum() && range.right.compareTo(dk) < 0)
- {
- logger.trace("Reached end of assigned scan range");
- return endOfData();
- }
- else
- {
- logger.debug("Skipping entry {} before assigned scan range", dk.getToken());
- continue;
- }
- }
-
- // Check if this entry cannot be a hit due to the original cell filter
- Composite start = entry.indexedEntryPrefix;
- if (!filter.columnFilter(dk.getKey()).maySelectPrefix(baseComparator, start))
- continue;
-
- // If we've record the previous prefix, it means we're dealing with an index on the collection value. In
- // that case, we can have multiple index prefix for the same CQL3 row. In that case, we want to only add
- // the CQL3 row once (because requesting the data multiple time would be inefficient but more importantly
- // because we shouldn't count the columns multiple times with the lastCounted() call at the end of this
- // method).
- if (previousPrefix != null && previousPrefix.equals(start))
- continue;
- else
- previousPrefix = null;
-
- if (logger.isTraceEnabled())
- logger.trace("Adding index hit to current row for {}", indexComparator.getString(cell.name()));
-
- // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
- // slightly wasteful, but this probably doesn't matter in practice and it simplify things.
- ColumnSlice dataSlice = new ColumnSlice(start, entry.indexedEntryPrefix.end());
- // If the table has static columns, we must fetch them too as they may need to be returned too.
- // Note that this is potentially wasteful for 2 reasons:
- // 1) we will retrieve the static parts for each indexed row, even if we have more than one row in
- // the same partition. If we were to group data queries to rows on the same slice, which would
- // speed up things in general, we would also optimize here since we would fetch static columns only
- // once for each group.
- // 2) at this point we don't know if the user asked for static columns or not, so we might be fetching
- // them for nothing. We would however need to ship the list of "CQL3 columns selected" with getRangeSlice
- // to be able to know that.
- // TODO: we should improve both point above
- ColumnSlice[] slices = baseCfs.metadata.hasStaticColumns()
- ? new ColumnSlice[]{ baseCfs.metadata.comparator.staticPrefix().slice(), dataSlice }
- : new ColumnSlice[]{ dataSlice };
- SliceQueryFilter dataFilter = new SliceQueryFilter(slices, false, Integer.MAX_VALUE, baseCfs.metadata.clusteringColumns().size());
- ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
- if (newData == null || index.isStale(entry, newData, filter.timestamp))
- {
- index.delete(entry, writeOp);
- continue;
- }
-
- assert newData != null : "An entry with no data should have been considered stale";
-
- // We know the entry is not stale and so the entry satisfy the primary clause. So whether
- // or not the data satisfies the other clauses, there will be no point to re-check the
- // same CQL3 row if we run into another collection value entry for this row.
- if (entry.indexedEntryCollectionKey != null)
- previousPrefix = start;
-
- if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryPrefix, entry.indexedEntryCollectionKey))
- continue;
-
- if (data == null)
- data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata);
- data.addAll(newData);
- columnsCount += dataFilter.lastCounted();
- }
- }
- }
+ nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+ }
+
+ // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
+ if (clusterings.isEmpty())
+ return prepareNext();
+
+ // Query the gathered index hits. We still need to filter stale hits from the resulting query.
+ ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false);
+ SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(baseCfs.metadata,
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ DataLimits.NONE,
+ partitionKey,
+ filter);
+ @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
+ // by the next caller of next, or through closing this iterator if this comes before.
+ UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
+ index,
+ indexKey.getKey(),
+ entries,
+ orderGroup.writeOpOrderGroup(),
+ command.nowInSec());
+ if (dataIter.isEmpty())
+ {
+ dataIter.close();
+ return prepareNext();
+ }
+
+ next = dataIter;
+ return true;
+ }
- public void close() throws IOException {}
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close()
+ {
+ indexHits.close();
+ if (next != null)
+ next.close();
+ }
+ };
+ }
+
+ private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
+ final CompositesIndex index,
+ final ByteBuffer indexValue,
+ final List<CompositesIndex.IndexedEntry> entries,
+ final OpOrder.Group writeOp,
+ final int nowInSec)
+ {
+ return new WrappingUnfilteredRowIterator(dataIter)
+ {
+ private int entriesIdx;
+ private Unfiltered next;
+
+ @Override
+ public boolean hasNext()
+ {
+ return prepareNext();
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ if (next == null)
+ prepareNext();
+
+ Unfiltered toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ private boolean prepareNext()
+ {
+ if (next != null)
+ return true;
+
+ while (next == null && super.hasNext())
+ {
+ next = super.next();
+ if (next.kind() != Unfiltered.Kind.ROW)
+ return true;
+
+ Row row = (Row)next;
+ CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
+ if (!index.isStale(row, indexValue, nowInSec))
+ return true;
+
+ // The entry is stale: delete the entry and ignore otherwise
+ index.delete(entry, writeOp, nowInSec);
+ next = null;
+ }
+ return false;
+ }
+
+ private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)
+ {
+ assert entriesIdx < entries.size();
+ while (entriesIdx < entries.size())
+ {
+ CompositesIndex.IndexedEntry entry = entries.get(entriesIdx++);
+ // The entries are in clustering order, so the requested entry should be the
+ // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
+ // that have no corresponding row in the base table typically because of a range
+ // tombstone or partition level deletion. Delete such stale entries.
+ int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering);
+ assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
+ if (cmp == 0)
+ return entry;
+ else
+ index.delete(entry, writeOp, nowInSec);
+ }
+ // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
+ throw new AssertionError();
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index e771d99..7930bd6 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -20,14 +20,15 @@ package org.apache.cassandra.db.index.keys;
import java.nio.ByteBuffer;
import java.util.Set;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
/**
* Implements a secondary index for a column family using a second column family.
@@ -39,41 +40,51 @@ import org.apache.cassandra.exceptions.ConfigurationException;
*/
public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
{
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+ public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
{
- return cell.value();
+ indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
}
- protected CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell)
+ @Override
+ public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
{
- return CellNames.simpleDense(rowKey);
- }
+ super.indexRow(key, row, opGroup, nowInSec);
- public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
- {
- return new KeysSearcher(baseCfs.indexManager, columns);
+ // This is used when building indexes, in particular when the index is first created. On thrift, this
+ // potentially means the column definition just got created, and so we need to check if it's not a "dynamic"
+ // row that actually corresponds to the index definition.
+ assert baseCfs.metadata.isCompactTable();
+ if (!row.isStatic())
+ {
+ Clustering clustering = row.clustering();
+ if (clustering.get(0).equals(columnDef.name.bytes))
+ {
+ Cell cell = row.getCell(baseCfs.metadata.compactValueColumn());
+ if (cell != null && cell.isLive(nowInSec))
+ insert(key.getKey(), clustering, cell, opGroup);
+ }
+ }
}
- public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
{
- Cell cell = data.getColumn(data.getComparator().makeCellName(columnDef.name.bytes));
- return cell == null || !cell.isLive(now) || columnDef.type.compare(indexedValue, cell.value()) != 0;
+ return cellValue;
}
- public void validateOptions() throws ConfigurationException
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- // no options used
+ CBuilder builder = CBuilder.create(getIndexComparator());
+ builder.add(rowKey);
+ return builder;
}
- public boolean indexes(CellName name)
+ public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
{
- // This consider the full cellName directly
- AbstractType<?> comparator = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return comparator.compare(columnDef.name.bytes, name.toByteBuffer()) == 0;
+ return new KeysSearcher(baseCfs.indexManager, columns);
}
- protected AbstractType getExpressionComparator()
+ public void validateOptions() throws ConfigurationException
{
- return baseCfs.getComparator().asAbstractType();
+ // no options used
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index b4fd0ba..6b53640 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,190 +17,169 @@
*/
package org.apache.cassandra.db.index.keys;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class KeysSearcher extends SecondaryIndexSearcher
{
private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
- public KeysSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+ public KeysSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
{
super(indexManager, columns);
}
- @Override
- public List<Row> search(ExtendedFilter filter)
+ protected UnfilteredPartitionIterator queryDataFromIndex(final AbstractSimplePerColumnSecondaryIndex index,
+ final DecoratedKey indexKey,
+ final RowIterator indexHits,
+ final ReadCommand command,
+ final ReadOrderGroup orderGroup)
{
- assert filter.getClause() != null && !filter.getClause().isEmpty();
- final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
- final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
- // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
- // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
- try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
+ assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+ return new UnfilteredPartitionIterator()
{
- return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
- }
- }
+ private UnfilteredRowIterator next;
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final SecondaryIndex index)
- {
+ public boolean isForThrift()
+ {
+ return command.isForThrift();
+ }
- // Start with the most-restrictive indexed clause, then apply remaining clauses
- // to each row matching that clause.
- // TODO: allow merge join instead of just one index + loop
- assert index != null;
- assert index.getIndexCfs() != null;
- final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
-
- if (logger.isDebugEnabled())
- logger.debug("Most-selective indexed predicate is {}",
- ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
-
- /*
- * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
- * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
- * possible key having a given token. A fix would be to actually store the token along the key in the
- * indexed row.
- */
- final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
- CellNameType type = index.getIndexCfs().getComparator();
- final Composite startKey = range.left instanceof DecoratedKey ? type.make(((DecoratedKey)range.left).getKey()) : Composites.EMPTY;
- final Composite endKey = range.right instanceof DecoratedKey ? type.make(((DecoratedKey)range.right).getKey()) : Composites.EMPTY;
-
- final CellName primaryColumn = baseCfs.getComparator().cellFromByteBuffer(primary.column);
-
- return new ColumnFamilyStore.AbstractScanIterator()
- {
- private Composite lastSeenKey = startKey;
- private Iterator<Cell> indexColumns;
- private int columnsRead = Integer.MAX_VALUE;
+ public boolean hasNext()
+ {
+ return prepareNext();
+ }
- protected Row computeNext()
+ public UnfilteredRowIterator next()
{
- int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
- // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
- int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
- while (true)
+ if (next == null)
+ prepareNext();
+
+ UnfilteredRowIterator toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ private boolean prepareNext()
+ {
+ while (next == null && indexHits.hasNext())
{
- if (indexColumns == null || !indexColumns.hasNext())
+ Row hit = indexHits.next();
+ DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
+
+ SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+ baseCfs.metadata,
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ DataLimits.NONE,
+ key,
+ command.clusteringIndexFilter(key));
+ @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
+ // Otherwise, we close right away if empty, and if it's assigned to next it will be closed either
+ // by the next caller of next, or through closing this iterator if this comes before.
+ UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
+ index,
+ hit,
+ indexKey.getKey(),
+ orderGroup.writeOpOrderGroup(),
+ isForThrift(),
+ command.nowInSec());
+
+ if (dataIter != null)
{
- if (columnsRead < rowsPerQuery)
- {
- logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
- return endOfData();
- }
-
- if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex))
- logger.trace("Scanning index {} starting with {}",
- ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey.toByteBuffer()));
-
- QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- index.getIndexCfs().name,
- lastSeenKey,
- endKey,
- false,
- rowsPerQuery,
- filter.timestamp);
- ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
- logger.trace("fetched {}", indexRow);
- if (indexRow == null)
- {
- logger.trace("no data, all done");
- return endOfData();
- }
-
- Collection<Cell> sortedCells = indexRow.getSortedColumns();
- columnsRead = sortedCells.size();
- indexColumns = sortedCells.iterator();
- Cell firstCell = sortedCells.iterator().next();
-
- // Paging is racy, so it is possible the first column of a page is not the last seen one.
- if (lastSeenKey != startKey && lastSeenKey.equals(firstCell.name()))
- {
- // skip the row we already saw w/ the last page of results
- indexColumns.next();
- logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstCell.name().toByteBuffer()));
- }
- else if (range instanceof Range && indexColumns.hasNext() && firstCell.name().equals(startKey))
- {
- // skip key excluded by range
- indexColumns.next();
- logger.trace("Skipping first key as range excludes it");
- }
+ if (dataIter.isEmpty())
+ dataIter.close();
+ else
+ next = dataIter;
}
+ }
+ return next != null;
+ }
- while (indexColumns.hasNext())
- {
- Cell cell = indexColumns.next();
- lastSeenKey = cell.name();
- if (!cell.isLive(filter.timestamp))
- {
- logger.trace("skipping {}", cell.name());
- continue;
- }
-
- DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey.toByteBuffer());
- if (!range.right.isMinimum() && range.right.compareTo(dk) < 0)
- {
- logger.trace("Reached end of assigned scan range");
- return endOfData();
- }
- if (!range.contains(dk))
- {
- logger.trace("Skipping entry {} outside of assigned scan range", dk.getToken());
- continue;
- }
-
- logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey.toByteBuffer()), filter.timestamp));
- // While the column family we'll get in the end should contains the primary clause cell, the initialFilter may not have found it and can thus be null
- if (data == null)
- data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata);
-
- // as in CFS.filter - extend the filter to ensure we include the columns
- // from the index expressions, just in case they weren't included in the initialFilter
- IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
- if (extraFilter != null)
- {
- ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
- if (cf != null)
- data.addAll(cf);
- }
-
- if (((KeysIndex)index).isIndexEntryStale(indexKey.getKey(), data, filter.timestamp))
- {
- // delete the index entry w/ its own timestamp
- Cell dummyCell = new BufferCell(primaryColumn, indexKey.getKey(), cell.timestamp());
- ((PerColumnSecondaryIndex)index).delete(dk.getKey(), dummyCell, writeOp);
- continue;
- }
- return new Row(dk, data);
- }
- }
- }
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
- public void close() throws IOException {}
+ public void close()
+ {
+ indexHits.close();
+ if (next != null)
+ next.close();
+ }
};
}
+
+ private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
+ AbstractSimplePerColumnSecondaryIndex index,
+ Row indexHit,
+ ByteBuffer indexedValue,
+ OpOrder.Group writeOp,
+ boolean isForThrift,
+ int nowInSec)
+ {
+ if (isForThrift)
+ {
+ // The data we got has gone through ThriftResultsMerger, so we're looking for the row whose clustering
+ // is the indexed name. And so we need to materialize the partition.
+ ArrayBackedPartition result = ArrayBackedPartition.create(iterator);
+ iterator.close();
+ Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes));
+ Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
+ return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
+ ? null
+ : result.unfilteredIterator();
+ }
+ else
+ {
+ assert iterator.metadata().isCompactTable();
+ Row data = iterator.staticRow();
+ Cell cell = data == null ? null : data.getCell(index.indexedColumn());
+ if (deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec))
+ {
+ iterator.close();
+ return null;
+ }
+ else
+ {
+ return iterator;
+ }
+ }
+ }
+
+ private boolean deleteIfStale(DecoratedKey partitionKey,
+ Cell cell,
+ AbstractSimplePerColumnSecondaryIndex index,
+ Row indexHit,
+ ByteBuffer indexedValue,
+ OpOrder.Group writeOp,
+ int nowInSec)
+ {
+ if (cell == null || !cell.isLive(nowInSec) || index.indexedColumn().type.compare(indexedValue, cell.value()) != 0)
+ {
+ // Index is stale, remove the index entry and ignore
+ index.delete(partitionKey.getKey(),
+ new SimpleClustering(index.indexedColumn().name.bytes),
+ indexedValue,
+ null,
+ new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+ writeOp);
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
index ff2abcb..52ac227 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
@@ -6,16 +6,16 @@ import java.util.List;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;
-public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
+public class SSTableIntervalTree extends IntervalTree<PartitionPosition, SSTableReader, Interval<PartitionPosition, SSTableReader>>
{
private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
- SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
+ SSTableIntervalTree(Collection<Interval<PartitionPosition, SSTableReader>> intervals)
{
super(intervals);
}
@@ -30,11 +30,11 @@ public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader
return new SSTableIntervalTree(buildIntervals(sstables));
}
- public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
+ public static List<Interval<PartitionPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
{
- List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
+ List<Interval<PartitionPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
for (SSTableReader sstable : sstables)
- intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
+ intervals.add(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
return intervals;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 0d1100b..f710dda 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -25,7 +25,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
@@ -126,12 +126,12 @@ public class View
return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
}
- public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+ public List<SSTableReader> sstablesInBounds(AbstractBounds<PartitionPosition> rowBounds)
{
if (intervalTree.isEmpty())
return Collections.emptyList();
- RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
- return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+ PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
+ return intervalTree.search(Interval.<PartitionPosition, SSTableReader>create(rowBounds.left, stopInTree));
}
// METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index aa25a81..b074b34 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.db.marshal;
+import java.io.DataInput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,11 +29,15 @@ import java.util.Map;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.github.jamm.Unmetered;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Specifies a Comparator for a specific type of ByteBuffer.
@@ -87,6 +93,9 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
/** get a string representation of the bytes suitable for log messages */
public String getString(ByteBuffer bytes)
{
+ if (bytes == null)
+ return "null";
+
TypeSerializer<T> serializer = getSerializer();
serializer.validate(bytes);
@@ -132,6 +141,17 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
return new CQL3Type.Custom(this);
}
+ /**
+ * Same as compare except that this ignores ReversedType. This is to be used when
+ * comparing 2 values to decide for a CQL condition (see Operator.isSatisfiedBy) as
+ * for CQL, ReversedType is simply a "hint" to the storage engine but it does not
+ * change the meaning of queries per se.
+ */
+ public int compareForCQL(ByteBuffer v1, ByteBuffer v2)
+ {
+ return compare(v1, v2);
+ }
+
public abstract TypeSerializer<T> getSerializer();
/* convenience method */
@@ -291,6 +311,50 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
}
/**
+ * The length of values for this type if all values are of fixed length, -1 otherwise.
+ */
+ protected int valueLengthIfFixed()
+ {
+ return -1;
+ }
+
+ // This assumes that no empty values are passed
+ public void writeValue(ByteBuffer value, DataOutputPlus out) throws IOException
+ {
+ assert value.hasRemaining();
+ if (valueLengthIfFixed() >= 0)
+ out.write(value);
+ else
+ ByteBufferUtil.writeWithLength(value, out);
+ }
+
+ public long writtenLength(ByteBuffer value, TypeSizes sizes)
+ {
+ assert value.hasRemaining();
+ return valueLengthIfFixed() >= 0
+ ? value.remaining()
+ : sizes.sizeofWithLength(value);
+ }
+
+ public ByteBuffer readValue(DataInput in) throws IOException
+ {
+ int length = valueLengthIfFixed();
+ if (length >= 0)
+ return ByteBufferUtil.read(in, length);
+ else
+ return ByteBufferUtil.readWithLength(in);
+ }
+
+ public void skipValue(DataInput in) throws IOException
+ {
+ int length = valueLengthIfFixed();
+ if (length < 0)
+ length = in.readInt();
+
+ FileUtils.skipBytesFully(in, length);
+ }
+
+ /**
* This must be overriden by subclasses if necessary so that for any
* AbstractType, this == TypeParser.parse(toString()).
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/BooleanType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/BooleanType.java b/src/java/org/apache/cassandra/db/marshal/BooleanType.java
index bfe8c34..f87eb12 100644
--- a/src/java/org/apache/cassandra/db/marshal/BooleanType.java
+++ b/src/java/org/apache/cassandra/db/marshal/BooleanType.java
@@ -94,4 +94,10 @@ public class BooleanType extends AbstractType<Boolean>
{
return BooleanSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 1;
+ }
}