You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC
[5/9] Replace supercolumns internally by composites
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 4bb14fc..2f96ef8 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -72,7 +72,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
protected abstract void init(ColumnDefinition columnDef);
- protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column);
+ protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
protected abstract AbstractType getExpressionComparator();
@@ -86,7 +86,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
}
- public void delete(ByteBuffer rowKey, IColumn column)
+ public void delete(ByteBuffer rowKey, Column column)
{
if (column.isMarkedForDelete())
return;
@@ -100,7 +100,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
}
- public void insert(ByteBuffer rowKey, IColumn column)
+ public void insert(ByteBuffer rowKey, Column column)
{
DecoratedKey valueKey = getIndexKeyFor(column.value());
ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
@@ -120,7 +120,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater);
}
- public void update(ByteBuffer rowKey, IColumn col)
+ public void update(ByteBuffer rowKey, Column col)
{
insert(rowKey, col);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index d202578..991581d 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -19,8 +19,7 @@ package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -35,7 +34,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void delete(ByteBuffer rowKey, IColumn col);
+ public abstract void delete(ByteBuffer rowKey, Column col);
/**
* insert a column to the index
@@ -43,7 +42,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void insert(ByteBuffer rowKey, IColumn col);
+ public abstract void insert(ByteBuffer rowKey, Column col);
/**
* update a column from the index
@@ -51,7 +50,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void update(ByteBuffer rowKey, IColumn col);
+ public abstract void update(ByteBuffer rowKey, Column col);
public String getNameForSystemTable(ByteBuffer column)
{
@@ -61,6 +60,6 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
@Override
public boolean validate(Column column)
{
- return column.value.remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+ return column.value().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index 0200667..1dd2de7 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index c7af4f1..f78061c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SystemTable;
@@ -41,7 +42,6 @@ import org.apache.cassandra.db.marshal.LocalByPartionerType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.service.StorageService;
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 1be04dd..0de43b3 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.IndexExpression;
/**
@@ -49,11 +48,11 @@ public class SecondaryIndexManager
public static final Updater nullUpdater = new Updater()
{
- public void insert(IColumn column) { }
+ public void insert(Column column) { }
- public void update(IColumn oldColumn, IColumn column) { }
+ public void update(Column oldColumn, Column column) { }
- public void remove(IColumn current) { }
+ public void remove(Column current) { }
};
/**
@@ -172,7 +171,7 @@ public class SecondaryIndexManager
return null;
}
- public boolean indexes(IColumn column)
+ public boolean indexes(Column column)
{
return indexes(column.name());
}
@@ -434,7 +433,7 @@ public class SecondaryIndexManager
}
else
{
- for (IColumn column : cf)
+ for (Column column : cf)
{
if (index.indexes(column.name()))
((PerColumnSecondaryIndex) index).insert(key, column);
@@ -449,12 +448,12 @@ public class SecondaryIndexManager
* @param key the row key
* @param indexedColumnsInRow all column names in row
*/
- public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRow)
+ public void deleteFromIndexes(DecoratedKey key, List<Column> indexedColumnsInRow)
{
// Update entire row only once per row level index
Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;
- for (IColumn column : indexedColumnsInRow)
+ for (Column column : indexedColumnsInRow)
{
SecondaryIndex index = indexesByColumn.get(column.name());
if (index == null)
@@ -574,17 +573,17 @@ public class SecondaryIndexManager
public boolean validate(Column column)
{
- SecondaryIndex index = getIndexForColumn(column.name);
+ SecondaryIndex index = getIndexForColumn(column.name());
return index != null ? index.validate(column) : true;
}
public static interface Updater
{
- public void insert(IColumn column);
+ public void insert(Column column);
- public void update(IColumn oldColumn, IColumn column);
+ public void update(Column oldColumn, Column column);
- public void remove(IColumn current);
+ public void remove(Column current);
}
private class PerColumnIndexUpdater implements Updater
@@ -596,7 +595,7 @@ public class SecondaryIndexManager
this.key = key;
}
- public void insert(IColumn column)
+ public void insert(Column column)
{
if (column.isMarkedForDelete())
return;
@@ -608,7 +607,7 @@ public class SecondaryIndexManager
((PerColumnSecondaryIndex) index).insert(key.key, column);
}
- public void update(IColumn oldColumn, IColumn column)
+ public void update(Column oldColumn, Column column)
{
if (column.isMarkedForDelete())
return;
@@ -621,7 +620,7 @@ public class SecondaryIndexManager
((PerColumnSecondaryIndex) index).insert(key.key, column);
}
- public void remove(IColumn column)
+ public void remove(Column column)
{
if (column.isMarkedForDelete())
return;
@@ -644,7 +643,7 @@ public class SecondaryIndexManager
this.key = key;
}
- public void insert(IColumn column)
+ public void insert(Column column)
{
if (column.isMarkedForDelete())
return;
@@ -664,7 +663,7 @@ public class SecondaryIndexManager
}
}
- public void update(IColumn oldColumn, IColumn column)
+ public void update(Column oldColumn, Column column)
{
if (column.isMarkedForDelete())
return;
@@ -685,7 +684,7 @@ public class SecondaryIndexManager
}
}
- public void remove(IColumn column)
+ public void remove(Column column)
{
if (column.isMarkedForDelete())
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index a8c1dde..3085f48 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -47,7 +47,7 @@ public abstract class SecondaryIndexSearcher
protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
{
- IColumn liveColumn = liveData.getColumn(indexedColumnName);
+ Column liveColumn = liveData.getColumn(indexedColumnName);
if (liveColumn == null || liveColumn.isMarkedForDelete())
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index f1aa4aa..3d10ec5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -56,7 +56,7 @@ public class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
indexComparator = (CompositeType)SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
}
- protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+ protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
{
CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
ByteBuffer[] components = baseComparator.split(column.name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 29333b1..1f201db 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -160,8 +160,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return new ColumnFamilyStore.AbstractScanIterator()
{
private ByteBuffer lastSeenPrefix = startPrefix;
- private Deque<IColumn> indexColumns;
- private final QueryPath path = new QueryPath(baseCfs.name);
+ private Deque<Column> indexColumns;
private int columnsRead = Integer.MAX_VALUE;
private final int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
@@ -218,7 +217,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- new QueryPath(index.getIndexCfs().name),
+ index.getIndexCfs().name,
lastSeenPrefix,
endPrefix,
false,
@@ -227,10 +226,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (indexRow == null)
return makeReturn(currentKey, data);
- Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+ Collection<Column> sortedColumns = indexRow.getSortedColumns();
columnsRead = sortedColumns.size();
indexColumns = new ArrayDeque(sortedColumns);
- IColumn firstColumn = sortedColumns.iterator().next();
+ Column firstColumn = sortedColumns.iterator().next();
// Paging is racy, so it is possible the first column of a page is not the last seen one.
if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstColumn.name()))
@@ -249,7 +248,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
while (!indexColumns.isEmpty() && columnsCount <= limit)
{
- IColumn column = indexColumns.poll();
+ Column column = indexColumns.poll();
lastSeenPrefix = column.name();
if (column.isMarkedForDelete())
{
@@ -302,7 +301,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
continue;
SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE, prefixSize);
- ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter));
+ ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
if (newData != null)
{
ByteBuffer baseColumnName = builder.copy().add(primary.column_name).build();
@@ -311,7 +310,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (isIndexValueStale(newData, baseColumnName, indexedValue))
{
// delete the index entry w/ its own timestamp
- IColumn dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
+ Column dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
((PerColumnSecondaryIndex) index).delete(dk.key, dummyColumn);
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 04c9946..8d065ab 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -38,7 +38,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
// Nothing specific
}
- protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+ protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
{
return rowKey;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index cc7773c..62cdb78 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -108,8 +108,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
return new ColumnFamilyStore.AbstractScanIterator()
{
private ByteBuffer lastSeenKey = startKey;
- private Iterator<IColumn> indexColumns;
- private final QueryPath path = new QueryPath(baseCfs.name);
+ private Iterator<Column> indexColumns;
private int columnsRead = Integer.MAX_VALUE;
protected Row computeNext()
@@ -132,7 +131,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- new QueryPath(index.getIndexCfs().name),
+ index.getIndexCfs().name,
lastSeenKey,
endKey,
false,
@@ -145,10 +144,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
return endOfData();
}
- Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+ Collection<Column> sortedColumns = indexRow.getSortedColumns();
columnsRead = sortedColumns.size();
indexColumns = sortedColumns.iterator();
- IColumn firstColumn = sortedColumns.iterator().next();
+ Column firstColumn = sortedColumns.iterator().next();
// Paging is racy, so it is possible the first column of a page is not the last seen one.
if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
@@ -167,7 +166,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
while (indexColumns.hasNext())
{
- IColumn column = indexColumns.next();
+ Column column = indexColumns.next();
lastSeenKey = column.name();
if (column.isMarkedForDelete())
{
@@ -188,7 +187,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = ColumnFamily.create(baseCfs.metadata);
@@ -198,7 +197,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
if (extraFilter != null)
{
- ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
+ ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
if (cf != null)
data.addAll(cf, HeapAllocator.instance);
}
@@ -206,7 +205,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
if (isIndexValueStale(data, primary.column_name, indexKey.key))
{
// delete the index entry w/ its own timestamp
- IColumn dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
+ Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn);
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 5393c0c..b3d158d 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -23,7 +23,7 @@ import java.util.Comparator;
import java.util.Map;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RangeTombstone;
import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
@@ -40,8 +40,8 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
{
public final Comparator<IndexInfo> indexComparator;
public final Comparator<IndexInfo> indexReverseComparator;
- public final Comparator<IColumn> columnComparator;
- public final Comparator<IColumn> columnReverseComparator;
+ public final Comparator<Column> columnComparator;
+ public final Comparator<Column> columnReverseComparator;
public final Comparator<OnDiskAtom> onDiskAtomComparator;
public final Comparator<ByteBuffer> reverseComparator;
@@ -61,16 +61,16 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
return AbstractType.this.compare(o1.firstName, o2.firstName);
}
};
- columnComparator = new Comparator<IColumn>()
+ columnComparator = new Comparator<Column>()
{
- public int compare(IColumn c1, IColumn c2)
+ public int compare(Column c1, Column c2)
{
return AbstractType.this.compare(c1.name(), c2.name());
}
};
- columnReverseComparator = new Comparator<IColumn>()
+ columnReverseComparator = new Comparator<Column>()
{
- public int compare(IColumn c1, IColumn c2)
+ public int compare(Column c1, Column c2)
{
return AbstractType.this.compare(c2.name(), c1.name());
}
@@ -159,10 +159,10 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
}
/* convenience method */
- public String getColumnsString(Collection<IColumn> columns)
+ public String getColumnsString(Collection<Column> columns)
{
StringBuilder builder = new StringBuilder();
- for (IColumn column : columns)
+ for (Column column : columns)
{
builder.append(column.getString(this)).append(",");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index a19912b..621e5c3 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -49,7 +49,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
protected abstract void appendToStringBuilder(StringBuilder sb);
- public abstract ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns);
+ public abstract ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns);
@Override
public String toString()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index fb80906..d843f5a 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -68,6 +68,11 @@ public class CompositeType extends AbstractCompositeType
return getInstance(parser.getTypeParameters());
}
+ public static CompositeType getInstance(AbstractType... types)
+ {
+ return getInstance(Arrays.<AbstractType<?>>asList(types));
+ }
+
public static synchronized CompositeType getInstance(List<AbstractType<?>> types)
{
assert types != null && !types.isEmpty();
@@ -126,6 +131,23 @@ public class CompositeType extends AbstractCompositeType
return build(serialized);
}
+ // Extract component idx from bb. Return null if there is not enough component.
+ public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
+ {
+ bb = bb.duplicate();
+ int i = 0;
+ while (bb.remaining() > 0)
+ {
+ ByteBuffer c = getWithShortLength(bb);
+ if (i == idx)
+ return c;
+
+ bb.get(); // skip end-of-component
+ ++i;
+ }
+ return null;
+ }
+
@Override
public boolean isCompatibleWith(AbstractType<?> previous)
{
@@ -190,7 +212,7 @@ public class CompositeType extends AbstractCompositeType
return new Builder(this);
}
- public ByteBuffer build(ByteBuffer... buffers)
+ public static ByteBuffer build(ByteBuffer... buffers)
{
int totalLength = 0;
for (ByteBuffer bb : buffers)
@@ -200,7 +222,7 @@ public class CompositeType extends AbstractCompositeType
for (ByteBuffer bb : buffers)
{
putShortLength(out, bb.remaining());
- out.put(bb);
+ out.put(bb.duplicate());
out.put((byte) 0);
}
out.flip();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 589e29e..76cf748 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.Pair;
@@ -118,11 +118,11 @@ public class ListType<T> extends CollectionType<List<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+ public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
{
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
int size = 0;
- for (Pair<ByteBuffer, IColumn> p : columns)
+ for (Pair<ByteBuffer, Column> p : columns)
{
bbs.add(p.right.value());
size += 2 + p.right.value().remaining();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 8364ea0..820abfa 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.Pair;
@@ -135,11 +135,11 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
/**
* Creates the same output than decompose, but from the internal representation.
*/
- public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+ public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
{
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * columns.size());
int size = 0;
- for (Pair<ByteBuffer, IColumn> p : columns)
+ for (Pair<ByteBuffer, Column> p : columns)
{
bbs.add(p.left);
bbs.add(p.right.value());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index bbfb46f..31afd66 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.Pair;
@@ -118,11 +118,11 @@ public class SetType<T> extends CollectionType<Set<T>>
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
}
- public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+ public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
{
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
int size = 0;
- for (Pair<ByteBuffer, IColumn> p : columns)
+ for (Pair<ByteBuffer, Column> p : columns)
{
bbs.add(p.left);
size += 2 + p.left.remaining();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 057d46a..a78a4ca 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -37,7 +37,7 @@ import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -65,8 +65,8 @@ import org.apache.thrift.TException;
*
* The default split size is 64k rows.
*/
-public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
- implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
+ implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
@@ -313,7 +313,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
return map;
}
- public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
}
@@ -332,7 +332,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
return oldInputSplits;
}
- public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+ public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
{
TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 07a5460..ea98cc9 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,11 +30,26 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.CounterSuperColumn;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -45,8 +60,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
- implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
+ implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
@@ -54,7 +69,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private ColumnFamilySplit split;
private RowIterator iter;
- private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
+ private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
private SlicePredicate predicate;
private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
@@ -93,7 +108,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return currentRow.left;
}
- public SortedMap<ByteBuffer, IColumn> getCurrentValue()
+ public SortedMap<ByteBuffer, Column> getCurrentValue()
{
return currentRow.right;
}
@@ -219,10 +234,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return split.getLocations()[0];
}
- private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+ private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
protected List<KeySlice> rows;
protected int totalRead = 0;
+ protected final boolean isSuper;
protected final AbstractType<?> comparator;
protected final AbstractType<?> subComparator;
protected final IPartitioner partitioner;
@@ -242,6 +258,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
int idx = cfnames.indexOf(cfName);
CfDef cf_def = ks_def.cf_defs.get(idx);
+ isSuper = cf_def.column_type.equals("Super");
comparator = TypeParser.parse(cf_def.comparator_type);
subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type);
}
@@ -267,46 +284,50 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return totalRead;
}
- protected IColumn unthriftify(ColumnOrSuperColumn cosc)
+ protected List<Column> unthriftify(ColumnOrSuperColumn cosc)
{
if (cosc.counter_column != null)
- return unthriftifyCounter(cosc.counter_column);
+ return Collections.<Column>singletonList(unthriftifyCounter(cosc.counter_column));
if (cosc.counter_super_column != null)
return unthriftifySuperCounter(cosc.counter_super_column);
if (cosc.super_column != null)
return unthriftifySuper(cosc.super_column);
assert cosc.column != null;
- return unthriftifySimple(cosc.column);
+ return Collections.<Column>singletonList(unthriftifySimple(cosc.column));
}
- private IColumn unthriftifySuper(SuperColumn super_column)
+ private List<Column> unthriftifySuper(SuperColumn super_column)
{
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
- for (Column column : super_column.columns)
+ List<Column> columns = new ArrayList<Column>(super_column.columns.size());
+ for (org.apache.cassandra.thrift.Column column : super_column.columns)
{
- sc.addColumn(unthriftifySimple(column));
+ Column c = unthriftifySimple(column);
+ columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
}
- return sc;
+ return columns;
}
- protected IColumn unthriftifySimple(Column column)
+ protected Column unthriftifySimple(org.apache.cassandra.thrift.Column column)
{
- return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+ return new Column(column.name, column.value, column.timestamp);
}
- private IColumn unthriftifyCounter(CounterColumn column)
+ private Column unthriftifyCounter(CounterColumn column)
{
//CounterColumns read the counterID from the System table, so need the StorageService running and access
//to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
- return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+ return new Column(column.name, ByteBufferUtil.bytes(column.value), 0);
}
- private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+ private List<Column> unthriftifySuperCounter(CounterSuperColumn super_column)
{
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
- for (CounterColumn column : superColumn.columns)
- sc.addColumn(unthriftifyCounter(column));
- return sc;
+ List<Column> columns = new ArrayList<Column>(super_column.columns.size());
+ for (CounterColumn column : super_column.columns)
+ {
+ Column c = unthriftifyCounter(column);
+ columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+ }
+ return columns;
}
}
@@ -385,7 +406,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
@@ -393,11 +414,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
totalRead++;
KeySlice ks = rows.get(i++);
- SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator);
+ SortedMap<ByteBuffer, Column> map = new TreeMap<ByteBuffer, Column>(comparator);
for (ColumnOrSuperColumn cosc : ks.columns)
{
- IColumn column = unthriftify(cosc);
- map.put(column.name(), column);
+ List<Column> columns = unthriftify(cosc);
+ for (Column column : columns)
+ map.put(column.name(), column);
}
return Pair.create(ks.key, map);
}
@@ -405,7 +427,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
private class WideRowIterator extends RowIterator
{
- private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> wideColumns;
+ private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -454,13 +476,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
maybeInit();
if (rows == null)
return endOfData();
- Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next = wideColumns.next();
+ Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
lastColumn = next.right.values().iterator().next().name();
maybeIncreaseRowCounter(next);
@@ -472,7 +494,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
* Increases the row counter only if we really moved to the next row.
* @param next just fetched row slice
*/
- private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
+ private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
{
ByteBuffer currentKey = next.left;
if (!currentKey.equals(lastCountedKey))
@@ -482,7 +504,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
}
}
- private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+ private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
{
private final Iterator<KeySlice> rows;
private Iterator<ColumnOrSuperColumn> columns;
@@ -503,16 +525,27 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
columns = currentRow.columns.iterator();
}
- protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+ protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
{
while (true)
{
if (columns.hasNext())
{
ColumnOrSuperColumn cosc = columns.next();
- IColumn column = unthriftify(cosc);
- ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(column.name(), column);
- return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(currentRow.key, map);
+ SortedMap<ByteBuffer, Column> map;
+ List<Column> columns = unthriftify(cosc);
+ if (columns.size() == 1)
+ {
+ map = ImmutableSortedMap.of(columns.get(0).name(), columns.get(0));
+ }
+ else
+ {
+ assert isSuper;
+ map = new TreeMap<ByteBuffer, Column>(CompositeType.getInstance(comparator, subComparator));
+ for (Column column : columns)
+ map.put(column.name(), column);
+ }
+ return Pair.<ByteBuffer, SortedMap<ByteBuffer, Column>>create(currentRow.key, map);
}
if (!rows.hasNext())
@@ -529,7 +562,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// to the old. Thus, expect a small performance hit.
// And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
// and ColumnFamilyRecordReader don't support them, it should be fine for now.
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, IColumn> value) throws IOException
+ public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
{
if (this.nextKeyValue())
{
@@ -550,9 +583,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
return ByteBuffer.wrap(new byte[this.keyBufferSize]);
}
- public SortedMap<ByteBuffer, IColumn> createValue()
+ public SortedMap<ByteBuffer, Column> createValue()
{
- return new TreeMap<ByteBuffer, IColumn>();
+ return new TreeMap<ByteBuffer, Column>();
}
public long getPos() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 7c459b5..91174f3 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
import org.apache.cassandra.hadoop.*;
@@ -96,7 +95,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private String storeSignature;
private Configuration conf;
- private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
+ private RecordReader<ByteBuffer, Map<ByteBuffer, Column>> reader;
private RecordWriter<ByteBuffer, List<Mutation>> writer;
private String inputFormatClass;
private String outputFormatClass;
@@ -105,7 +104,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private boolean usePartitionFilter = false;
// wide row hacks
private ByteBuffer lastKey;
- private Map<ByteBuffer,IColumn> lastRow;
+ private Map<ByteBuffer,Column> lastRow;
private boolean hasNext = true;
public CassandraStorage()
@@ -147,7 +146,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
key = (ByteBuffer)reader.getCurrentKey();
tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
}
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -171,7 +170,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
// read too much, hold on to it for next time
lastKey = (ByteBuffer)reader.getCurrentKey();
- lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
// but return what we have so far
tuple.append(bag);
return tuple;
@@ -182,28 +181,28 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
{
tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset()));
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
- lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
return tuple;
}
tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
}
- SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ SortedMap<ByteBuffer,Column> row = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
- for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
@@ -228,7 +227,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
- Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
+ Map<ByteBuffer, Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -253,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
added.put(cdef.name, true);
}
// now add all the other columns
- for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -307,7 +306,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return tuple;
}
- private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+ private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
@@ -319,27 +318,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
setTupleValue(pair, 0, comparator.compose(col.name()));
}
- if (col instanceof Column)
- {
- // standard
- List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (validators.get(col.name()) == null)
- setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
- else
- setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
- return pair;
- }
- else
- {
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- pair.set(1, new DefaultDataBag(subcols));
- }
+ if (validators.get(col.name()) == null)
+ setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
+ else
+ setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
return pair;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/IColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/IColumnSerializer.java b/src/java/org/apache/cassandra/io/IColumnSerializer.java
deleted file mode 100644
index 5802140..0000000
--- a/src/java/org/apache/cassandra/io/IColumnSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.cassandra.db.IColumn;
-
-public interface IColumnSerializer extends ISerializer<IColumn>
-{
- /**
- * Flag affecting deserialization behavior.
- * - LOCAL: for deserialization of local data (Expired columns are
- * converted to tombstones (to gain disk space)).
- * - FROM_REMOTE: for deserialization of data received from remote hosts
- * (Expired columns are converted to tombstone and counters have
- * their delta cleared)
- * - PRESERVE_SIZE: used when no transformation must be performed, i.e,
- * when we must ensure that deserializing and reserializing the
- * result yield the exact same bytes. Streaming uses this.
- */
- public static enum Flag
- {
- LOCAL, FROM_REMOTE, PRESERVE_SIZE;
- }
-
- public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f3d097f..63a6071 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
@@ -38,7 +39,7 @@ public abstract class AbstractSSTableSimpleWriter
protected final CFMetaData metadata;
protected DecoratedKey currentKey;
protected ColumnFamily columnFamily;
- protected SuperColumn currentSuperColumn;
+ protected ByteBuffer currentSuperColumn;
protected final CounterId counterid = CounterId.generate();
public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
@@ -102,20 +103,22 @@ public abstract class AbstractSSTableSimpleWriter
*/
public void newSuperColumn(ByteBuffer name)
{
- if (!columnFamily.isSuper())
+ if (!columnFamily.metadata().isSuper())
throw new IllegalStateException("Cannot add a super column to a standard column family");
- currentSuperColumn = new SuperColumn(name, metadata.subcolumnComparator);
- columnFamily.addColumn(currentSuperColumn);
+ currentSuperColumn = name;
}
- private void addColumn(IColumn column)
+ private void addColumn(Column column)
{
- if (columnFamily.isSuper() && currentSuperColumn == null)
- throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
+ if (columnFamily.metadata().isSuper())
+ {
+ if (currentSuperColumn == null)
+ throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
- IColumnContainer container = columnFamily.isSuper() ? currentSuperColumn : columnFamily;
- container.addColumn(column);
+ column = column.withUpdatedName(CompositeType.build(currentSuperColumn, column.name()));
+ }
+ columnFamily.addColumn(column);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index cf1907e..c96a336 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
public static class Version
{
// This needs to be at the begining for initialization sake
- private static final String current_version = "ib";
+ private static final String current_version = "ic";
public static final Version LEGACY = new Version("a"); // "pre-history"
// b (0.7.0): added version to sstable filenames
@@ -66,6 +66,10 @@ public class Descriptor
// records estimated histogram of deletion times in tombstones
// bloom filter (keys and columns) upgraded to Murmur3
// ib (1.2.1): tracks min client timestamp in metadata component
+ // ja (1.3.0): super columns are serialized as composites
+ // (note that there is no real format change, this is mostly a marker to know if we should expect super
+ // columns or not. We do need a major version bump however, because we should not allow streaming of
+ // super columns into this new format)
public static final Version CURRENT = new Version(current_version);
@@ -85,6 +89,7 @@ public class Descriptor
public final boolean hasPromotedIndexes;
public final FilterFactory.Type filterType;
public final boolean hasAncestors;
+ public final boolean hasSuperColumns;
public Version(String version)
{
@@ -108,6 +113,7 @@ public class Descriptor
filterType = FilterFactory.Type.MURMUR2;
else
filterType = FilterFactory.Type.MURMUR3;
+ hasSuperColumns = version.compareTo("ib") < 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d9aabfb..d2839c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.BytesReadTracker;
@@ -38,13 +38,13 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
private final DataInput input;
private final long dataStart;
public final long dataSize;
- public final IColumnSerializer.Flag flag;
+ public final ColumnSerializer.Flag flag;
private final ColumnFamily columnFamily;
private final int columnCount;
private final long columnPosition;
- private final OnDiskAtom.Serializer atomSerializer;
+ private final Iterator<OnDiskAtom> atomIterator;
private final Descriptor.Version dataVersion;
private final BytesReadTracker inputWithTracker; // tracks bytes read
@@ -80,11 +80,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
*/
public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
{
- this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
+ this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
}
// Must only be used against current file format
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
+ public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, ColumnSerializer.Flag flag)
{
this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
}
@@ -99,7 +99,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
long dataSize,
boolean checkData,
SSTableReader sstable,
- IColumnSerializer.Flag flag)
+ ColumnSerializer.Flag flag)
{
assert !checkData || (sstable != null);
this.input = input;
@@ -157,8 +157,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
}
columnFamily = ColumnFamily.create(metadata);
columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
- atomSerializer = columnFamily.getOnDiskSerializer();
+
columnCount = inputWithTracker.readInt();
+ atomIterator = columnFamily.metadata().getOnDiskIterator(inputWithTracker, columnCount, dataVersion);
columnPosition = dataStart + inputWithTracker.getBytesRead();
}
catch (IOException e)
@@ -188,14 +189,17 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
{
try
{
- OnDiskAtom atom = atomSerializer.deserializeFromSSTable(inputWithTracker, flag, expireBefore, dataVersion);
+ OnDiskAtom atom = atomIterator.next();
if (validateColumns)
atom.validateFields(columnFamily.metadata());
return atom;
}
- catch (IOException e)
+ catch (IOError e)
{
- throw new CorruptSSTableException(e, filename);
+ if (e.getCause() instanceof IOException)
+ throw new CorruptSSTableException((IOException)e.getCause(), filename);
+ else
+ throw e;
}
catch (MarshalException me)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 771f18f..10af96d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.StorageService;
@@ -241,28 +240,14 @@ public class SSTableWriter extends SSTable
cf.delete(deletionInfo);
ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream);
- OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
+ OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
for (int i = 0; i < columnCount; i++)
{
// deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
// data size received, so we must reserialize the exact same data
- OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+ OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
if (atom instanceof CounterColumn)
- {
atom = ((CounterColumn) atom).markDeltaToBeCleared();
- }
- else if (atom instanceof SuperColumn)
- {
- SuperColumn sc = (SuperColumn) atom;
- for (IColumn subColumn : sc.getSubColumns())
- {
- if (subColumn instanceof CounterColumn)
- {
- IColumn marked = ((CounterColumn) subColumn).markDeltaToBeCleared();
- sc.replace(subColumn, marked);
- }
- }
- }
int deletionTime = atom.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
deleted file mode 100644
index e01fd91..0000000
--- a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.io.IColumnSerializer;
-
-/**
- * Facade over a DataInput that contains IColumns in sorted order.
- * We use this because passing a SortedMap to the ConcurrentSkipListMap constructor is the only way
- * to invoke its private buildFromSorted method and avoid worst-case behavior of CSLM.put.
- */
-public class ColumnSortedMap implements SortedMap<ByteBuffer, IColumn>
-{
- private final ColumnSerializer serializer;
- private final DataInput dis;
- private final Comparator<ByteBuffer> comparator;
- private final int length;
- private final IColumnSerializer.Flag flag;
- private final int expireBefore;
-
- public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
- {
- this.comparator = comparator;
- this.serializer = serializer;
- this.dis = dis;
- this.length = length;
- this.flag = flag;
- this.expireBefore = expireBefore;
- }
-
- public int size()
- {
- return length;
- }
-
- public boolean isEmpty()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean containsKey(Object key)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean containsValue(Object value)
- {
- throw new UnsupportedOperationException();
- }
-
- public IColumn get(Object key)
- {
- throw new UnsupportedOperationException();
- }
-
- public IColumn put(ByteBuffer key, IColumn value)
- {
- throw new UnsupportedOperationException();
- }
-
- public IColumn remove(Object key)
- {
- throw new UnsupportedOperationException();
- }
-
- public void putAll(Map<? extends ByteBuffer, ? extends IColumn> m)
- {
- throw new UnsupportedOperationException();
- }
-
- public void clear()
- {
-
- }
-
- public Comparator<? super ByteBuffer> comparator()
- {
- return comparator;
- }
-
- public SortedMap<ByteBuffer, IColumn> subMap(ByteBuffer fromKey, ByteBuffer toKey)
- {
- throw new UnsupportedOperationException();
- }
-
- public SortedMap<ByteBuffer, IColumn> headMap(ByteBuffer toKey)
- {
- throw new UnsupportedOperationException();
- }
-
- public SortedMap<ByteBuffer, IColumn> tailMap(ByteBuffer fromKey)
- {
- throw new UnsupportedOperationException();
- }
-
- public ByteBuffer firstKey()
- {
- throw new UnsupportedOperationException();
- }
-
- public ByteBuffer lastKey()
- {
- throw new UnsupportedOperationException();
- }
-
- public Set<ByteBuffer> keySet()
- {
- throw new UnsupportedOperationException();
- }
-
- public Collection<IColumn> values()
- {
- throw new UnsupportedOperationException();
- }
-
- public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
- {
- return new ColumnSet(serializer, dis, length, flag, expireBefore);
- }
-}
-
-class ColumnSet implements Set<Map.Entry<ByteBuffer, IColumn>>
-{
- private final ColumnSerializer serializer;
- private final DataInput dis;
- private final int length;
- private final IColumnSerializer.Flag flag;
- private final int expireBefore;
-
- public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
- {
- this.serializer = serializer;
- this.dis = dis;
- this.length = length;
- this.flag = flag;
- this.expireBefore = expireBefore;
- }
-
- public int size()
- {
- return length;
- }
-
- public boolean isEmpty()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean contains(Object o)
- {
- throw new UnsupportedOperationException();
- }
-
- public Iterator<Entry<ByteBuffer, IColumn>> iterator()
- {
- return new ColumnIterator(serializer, dis, length, flag, expireBefore);
- }
-
- public Object[] toArray()
- {
- throw new UnsupportedOperationException();
- }
-
- public <T> T[] toArray(T[] a)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean add(Entry<ByteBuffer, IColumn> e)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean remove(Object o)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean containsAll(Collection<?> c)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean addAll(Collection<? extends Entry<ByteBuffer, IColumn>> c)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean retainAll(Collection<?> c)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean removeAll(Collection<?> c)
- {
- throw new UnsupportedOperationException();
- }
-
- public void clear()
- {
- }
-}
-
-class ColumnIterator implements Iterator<Map.Entry<ByteBuffer, IColumn>>
-{
- private final ColumnSerializer serializer;
- private final DataInput dis;
- private final int length;
- private final IColumnSerializer.Flag flag;
- private int count = 0;
- private final int expireBefore;
-
- public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
- {
- this.dis = dis;
- this.serializer = serializer;
- this.length = length;
- this.flag = flag;
- this.expireBefore = expireBefore;
- }
-
- private IColumn deserializeNext()
- {
- try
- {
- count++;
- return serializer.deserialize(dis, flag, expireBefore);
- }
- catch (IOException e)
- {
- throw new IOError(e); // can't throw more detailed error. can't rethrow IOException - Iterator interface next().
- }
- }
-
- public boolean hasNext()
- {
- return count < length;
- }
-
- public Entry<ByteBuffer, IColumn> next()
- {
- if (!hasNext())
- {
- throw new IllegalStateException("end of column iterator");
- }
-
- final IColumn column = deserializeNext();
- return new Entry<ByteBuffer, IColumn>()
- {
- public IColumn setValue(IColumn value)
- {
- throw new UnsupportedOperationException();
- }
-
- public IColumn getValue()
- {
- return column;
- }
-
- public ByteBuffer getKey()
- {
- return column.name();
- }
- };
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/util/IIterableColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/IIterableColumns.java b/src/java/org/apache/cassandra/io/util/IIterableColumns.java
index 030bef3..68c5645 100644
--- a/src/java/org/apache/cassandra/io/util/IIterableColumns.java
+++ b/src/java/org/apache/cassandra/io/util/IIterableColumns.java
@@ -17,10 +17,10 @@
*/
package org.apache.cassandra.io.util;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.AbstractType;
-public interface IIterableColumns extends Iterable<IColumn>
+public interface IIterableColumns extends Iterable<Column>
{
public int getEstimatedColumnCount();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 98495be..84438b9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -73,7 +73,8 @@ public final class MessagingService implements MessagingServiceMBean
public static final int VERSION_11 = 4;
public static final int VERSION_117 = 5;
public static final int VERSION_12 = 6;
- public static final int current_version = VERSION_12;
+ public static final int VERSION_20 = 7;
+ public static final int current_version = VERSION_20;
/**
* we preface every message with this number so the recipient can validate the sender is sane
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 7e3aca8..b0aa693 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableReader.Operator;
@@ -340,7 +339,7 @@ public class CacheService implements CacheServiceMBean
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(buffer);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.name)), Integer.MIN_VALUE, true);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE, true);
return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
}
});
@@ -351,7 +350,7 @@ public class CacheService implements CacheServiceMBean
for (ByteBuffer key : buffers)
{
DecoratedKey dk = cfs.partitioner.decorateKey(key);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, new QueryPath(cfs.name)), Integer.MIN_VALUE, true);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE, true);
rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 5bd9876..6d03009 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -43,7 +43,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand>
List<Row> rows = cfs.search(command.index_clause.expressions,
command.range,
command.index_clause.count,
- ThriftValidation.asIFilter(command.predicate, cfs.getComparator()));
+ ThriftValidation.asIFilter(command.predicate, cfs.metadata, null));
RangeSliceReply reply = new RangeSliceReply(rows);
Tracing.trace("Enqueuing response to {}", message.from);
MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 43fa8f7..943e6ae 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.*;
@@ -391,7 +390,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
Table defs = Table.open(Table.SYSTEM_KS);
ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
- QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
+ QueryFilter filter = QueryFilter.getNamesFilter(dkey, DefsTable.OLD_SCHEMA_CF, LAST_MIGRATION_KEY);
ColumnFamily cf = cfStore.getColumnFamily(filter);
if (cf == null || cf.getColumnNames().size() == 0)
return null;