You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:42 UTC
[06/13] Push composites support in the storage engine
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 357ad65..27f0dd3 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -27,9 +27,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
@@ -144,12 +144,12 @@ public class SecondaryIndexManager
logger.info("Index build of {} complete", idxNames);
}
- public boolean indexes(ByteBuffer name, Collection<SecondaryIndex> indexes)
+ public boolean indexes(CellName name, Collection<SecondaryIndex> indexes)
{
return !indexFor(name, indexes).isEmpty();
}
- public List<SecondaryIndex> indexFor(ByteBuffer name, Collection<SecondaryIndex> indexes)
+ public List<SecondaryIndex> indexFor(CellName name, Collection<SecondaryIndex> indexes)
{
List<SecondaryIndex> matching = null;
for (SecondaryIndex index : indexes)
@@ -169,12 +169,12 @@ public class SecondaryIndexManager
return indexes(column.name());
}
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
return indexes(name, indexesByColumn.values());
}
- public List<SecondaryIndex> indexFor(ByteBuffer name)
+ public List<SecondaryIndex> indexFor(CellName name)
{
return indexFor(name, indexesByColumn.values());
}
@@ -437,7 +437,8 @@ public class SecondaryIndexManager
for (Column column : indexedColumnsInRow)
{
- SecondaryIndex index = indexesByColumn.get(column.name());
+ // TODO: this is probably incorrect, we should pull all indexes
+ SecondaryIndex index = indexesByColumn.get(column.name().toByteBuffer());
if (index == null)
continue;
@@ -559,8 +560,12 @@ public class SecondaryIndexManager
public boolean validate(Column column)
{
- SecondaryIndex index = getIndexForColumn(column.name());
- return index == null || index.validate(column);
+ for (SecondaryIndex index : indexFor(column.name()))
+ {
+ if (!index.validate(column))
+ return false;
+ }
+ return true;
}
public static interface Updater
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 6d137ca..95314cf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -24,12 +24,15 @@ import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.exceptions.ConfigurationException;
/**
@@ -37,9 +40,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
*/
public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
{
- private volatile CompositeType indexComparator;
+ private volatile CellNameType indexComparator;
- protected CompositeType getIndexComparator()
+ protected CellNameType getIndexComparator()
{
// Yes, this is racy, but doing this more than once is not a big deal, we just want to avoid doing it every time
// More seriously, we should fix that whole SecondaryIndex API so this can be a final and avoid all that non-sense.
@@ -81,7 +84,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
}
// Check SecondaryIndex.getIndexComparator if you want to know why this is static
- public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
+ public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
{
if (cfDef.type.isCollection())
{
@@ -110,12 +113,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
throw new AssertionError();
}
- protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
+ protected CellName makeIndexColumnName(ByteBuffer rowKey, Column column)
{
- return makeIndexColumnNameBuilder(rowKey, column.name()).build();
+ return getIndexComparator().create(makeIndexColumnPrefix(rowKey, column.name()), null);
}
- protected abstract ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName);
+ protected abstract Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName);
public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
@@ -132,17 +135,11 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
}
- protected AbstractType getExpressionComparator()
+ protected AbstractType<?> getExpressionComparator()
{
return baseCfs.metadata.getColumnDefinitionComparator(columnDef);
}
- protected CompositeType getBaseComparator()
- {
- assert baseCfs.getComparator() instanceof CompositeType;
- return (CompositeType)baseCfs.getComparator();
- }
-
public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
{
return new CompositesSearcher(baseCfs.indexManager, columns);
@@ -166,45 +163,31 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public static class IndexedEntry
{
public final DecoratedKey indexValue;
- public final ByteBuffer indexEntry;
+ public final CellName indexEntry;
public final long timestamp;
public final ByteBuffer indexedKey;
- public final ColumnNameBuilder indexedEntryNameBuilder;
+ public final Composite indexedEntryPrefix;
public final ByteBuffer indexedEntryCollectionKey; // may be null
- public IndexedEntry(DecoratedKey indexValue,
- ByteBuffer indexEntry,
- long timestamp,
- ByteBuffer indexedKey,
- ColumnNameBuilder indexedEntryNameBuilder)
+ public IndexedEntry(DecoratedKey indexValue, CellName indexEntry, long timestamp, ByteBuffer indexedKey, Composite indexedEntryPrefix)
{
- this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryNameBuilder, null);
+ this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryPrefix, null);
}
public IndexedEntry(DecoratedKey indexValue,
- ByteBuffer indexEntry,
+ CellName indexEntry,
long timestamp,
ByteBuffer indexedKey,
- ColumnNameBuilder indexedEntryNameBuilder,
+ Composite indexedEntryPrefix,
ByteBuffer indexedEntryCollectionKey)
{
this.indexValue = indexValue;
this.indexEntry = indexEntry;
this.timestamp = timestamp;
this.indexedKey = indexedKey;
- this.indexedEntryNameBuilder = indexedEntryNameBuilder;
+ this.indexedEntryPrefix = indexedEntryPrefix;
this.indexedEntryCollectionKey = indexedEntryCollectionKey;
}
-
- public ByteBuffer indexedEntryStart()
- {
- return indexedEntryNameBuilder.build();
- }
-
- public ByteBuffer indexedEntryEnd()
- {
- return indexedEntryNameBuilder.buildAsEndOfRange();
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 63889ee..38c55fd 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
@@ -47,62 +47,55 @@ import org.apache.cassandra.db.marshal.*;
*/
public class CompositesIndexOnClusteringKey extends CompositesIndex
{
- public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
{
// Index cell names are rk ck_0 ... ck_{i-1} ck_{i+1} ck_n, so n
// components total (where n is the number of clustering keys)
int ckCount = baseMetadata.clusteringColumns().size();
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount);
- List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
types.add(SecondaryIndex.keyComparator);
for (int i = 0; i < columnDef.position(); i++)
- types.add(ckTypes.get(i));
+ types.add(baseMetadata.clusteringColumns().get(i).type);
for (int i = columnDef.position() + 1; i < ckCount; i++)
- types.add(ckTypes.get(i));
- return CompositeType.getInstance(types);
+ types.add(baseMetadata.clusteringColumns().get(i).type);
+ return new CompoundDenseCellNameType(types);
}
protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
{
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(column.name());
- return components[columnDef.position()];
+ return column.name().get(columnDef.position());
}
- protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
{
- int ckCount = baseCfs.metadata.clusteringColumns().size();
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(columnName);
- CompositeType.Builder builder = getIndexComparator().builder();
+ int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
+ CBuilder builder = getIndexComparator().prefixBuilder();
builder.add(rowKey);
-
- for (int i = 0; i < Math.min(components.length, columnDef.position()); i++)
- builder.add(components[i]);
- for (int i = columnDef.position() + 1; i < Math.min(components.length, ckCount); i++)
- builder.add(components[i]);
- return builder;
+ for (int i = 0; i < Math.min(columnDef.position(), count); i++)
+ builder.add(columnName.get(i));
+ for (int i = columnDef.position() + 1; i < count; i++)
+ builder.add(columnName.get(i));
+ return builder.build();
}
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
{
int ckCount = baseCfs.metadata.clusteringColumns().size();
- ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
- ColumnNameBuilder builder = getBaseComparator().builder();
+ CBuilder builder = baseCfs.getComparator().builder();
for (int i = 0; i < columnDef.position(); i++)
- builder.add(components[i + 1]);
+ builder.add(indexEntry.name().get(i + 1));
builder.add(indexedValue.key);
for (int i = columnDef.position() + 1; i < ckCount; i++)
- builder.add(components[i]);
+ builder.add(indexEntry.name().get(i));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+ return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
}
@Override
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
// For now, assume this is only used in CQL3 when we know name has enough component.
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
index c2acfc9..f3daaf2 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -23,11 +23,14 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.LocalToken;
/**
* Index on the collection element of the cell name of a collection.
@@ -45,15 +48,14 @@ import org.apache.cassandra.dht.LocalToken;
*/
public class CompositesIndexOnCollectionKey extends CompositesIndex
{
- public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
{
int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
- List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
types.add(SecondaryIndex.keyComparator);
for (int i = 0; i < count - 1; i++)
- types.add(ckTypes.get(i));
- return CompositeType.getInstance(types);
+ types.add(baseMetadata.comparator.subtype(i));
+ return new CompoundDenseCellNameType(types);
}
@Override
@@ -64,49 +66,41 @@ public class CompositesIndexOnCollectionKey extends CompositesIndex
protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
{
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(column.name());
- return components[columnDef.position() + 1];
+ return column.name().get(columnDef.position() + 1);
}
- protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
{
int count = 1 + baseCfs.metadata.clusteringColumns().size();
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(columnName);
- CompositeType.Builder builder = getIndexComparator().builder();
+ CBuilder builder = getIndexComparator().builder();
builder.add(rowKey);
for (int i = 0; i < count - 1; i++)
- builder.add(components[i]);
- return builder;
+ builder.add(cellName.get(i));
+ return builder.build();
}
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
{
int count = 1 + baseCfs.metadata.clusteringColumns().size();
- ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-
- ColumnNameBuilder builder = getBaseComparator().builder();
+ CBuilder builder = baseCfs.getComparator().builder();
for (int i = 0; i < count - 1; i++)
- builder.add(components[i + 1]);
-
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+ builder.add(indexEntry.name().get(i + 1));
+ return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
}
@Override
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
// We index if the CQL3 column name is the one of the collection we index
- ByteBuffer[] components = getBaseComparator().split(name);
AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return components.length > columnDef.position()
- && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+ return name.size() > columnDef.position()
+ && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
}
public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexValue.key).build();
- Column liveColumn = data.getColumn(bb);
+ CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name, entry.indexValue.key);
+ Column liveColumn = data.getColumn(name);
return (liveColumn == null || liveColumn.isMarkedForDelete(now));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index f416d0e..9bf297b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -23,11 +23,14 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.LocalToken;
/**
* Index the value of a collection cell.
@@ -42,15 +45,15 @@ import org.apache.cassandra.dht.LocalToken;
*/
public class CompositesIndexOnCollectionValue extends CompositesIndex
{
- public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
{
int prefixSize = columnDef.position();
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 2);
types.add(SecondaryIndex.keyComparator);
for (int i = 0; i < prefixSize; i++)
- types.add(((CompositeType)baseMetadata.comparator).types.get(i));
+ types.add(baseMetadata.comparator.subtype(i));
types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
- return CompositeType.getInstance(types);
+ return new CompoundDenseCellNameType(types);
}
@Override
@@ -64,43 +67,38 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
return column.value();
}
- protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
{
- int prefixSize = columnDef.position();
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(columnName);
- assert components.length == baseComparator.types.size();
- CompositeType.Builder builder = getIndexComparator().builder();
+ CBuilder builder = getIndexComparator().prefixBuilder();
builder.add(rowKey);
- for (int i = 0; i < prefixSize; i++)
- builder.add(components[i]);
- builder.add(components[prefixSize + 1]);
- return builder;
+ for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
+ builder.add(cellName.get(i));
+ builder.add(cellName.get(columnDef.position() + 1));
+ return builder.build();
}
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
{
int prefixSize = columnDef.position();
- ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
- CompositeType.Builder builder = getBaseComparator().builder();
+ CellName name = indexEntry.name();
+ CBuilder builder = baseCfs.getComparator().builder();
for (int i = 0; i < prefixSize; i++)
- builder.add(components[i + 1]);
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder, components[prefixSize + 1]);
+ builder.add(name.get(i + 1));
+ return new IndexedEntry(indexedValue, name, indexEntry.timestamp(), name.get(0), builder.build(), name.get(prefixSize + 1));
}
@Override
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
- ByteBuffer[] components = getBaseComparator().split(name);
AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return components.length > columnDef.position()
- && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+ return name.size() > columnDef.position()
+ && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
}
public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexedEntryCollectionKey).build();
- Column liveColumn = data.getColumn(bb);
+ CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name, entry.indexedEntryCollectionKey);
+ Column liveColumn = data.getColumn(name);
if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index d097899..6df1e8d 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
@@ -48,13 +48,14 @@ import org.apache.cassandra.db.marshal.*;
*/
public class CompositesIndexOnPartitionKey extends CompositesIndex
{
- public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
{
int ckCount = baseMetadata.clusteringColumns().size();
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1);
types.add(SecondaryIndex.keyComparator);
- types.addAll(baseMetadata.comparator.getComponents());
- return CompositeType.getInstance(types);
+ for (int i = 0; i < ckCount; i++)
+ types.add(baseMetadata.comparator.subtype(i));
+ return new CompoundDenseCellNameType(types);
}
protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
@@ -64,32 +65,28 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
return components[columnDef.position()];
}
- protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
{
- int ckCount = baseCfs.metadata.clusteringColumns().size();
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(columnName);
- CompositeType.Builder builder = getIndexComparator().builder();
+ int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
+ CBuilder builder = getIndexComparator().prefixBuilder();
builder.add(rowKey);
- for (int i = 0; i < ckCount; i++)
- builder.add(components[i]);
- return builder;
+ for (int i = 0; i < count; i++)
+ builder.add(columnName.get(i));
+ return builder.build();
}
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
{
int ckCount = baseCfs.metadata.clusteringColumns().size();
- ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-
- ColumnNameBuilder builder = getBaseComparator().builder();
+ CBuilder builder = baseCfs.getComparator().builder();
for (int i = 0; i < ckCount; i++)
- builder.add(components[i + 1]);
+ builder.add(indexEntry.name().get(i + 1));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+ return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
}
@Override
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
// Since a partition key is always full, we always index it
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 55a0f88..6903b77 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
@@ -47,14 +47,14 @@ import org.apache.cassandra.db.marshal.*;
*/
public class CompositesIndexOnRegular extends CompositesIndex
{
- public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
{
int prefixSize = columnDef.position();
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
types.add(SecondaryIndex.keyComparator);
for (int i = 0; i < prefixSize; i++)
- types.add(((CompositeType)baseMetadata.comparator).types.get(i));
- return CompositeType.getInstance(types);
+ types.add(baseMetadata.comparator.subtype(i));
+ return new CompoundDenseCellNameType(types);
}
protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
@@ -62,39 +62,35 @@ public class CompositesIndexOnRegular extends CompositesIndex
return column.value();
}
- protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
{
- CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- ByteBuffer[] components = baseComparator.split(columnName);
- CompositeType.Builder builder = getIndexComparator().builder();
+ CBuilder builder = getIndexComparator().prefixBuilder();
builder.add(rowKey);
- for (int i = 0; i < Math.min(columnDef.position(), components.length); i++)
- builder.add(components[i]);
- return builder;
+ for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
+ builder.add(cellName.get(i));
+ return builder.build();
}
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
{
- ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
- CompositeType.Builder builder = getBaseComparator().builder();
+ CBuilder builder = baseCfs.getComparator().builder();
for (int i = 0; i < columnDef.position(); i++)
- builder.add(components[i + 1]);
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+ builder.add(indexEntry.name().get(i + 1));
+ return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
}
@Override
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
- ByteBuffer[] components = getBaseComparator().split(name);
AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return components.length > columnDef.position()
- && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+ return name.size() > columnDef.position()
+ && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
}
public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
- Column liveColumn = data.getColumn(bb);
+ CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name);
+ Column liveColumn = data.getColumn(name);
if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index bcb0dd2..97602af 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -24,15 +24,16 @@ import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,23 +53,23 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return baseCfs.filter(getIndexedIterator(filter), filter);
}
- private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
+ private Composite makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
{
if (key.remaining() == 0)
- return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ return Composites.EMPTY;
- ColumnNameBuilder builder;
+ Composite prefix;
IDiskAtomFilter columnFilter = filter.columnFilter(key);
if (columnFilter instanceof SliceQueryFilter)
{
SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
- builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
+ prefix = index.makeIndexColumnPrefix(key, isStart ? sqf.start() : sqf.finish());
}
else
{
- builder = index.getIndexComparator().builder().add(key);
+ prefix = index.getIndexComparator().make(key);
}
- return isStart ? builder.build() : builder.buildAsEndOfRange();
+ return isStart ? prefix.start() : prefix.end();
}
private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
@@ -94,15 +95,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- final CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
- final CompositeType indexComparator = (CompositeType)index.getIndexCfs().getComparator();
+ final CellNameType baseComparator = baseCfs.getComparator();
+ final CellNameType indexComparator = index.getIndexCfs().getComparator();
- final ByteBuffer startPrefix = makePrefix(index, startKey, filter, true);
- final ByteBuffer endPrefix = makePrefix(index, endKey, filter, false);
+ final Composite startPrefix = makePrefix(index, startKey, filter, true);
+ final Composite endPrefix = makePrefix(index, endKey, filter, false);
return new ColumnFamilyStore.AbstractScanIterator()
{
- private ByteBuffer lastSeenPrefix = startPrefix;
+ private Composite lastSeenPrefix = startPrefix;
private Deque<Column> indexColumns;
private int columnsRead = Integer.MAX_VALUE;
private int limit = filter.currentLimit();
@@ -135,7 +136,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
*/
DecoratedKey currentKey = null;
ColumnFamily data = null;
- ByteBuffer previousPrefix = null;
+ Composite previousPrefix = null;
while (true)
{
@@ -229,7 +230,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
// Check if this entry cannot be a hit due to the original column filter
- ByteBuffer start = entry.indexedEntryStart();
+ Composite start = entry.indexedEntryPrefix;
if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
continue;
@@ -248,7 +249,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
// slightly wasteful, but this probably doesn't matter in practice and it simplify things.
SliceQueryFilter dataFilter = new SliceQueryFilter(start,
- entry.indexedEntryEnd(),
+ entry.indexedEntryPrefix.end(),
false,
Integer.MAX_VALUE,
baseCfs.metadata.clusteringColumns().size());
@@ -267,7 +268,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (entry.indexedEntryCollectionKey != null)
previousPrefix = start;
- if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder, entry.indexedEntryCollectionKey))
+ if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryPrefix, entry.indexedEntryCollectionKey))
continue;
if (data == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7d98c24..ee56c36 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.index.keys;
import java.nio.ByteBuffer;
import java.util.Set;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -38,9 +40,9 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
return column.value();
}
- protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
+ protected CellName makeIndexColumnName(ByteBuffer rowKey, Column column)
{
- return rowKey;
+ return CellNames.simpleDense(rowKey);
}
public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
@@ -50,7 +52,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
{
- Column liveColumn = data.getColumn(columnDef.name.bytes);
+ Column liveColumn = data.getColumn(data.getComparator().makeCellName(columnDef.name.bytes));
if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
@@ -63,8 +65,15 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
// no options used
}
+ public boolean indexes(CellName name)
+ {
+ // This consider the full cellName directly
+ AbstractType<?> comparator = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+ return comparator.compare(columnDef.name.bytes, name.toByteBuffer()) == 0;
+ }
+
protected AbstractType getExpressionComparator()
{
- return baseCfs.getComparator();
+ return baseCfs.getComparator().asAbstractType();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 3740e24..0101a0b 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -28,13 +28,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.index.*;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
public class KeysSearcher extends SecondaryIndexSearcher
@@ -75,12 +78,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
* indexed row.
*/
final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
- final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ CellNameType type = index.getIndexCfs().getComparator();
+ final Composite startKey = range.left instanceof DecoratedKey ? type.make(((DecoratedKey)range.left).key) : Composites.EMPTY;
+ final Composite endKey = range.right instanceof DecoratedKey ? type.make(((DecoratedKey)range.right).key) : Composites.EMPTY;
+
+ final CellName primaryColumn = baseCfs.getComparator().cellFromByteBuffer(primary.column);
return new ColumnFamilyStore.AbstractScanIterator()
{
- private ByteBuffer lastSeenKey = startKey;
+ private Composite lastSeenKey = startKey;
private Iterator<Column> indexColumns;
private int columnsRead = Integer.MAX_VALUE;
@@ -101,7 +107,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex))
logger.trace("Scanning index {} starting with {}",
- ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey));
+ ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey.toByteBuffer()));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
index.getIndexCfs().name,
@@ -128,7 +134,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
{
// skip the row we already saw w/ the last page of results
indexColumns.next();
- logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+ logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name().toByteBuffer()));
}
else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey))
{
@@ -148,7 +154,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
continue;
}
- DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+ DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey.toByteBuffer());
if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
{
logger.trace("Reached end of assigned scan range");
@@ -161,7 +167,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey.toByteBuffer()), filter.timestamp));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -179,7 +185,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
{
// delete the index entry w/ its own timestamp
- Column dummyColumn = new Column(primary.column, indexKey.key, column.timestamp());
+ Column dummyColumn = new Column(primaryColumn, indexKey.key, column.timestamp());
((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn);
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
index 01db148..a7162ae 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,5 +45,5 @@ public abstract class AbstractCommutativeType extends AbstractType<Long>
/**
* create commutative column
*/
- public abstract Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp);
+ public abstract Column createColumn(CellName name, ByteBuffer value, long timestamp);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index d002aa7..be66d21 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -35,21 +35,21 @@ import java.util.List;
public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
{
// changes bb position
- protected static int getShortLength(ByteBuffer bb)
+ public static int getShortLength(ByteBuffer bb)
{
int length = (bb.get() & 0xFF) << 8;
return length | (bb.get() & 0xFF);
}
// changes bb position
- protected static void putShortLength(ByteBuffer bb, int length)
+ public static void putShortLength(ByteBuffer bb, int length)
{
bb.put((byte) ((length >> 8) & 0xFF));
bb.put((byte) (length & 0xFF));
}
// changes bb position
- protected static ByteBuffer getBytes(ByteBuffer bb, int length)
+ public static ByteBuffer getBytes(ByteBuffer bb, int length)
{
ByteBuffer copy = bb.duplicate();
copy.limit(copy.position() + length);
@@ -58,7 +58,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
}
// changes bb position
- protected static ByteBuffer getWithShortLength(ByteBuffer bb)
+ public static ByteBuffer getWithShortLength(ByteBuffer bb)
{
int length = getShortLength(bb);
return getBytes(bb, length);
@@ -169,7 +169,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
* Escapes all occurences of the ':' character from the input, replacing them by "\:".
* Furthermore, if the last character is '\' or '!', a '!' is appended.
*/
- static String escape(String input)
+ public static String escape(String input)
{
if (input.isEmpty())
return input;
@@ -234,7 +234,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
byte b = bb.get();
if (b != 0)
{
- sb.append(":!");
+ sb.append(b < 0 ? ":_" : ":!");
break;
}
++i;
@@ -249,6 +249,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
List<ParsedComparator> comparators = new ArrayList<ParsedComparator>(parts.size());
int totalLength = 0, i = 0;
boolean lastByteIsOne = false;
+ boolean lastByteIsMinusOne = false;
for (String part : parts)
{
@@ -257,6 +258,11 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
lastByteIsOne = true;
break;
}
+ else if (part.equals("_"))
+ {
+ lastByteIsMinusOne = true;
+ break;
+ }
ParsedComparator p = parseComparator(i, part);
AbstractType<?> type = p.getAbstractType();
@@ -281,6 +287,8 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
}
if (lastByteIsOne)
bb.put(bb.limit() - 1, (byte)1);
+ else if (lastByteIsMinusOne)
+ bb.put(bb.limit() - 1, (byte)-1);
bb.rewind();
return bb;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index ffba918..cefa465 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -25,17 +25,9 @@ import java.util.List;
import java.util.Map;
import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
/**
* Specifies a Comparator for a specific type of ByteBuffer.
@@ -47,78 +39,10 @@ import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
*/
public abstract class AbstractType<T> implements Comparator<ByteBuffer>
{
- public final Comparator<IndexInfo> indexComparator;
- public final Comparator<IndexInfo> indexReverseComparator;
- public final Comparator<Column> columnComparator;
- public final Comparator<Column> columnReverseComparator;
- public final Comparator<OnDiskAtom> onDiskAtomComparator;
public final Comparator<ByteBuffer> reverseComparator;
protected AbstractType()
{
- indexComparator = new Comparator<IndexInfo>()
- {
- public int compare(IndexInfo o1, IndexInfo o2)
- {
- return AbstractType.this.compare(o1.lastName, o2.lastName);
- }
- };
- indexReverseComparator = new Comparator<IndexInfo>()
- {
- public int compare(IndexInfo o1, IndexInfo o2)
- {
- return AbstractType.this.compare(o1.firstName, o2.firstName);
- }
- };
- columnComparator = new Comparator<Column>()
- {
- public int compare(Column c1, Column c2)
- {
- return AbstractType.this.compare(c1.name(), c2.name());
- }
- };
- columnReverseComparator = new Comparator<Column>()
- {
- public int compare(Column c1, Column c2)
- {
- return AbstractType.this.compare(c2.name(), c1.name());
- }
- };
- onDiskAtomComparator = new Comparator<OnDiskAtom>()
- {
- public int compare(OnDiskAtom c1, OnDiskAtom c2)
- {
- int comp = AbstractType.this.compare(c1.name(), c2.name());
- if (comp != 0)
- return comp;
-
- if (c1 instanceof RangeTombstone)
- {
- if (c2 instanceof RangeTombstone)
- {
- RangeTombstone t1 = (RangeTombstone)c1;
- RangeTombstone t2 = (RangeTombstone)c2;
- int comp2 = AbstractType.this.compare(t1.max, t2.max);
- if (comp2 == 0)
- return t1.data.compareTo(t2.data);
- else
- return comp2;
- }
- else
- {
- return -1;
- }
- }
- else if (c2 instanceof RangeTombstone)
- {
- return 1;
- }
- else
- {
- return 0;
- }
- }
- };
reverseComparator = new Comparator<ByteBuffer>()
{
public int compare(ByteBuffer o1, ByteBuffer o2)
@@ -197,17 +121,6 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
return builder.toString();
}
- /* convenience method */
- public String getColumnsString(Iterable<Column> columns)
- {
- StringBuilder builder = new StringBuilder();
- for (Column column : columns)
- {
- builder.append(column.getString(this)).append(",");
- }
- return builder.toString();
- }
-
public boolean isCommutative()
{
return false;
@@ -312,25 +225,4 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
{
return getClass().getName();
}
-
- protected boolean intersects(ByteBuffer minColName, ByteBuffer maxColName, ByteBuffer sliceStart, ByteBuffer sliceEnd)
- {
- return (sliceStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(maxColName, sliceStart) >= 0)
- && (sliceEnd.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(sliceEnd, minColName) >= 0);
- }
-
- public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
- {
- assert minColumnNames.size() == 1;
-
- for (ColumnSlice slice : filter.slices)
- {
- ByteBuffer start = filter.isReversed() ? slice.finish : slice.start;
- ByteBuffer finish = filter.isReversed() ? slice.start : slice.finish;
-
- if (intersects(minColumnNames.get(0), maxColumnNames.get(0), start, finish))
- return true;
- }
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 07c86e0..0f3f564 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -58,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
protected abstract void appendToStringBuilder(StringBuilder sb);
- public abstract ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns);
+ public abstract ByteBuffer serialize(List<Column> columns);
@Override
public String toString()
@@ -113,7 +113,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
return (ByteBuffer)result.flip();
}
- protected List<Pair<ByteBuffer, Column>> enforceLimit(List<Pair<ByteBuffer, Column>> columns)
+ protected List<Column> enforceLimit(List<Column> columns)
{
if (columns.size() <= MAX_ELEMENTS)
return columns;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index e7a5fee..36249bf 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -27,11 +27,8 @@ import java.util.Map;
import com.google.common.collect.ImmutableList;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Relation;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -234,30 +231,6 @@ public class CompositeType extends AbstractCompositeType
return true;
}
- @Override
- public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
- {
- assert minColumnNames.size() == maxColumnNames.size();
- outer:
- for (ColumnSlice slice : filter.slices)
- {
- // This slices intersects if all component intersect. And we don't intersect
- // only if no slice intersects
- ByteBuffer[] start = split(filter.isReversed() ? slice.finish : slice.start);
- ByteBuffer[] finish = split(filter.isReversed() ? slice.start : slice.finish);
- for (int i = 0; i < minColumnNames.size(); i++)
- {
- AbstractType<?> t = types.get(i);
- ByteBuffer s = i < start.length ? start[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- ByteBuffer f = i < finish.length ? finish[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- if (!t.intersects(minColumnNames.get(i), maxColumnNames.get(i), s, f))
- continue outer;
- }
- return true;
- }
- return false;
- }
-
private static class StaticParsedComparator implements ParsedComparator
{
final AbstractType<?> type;
@@ -315,7 +288,7 @@ public class CompositeType extends AbstractCompositeType
return out;
}
- public static class Builder implements ColumnNameBuilder
+ public static class Builder
{
private final CompositeType composite;
@@ -376,13 +349,11 @@ public class CompositeType extends AbstractCompositeType
return this;
}
- @Override
public Builder add(ByteBuffer bb)
{
return add(bb, Relation.Type.EQ);
}
- @Override
public Builder add(ColumnIdentifier name)
{
return add(name.bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
index 6a77458..37cd59b 100644
--- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.CounterSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -47,7 +48,7 @@ public class CounterColumnType extends AbstractCommutativeType
/**
* create commutative column
*/
- public Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+ public Column createColumn(CellName name, ByteBuffer value, long timestamp)
{
return new CounterUpdateColumn(name, value, timestamp);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 808ba45..58ba6f1 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -82,16 +82,16 @@ public class ListType<T> extends CollectionType<List<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+ public ByteBuffer serialize(List<Column> columns)
{
columns = enforceLimit(columns);
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
int size = 0;
- for (Pair<ByteBuffer, Column> p : columns)
+ for (Column c : columns)
{
- bbs.add(p.right.value());
- size += 2 + p.right.value().remaining();
+ bbs.add(c.value());
+ size += 2 + c.value().remaining();
}
return pack(bbs, columns.size(), size);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index fd96da7..17bd7a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -89,17 +89,19 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
/**
* Creates the same output than serialize, but from the internal representation.
*/
- public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+ public ByteBuffer serialize(List<Column> columns)
{
columns = enforceLimit(columns);
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * columns.size());
int size = 0;
- for (Pair<ByteBuffer, Column> p : columns)
+ for (Column c : columns)
{
- bbs.add(p.left);
- bbs.add(p.right.value());
- size += 4 + p.left.remaining() + p.right.value().remaining();
+ ByteBuffer key = c.name().collectionElement();
+ ByteBuffer value = c.value();
+ bbs.add(key);
+ bbs.add(value);
+ size += 4 + key.remaining() + value.remaining();
}
return pack(bbs, columns.size(), size);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index c947d26..9e45f8f 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -82,16 +82,17 @@ public class SetType<T> extends CollectionType<Set<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+ public ByteBuffer serialize(List<Column> columns)
{
columns = enforceLimit(columns);
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
int size = 0;
- for (Pair<ByteBuffer, Column> p : columns)
+ for (Column c : columns)
{
- bbs.add(p.left);
- size += 2 + p.left.remaining();
+ ByteBuffer key = c.name().collectionElement();
+ bbs.add(key);
+ size += 2 + key.remaining();
}
return pack(bbs, columns.size(), size);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 1b5a4e2..4327aa9 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.Column;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -44,15 +45,15 @@ import org.apache.hadoop.mapreduce.*;
*
* The default split size is 64k rows.
*/
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<CellName, Column>>
{
- public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ public RecordReader<ByteBuffer, SortedMap<CellName, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
}
- public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+ public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
{
TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 98a294d..7bda3fb 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.*;
@@ -44,8 +44,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
- implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<CellName, Column>>
+ implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
@@ -53,7 +53,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private ColumnFamilySplit split;
private RowIterator iter;
- private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
+ private Pair<ByteBuffer, SortedMap<CellName, Column>> currentRow;
private SlicePredicate predicate;
private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
@@ -92,7 +92,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return currentRow.left;
}
- public SortedMap<ByteBuffer, Column> getCurrentValue()
+ public SortedMap<CellName, Column> getCurrentValue()
{
return currentRow.right;
}
@@ -210,12 +210,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return split.getLocations()[0];
}
- private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
+ private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>>
{
protected List<KeySlice> rows;
protected int totalRead = 0;
protected final boolean isSuper;
- protected final AbstractType<?> comparator;
+ protected final CellNameType comparator;
protected final AbstractType<?> subComparator;
protected final IPartitioner partitioner;
@@ -253,7 +253,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
cfDef.column_type = ByteBufferUtil.string(type);
}
- comparator = TypeParser.parse(cfDef.comparator_type);
+ comparator = CellNames.fromAbstractType(TypeParser.parse(cfDef.comparator_type), true);
subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
}
catch (ConfigurationException e)
@@ -297,21 +297,21 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
for (org.apache.cassandra.thrift.Column column : super_column.columns)
{
Column c = unthriftifySimple(column);
- columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+ columns.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
}
return columns;
}
protected Column unthriftifySimple(org.apache.cassandra.thrift.Column column)
{
- return new Column(column.name, column.value, column.timestamp);
+ return new Column(comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
}
private Column unthriftifyCounter(CounterColumn column)
{
//CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
//to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
- return new Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+ return new Column(comparator.cellFromByteBuffer(column.name), ByteBufferUtil.bytes(column.value), 0);
}
private List<Column> unthriftifySuperCounter(CounterSuperColumn super_column)
@@ -320,7 +320,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
for (CounterColumn column : super_column.columns)
{
Column c = unthriftifyCounter(column);
- columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+ columns.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
}
return columns;
}
@@ -401,7 +401,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
{
maybeInit();
if (rows == null)
@@ -409,7 +409,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
totalRead++;
KeySlice ks = rows.get(i++);
- SortedMap<ByteBuffer, Column> map = new TreeMap<ByteBuffer, Column>(comparator);
+ SortedMap<CellName, Column> map = new TreeMap<CellName, Column>(comparator);
for (ColumnOrSuperColumn cosc : ks.columns)
{
List<Column> columns = unthriftify(cosc);
@@ -422,8 +422,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class WideRowIterator extends RowIterator
{
- private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
- private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ private PeekingIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>> wideColumns;
+ private Composite lastColumn = Composites.EMPTY;
private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private void maybeInit()
@@ -452,7 +452,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
try
{
- rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel);
+ rows = client.get_paged_slice(cfName, keyRange, lastColumn.toByteBuffer(), consistencyLevel);
int n = 0;
for (KeySlice row : rows)
n += row.columns.size();
@@ -471,14 +471,14 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
{
maybeInit();
if (rows == null)
return endOfData();
- Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
- lastColumn = next.right.values().iterator().next().name().duplicate();
+ Pair<ByteBuffer, SortedMap<CellName, Column>> next = wideColumns.next();
+ lastColumn = next.right.values().iterator().next().name();
maybeIncreaseRowCounter(next);
return next;
@@ -489,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
* Increases the row counter only if we really moved to the next row.
* @param next just fetched row slice
*/
- private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
+ private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<CellName, Column>> next)
{
ByteBuffer currentKey = next.left;
if (!currentKey.equals(lastCountedKey))
@@ -499,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
+ private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>>
{
private final Iterator<KeySlice> rows;
private Iterator<ColumnOrSuperColumn> columns;
@@ -520,14 +520,17 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
columns = currentRow.columns.iterator();
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
{
+ CellNameType cellType = subComparator == null
+ ? comparator
+ : new CompoundDenseCellNameType(Arrays.asList(comparator.asAbstractType(), subComparator));
while (true)
{
if (columns.hasNext())
{
ColumnOrSuperColumn cosc = columns.next();
- SortedMap<ByteBuffer, Column> map;
+ SortedMap<CellName, Column> map;
List<Column> columns = unthriftify(cosc);
if (columns.size() == 1)
{
@@ -536,11 +539,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
else
{
assert isSuper;
- map = new TreeMap<ByteBuffer, Column>(CompositeType.getInstance(comparator, subComparator));
+ map = new TreeMap<CellName, Column>(cellType);
for (Column column : columns)
map.put(column.name(), column);
}
- return Pair.<ByteBuffer, SortedMap<ByteBuffer, Column>>create(currentRow.key, map);
+ return Pair.<ByteBuffer, SortedMap<CellName, Column>>create(currentRow.key, map);
}
if (!rows.hasNext())
@@ -557,7 +560,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// to the old. Thus, expect a small performance hit.
// And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
// and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
+ public boolean next(ByteBuffer key, SortedMap<CellName, Column> value) throws IOException
{
if (this.nextKeyValue())
{
@@ -578,9 +581,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return ByteBuffer.wrap(new byte[this.keyBufferSize]);
}
- public SortedMap<ByteBuffer, Column> createValue()
+ public SortedMap<CellName, Column> createValue()
{
- return new TreeMap<ByteBuffer, Column>();
+ return new TreeMap<CellName, Column>();
}
public long getPos() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index b0b7fe9..e5b8bb1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -123,22 +123,21 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
CfDef cfDef = cfInfo.cfDef;
Tuple pair = TupleFactory.getInstance().newTuple(2);
+ ByteBuffer colName = col.name().toByteBuffer();
+
// name
if(comparator instanceof AbstractCompositeType)
- setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
+ setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
else
- setTupleValue(pair, 0, cassandraToObj(comparator, col.name()));
+ setTupleValue(pair, 0, cassandraToObj(comparator, col.name().toByteBuffer()));
// value
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer colName;
if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
{
- ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+ ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName);
colName = names[names.length-1];
}
- else
- colName = col.name();
if (validators.get(colName) == null)
{
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 89c1944..7ce78de 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -24,6 +24,7 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -111,8 +112,8 @@ public class CqlStorage extends AbstractCassandraStorage
ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
if (columnValue != null)
{
- Column column = new Column(cdef.name, columnValue);
- AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ Column column = new Column(CellNames.simpleDense(cdef.name), columnValue);
+ AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 6018369..27f1c12 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CounterId;
@@ -117,7 +116,7 @@ public abstract class AbstractSSTableSimpleWriter
if (currentSuperColumn == null)
throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
- column = column.withUpdatedName(CompositeType.build(currentSuperColumn, column.name()));
+ column = column.withUpdatedName(columnFamily.getComparator().makeCellName(currentSuperColumn, column.name().toByteBuffer()));
}
columnFamily.addColumn(column);
}
@@ -130,7 +129,7 @@ public abstract class AbstractSSTableSimpleWriter
*/
public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
{
- addColumn(new Column(name, value, timestamp));
+ addColumn(new Column(metadata.comparator.cellFromByteBuffer(name), value, timestamp));
}
/**
@@ -145,7 +144,7 @@ public abstract class AbstractSSTableSimpleWriter
*/
public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS)
{
- addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
+ addColumn(new ExpiringColumn(metadata.comparator.cellFromByteBuffer(name), value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
}
/**
@@ -155,7 +154,7 @@ public abstract class AbstractSSTableSimpleWriter
*/
public void addCounterColumn(ByteBuffer name, long value)
{
- addColumn(new CounterColumn(name, CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
+ addColumn(new CounterColumn(metadata.comparator.cellFromByteBuffer(name), CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
}
/**