You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/08 17:17:21 UTC
svn commit: r961795 [4/4] - in /cassandra/trunk: interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/...
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul 8 15:17:20 2010
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOError;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,34 +29,40 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Condition;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableTracker;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -106,6 +111,7 @@ public class ColumnFamilyStore implement
private final String table_;
public final String columnFamily_;
+ private final IPartitioner partitioner_;
private volatile int memtableSwitchCount = 0;
@@ -115,6 +121,8 @@ public class ColumnFamilyStore implement
/* active memtable associated with this ColumnFamilyStore. */
private Memtable memtable_;
+ private final Map<byte[], ColumnFamilyStore> indexedColumns_;
+
// TODO binarymemtable ops are not threadsafe (do they need to be?)
private AtomicReference<BinaryMemtable> binaryMemtable_;
@@ -128,13 +136,17 @@ public class ColumnFamilyStore implement
private long maxRowCompactedSize = 0L;
private long rowsCompactedTotalSize = 0L;
private long rowsCompactedCount = 0L;
-
- ColumnFamilyStore(String table, String columnFamilyName, int indexValue)
+ final CFMetaData metadata;
+
+ ColumnFamilyStore(String table, String columnFamilyName, IPartitioner partitioner, int generation, CFMetaData metadata)
{
+ assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
table_ = table;
columnFamily_ = columnFamilyName;
- fileIndexGenerator_.set(indexValue);
- memtable_ = new Memtable(this);
+ this.metadata = metadata;
+ this.partitioner_ = partitioner;
+ fileIndexGenerator_.set(generation);
+ memtable_ = new Memtable(this, partitioner_);
binaryMemtable_ = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
if (logger_.isDebugEnabled())
@@ -197,7 +209,7 @@ public class ColumnFamilyStore implement
SSTableReader sstable;
try
{
- sstable = SSTableReader.open(filename);
+ sstable = SSTableReader.open(filename, partitioner_);
}
catch (IOException ex)
{
@@ -208,6 +220,39 @@ public class ColumnFamilyStore implement
}
ssTables_ = new SSTableTracker(table, columnFamilyName);
ssTables_.add(sstables);
+
+ indexedColumns_ = new TreeMap<byte[], ColumnFamilyStore>(BytesType.instance);
+ for (Map.Entry<byte[], ColumnDefinition> entry : metadata.column_metadata.entrySet())
+ {
+ byte[] column = entry.getKey();
+ ColumnDefinition info = entry.getValue();
+ if (info.index_type == null)
+ continue;
+
+ String indexedCfName = columnFamily_ + "." + (info.index_name == null ? FBUtilities.bytesToHex(column) : info.index_name);
+ IPartitioner rowPartitioner = StorageService.getPartitioner();
+ AbstractType columnComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
+ ? BytesType.instance
+ : new LocalByPartionerType(StorageService.getPartitioner());
+ CFMetaData indexedCfMetadata = new CFMetaData(table,
+ indexedCfName,
+ ColumnFamilyType.Standard,
+ ClockType.Timestamp,
+ columnComparator,
+ null,
+ new TimestampReconciler(),
+ "",
+ 0,
+ false,
+ 0,
+ 0,
+ Collections.<byte[], ColumnDefinition>emptyMap());
+ ColumnFamilyStore indexedCfs = ColumnFamilyStore.createColumnFamilyStore(table,
+ indexedCfName,
+ new LocalPartitioner(metadata.column_metadata.get(column).validator),
+ indexedCfMetadata);
+ indexedColumns_.put(column, indexedCfs);
+ }
}
public void addToCompactedRowStats(long rowsize)
@@ -240,6 +285,11 @@ public class ColumnFamilyStore implement
public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily)
{
+ return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table, columnFamily));
+ }
+
+ public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
+ {
/*
* Get all data files associated with old Memtables for this table.
* These files are named as follows <Table>-1.db, ..., <Table>-n.db. Get
@@ -269,22 +319,7 @@ public class ColumnFamilyStore implement
Collections.sort(generations);
int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
- ColumnFamilyStore cfs = new ColumnFamilyStore(table, columnFamily, value);
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilyStores,keyspace=" + table + ",columnfamily=" + columnFamily);
- if (mbs.isRegistered(mbeanName))
- mbs.unregisterMBean(mbeanName);
- mbs.registerMBean(cfs, mbeanName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- return cfs;
+ return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata);
}
private Set<File> files()
@@ -367,7 +402,7 @@ public class ColumnFamilyStore implement
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext() : null;
logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable at " + ctx);
final Condition condition = submitFlush(oldMemtable);
- memtable_ = new Memtable(this);
+ memtable_ = new Memtable(this, partitioner_);
// a second executor that makes sure the onMemtableFlushes get called in the right order,
// while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
return commitLogUpdater_.submit(new WrappedRunnable()
@@ -379,9 +414,8 @@ public class ColumnFamilyStore implement
{
// if we're not writing to the commit log, we are replaying the log, so marking
// the log header with "you can discard anything written before the context" is not valid
- final Integer cfId = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).cfId;
- logger_.debug("Discarding {}", cfId);
- CommitLog.instance().discardCompletedSegments(cfId, ctx);
+ logger_.debug("Discarding {}", metadata.cfId);
+ CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
}
}
});
@@ -800,7 +834,7 @@ public class ColumnFamilyStore implement
{
// we are querying top-level columns, do a merging fetch with indexes.
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
- final ColumnFamily returnCF = ColumnFamily.create(table_, columnFamily_);
+ final ColumnFamily returnCF = ColumnFamily.create(metadata);
try
{
IColumnIterator iter;
@@ -947,7 +981,7 @@ public class ColumnFamilyStore implement
else
{
// wrapped range
- Token min = StorageService.getPartitioner().getMinimumToken();
+ Token min = partitioner_.getMinimumToken();
Range first = new Range(range.left, min);
completed = getRangeRows(rows, super_column, first, keyMax, columnFilter);
if (!completed && min.compareTo(range.right) < 0)
@@ -960,9 +994,42 @@ public class ColumnFamilyStore implement
return rows;
}
+ public List<Row> scan(IndexClause indexClause, IFilter dataFilter) // index lookup on the first expression only, then one read of this store per key found in the index row
+ {
+ // TODO: use statistics to pick clause w/ highest selectivity
+ // TODO even later: allow merge join instead of just one index + loop
+ IndexExpression first = indexClause.expressions.get(0); // assumes the clause carries at least one expression
+ ColumnFamilyStore indexCFS = getIndexedColumnFamilyStore(first.column_name);
+ assert indexCFS != null; // only enforced with assertions enabled; otherwise an unindexed column fails on the dereference below
+ DecoratedKey indexKey = indexCFS.partitioner_.decorateKey(first.value); // the value being searched for is the row key inside the index store
+ QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+ new QueryPath(indexCFS.getColumnFamilyName()),
+ ArrayUtils.EMPTY_BYTE_ARRAY, // empty start and finish bounds; presumably an unbounded slice - confirm in QueryFilter.getSliceFilter
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ null,
+ false,
+ indexClause.count);
+
+ List<Row> rows = new ArrayList<Row>();
+ ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
+ if (indexRow == null) // nothing stored under this value in the index store
+ return rows;
+
+ for (byte[] dataKey : indexRow.getColumnNames()) // each column name of the index row is used as a row key of this store
+ {
+ DecoratedKey dk = partitioner_.decorateKey(dataKey);
+ ColumnFamily data = getColumnFamily(new QueryFilter(dk, new QueryPath(columnFamily_), dataFilter));
+ rows.add(new Row(dk, data)); // NOTE(review): data is added without a null check - confirm consumers accept a Row with a null column family
+ }
+
+ // TODO apply remaining expressions
+
+ return rows;
+ }
+
public AbstractType getComparator()
{
- return DatabaseDescriptor.getComparator(table_, columnFamily_);
+ return metadata.comparator;
}
/**
@@ -1020,13 +1087,11 @@ public class ColumnFamilyStore implement
public void loadRowCache()
{
- CFMetaData metadata = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_);
- assert metadata != null;
if (metadata.preloadRowCache)
{
logger_.debug(String.format("Loading cache for keyspace/columnfamily %s/%s", table_, columnFamily_));
int ROWS = 4096;
- Token min = StorageService.getPartitioner().getMinimumToken();
+ Token min = partitioner_.getMinimumToken();
Token start = min;
long i = 0;
while (i < ssTables_.getRowCache().getCapacity())
@@ -1047,7 +1112,7 @@ public class ColumnFamilyStore implement
if (result.size() < ROWS)
break;
- start = DatabaseDescriptor.getPartitioner().getToken(result.get(ROWS - 1).key.key);
+ start = partitioner_.getToken(result.get(ROWS - 1).key.key);
}
logger_.info(String.format("Loaded %s rows into the %s cache", i, columnFamily_));
}
@@ -1228,4 +1293,33 @@ public class ColumnFamilyStore implement
return 0d;
return (double) falseCount / (trueCount + falseCount);
}
+
+ public Set<byte[]> getIndexedColumns() // names of the columns that have an index store registered in indexedColumns_
+ {
+ return Collections.unmodifiableSet(indexedColumns_.keySet()); // read-only view, so callers cannot drop index stores by removing keys from the set
+ }
+
+ public ColumnFamilyStore getIndexedColumnFamilyStore(byte[] column) // plain map lookup: yields null when no index store is registered for the column
+ {
+ return indexedColumns_.get(column);
+ }
+
+ public ColumnFamily newIndexedColumnFamily(byte[] column) // builds a ColumnFamily from the metadata of the column's index store
+ {
+ return ColumnFamily.create(indexedColumns_.get(column).metadata); // dereferences the lookup result, so the column must be indexed
+ }
+
+ public DecoratedKey getIndexKeyFor(byte[] name, byte[] value) // decorates value with the partitioner of the index store registered for column name
+ {
+ return indexedColumns_.get(name).partitioner_.decorateKey(value); // dereferences the lookup result, so name must be an indexed column
+ }
+
+ @Override
+ public String toString() // identifies the store by its table_ and columnFamily_ fields
+ {
+ return "ColumnFamilyStore(" +
+ "table='" + table_ + '\'' +
+ ", columnFamily='" + columnFamily_ + '\'' +
+ ')';
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Thu Jul 8 15:17:20 2010
@@ -0,0 +1,85 @@
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+public class IndexScanCommand // immutable request carrying keyspace, column family, index clause and slice predicate; convertible to and from a Message
+{
+ private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
+
+ public final String keyspace;
+ public final String column_family;
+ public final IndexClause index_clause;
+ public final SlicePredicate predicate;
+
+ public IndexScanCommand(String keyspace, String column_family, IndexClause index_clause, SlicePredicate predicate)
+ {
+
+ this.keyspace = keyspace;
+ this.column_family = column_family;
+ this.index_clause = index_clause;
+ this.predicate = predicate;
+ }
+
+ public Message getMessage() // serializes this command and wraps the bytes in an INDEX_SCAN message for the read stage
+ {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try
+ {
+ serializer.serialize(this, dob);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e); // rethrown unchecked so getMessage() declares no checked exception
+ }
+ return new Message(FBUtilities.getLocalAddress(),
+ StageManager.READ_STAGE,
+ StorageService.Verb.INDEX_SCAN,
+ Arrays.copyOf(dob.getData(), dob.getLength())); // copy only the first getLength() bytes of the buffer's array
+ }
+
+ public static IndexScanCommand read(Message message) throws IOException // inverse of getMessage(): decodes the message body
+ {
+ byte[] bytes = message.getMessageBody();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ return serializer.deserialize(new DataInputStream(bis));
+ }
+
+ private static class IndexScanCommandSerializer implements ICompactSerializer2<IndexScanCommand>
+ {
+ public void serialize(IndexScanCommand o, DataOutput out) throws IOException // layout: keyspace (UTF), column family (UTF), index clause, predicate
+ {
+ out.writeUTF(o.keyspace);
+ out.writeUTF(o.column_family);
+ TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
+ FBUtilities.serialize(ser, o.index_clause, out);
+ FBUtilities.serialize(ser, o.predicate, out);
+ }
+
+ public IndexScanCommand deserialize(DataInput in) throws IOException // reads the fields back in the order serialize() wrote them
+ {
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+
+ TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
+ IndexClause indexClause = new IndexClause();
+ FBUtilities.deserialize(dser, indexClause, in);
+ SlicePredicate predicate = new SlicePredicate();
+ FBUtilities.deserialize(dser, predicate, in);
+
+ return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate);
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jul 8 15:17:20 2010
@@ -55,13 +55,14 @@ public class Memtable implements Compara
private final long creationTime;
private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
- private final IPartitioner partitioner = StorageService.getPartitioner();
+ private final IPartitioner partitioner;
private final ColumnFamilyStore cfs;
- public Memtable(ColumnFamilyStore cfs)
+ public Memtable(ColumnFamilyStore cfs, IPartitioner partitioner)
{
this.cfs = cfs;
+ this.partitioner = partitioner;
creationTime = System.currentTimeMillis();
}
@@ -147,7 +148,7 @@ public class Memtable implements Compara
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), partitioner);
DataOutputBuffer buffer = new DataOutputBuffer();
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul 8 15:17:20 2010
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.io.IOError;
+import java.lang.management.ManagementFactory;
import java.util.*;
import java.io.IOException;
import java.io.File;
@@ -28,17 +29,17 @@ import java.util.concurrent.Future;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Config;
+
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
-import java.net.InetAddress;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.db.filter.*;
@@ -48,7 +49,7 @@ import org.cliffc.high_scale_lib.NonBloc
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Table
+public class Table
{
public static final String SYSTEM_TABLE = "system";
@@ -84,6 +85,7 @@ public class Table
// cache application CFs since Range queries ask for them a _lot_
private SortedSet<String> applicationColumnFamilies;
private final TimerTask flushTask;
+ private final Object[] indexLocks;
public static Table open(String table)
{
@@ -215,6 +217,9 @@ public class Table
private Table(String table)
{
name = table;
+ indexLocks = new Object[DatabaseDescriptor.getConcurrentWriters() * 8];
+ for (int i = 0; i < indexLocks.length; i++)
+ indexLocks[i] = new Object();
// create data directories.
for (String dataDir : DatabaseDescriptor.getAllDataFileLocations())
{
@@ -233,11 +238,25 @@ public class Table
throw new IOError(ex);
}
}
-
- for (CFMetaData cfm : DatabaseDescriptor.getTableDefinition(table).cfMetaData().values())
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ for (CFMetaData cfm : new ArrayList<CFMetaData>(DatabaseDescriptor.getTableDefinition(table).cfMetaData().values()))
{
- columnFamilyStores.put(cfm.cfId, ColumnFamilyStore.createColumnFamilyStore(table, cfm.cfName));
- }
+ ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(table, cfm.cfName);
+ columnFamilyStores.put(cfm.cfId, cfs);
+ try
+ {
+ ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilyStores,keyspace=" + table + ",columnfamily=" + cfm.cfName);
+ if (mbs.isRegistered(mbeanName))
+ mbs.unregisterMBean(mbeanName);
+ mbs.registerMBean(cfs, mbeanName);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
// check 10x as often as the lifetime, so we can exceed lifetime by 10% at most
int checkMs = DatabaseDescriptor.getMemtableLifetimeMS() / 10;
@@ -312,28 +331,76 @@ public class Table
try
{
if (writeCommitLog)
- {
CommitLog.instance().add(mutation, serializedMutation);
- }
DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
- Memtable memtableToFlush;
ColumnFamilyStore cfs = columnFamilyStores.get(columnFamily.id());
if (cfs == null)
{
logger.error("Attempting to mutate non-existant column family " + columnFamily.id());
+ continue;
+ }
+
+ ColumnFamily oldIndexedColumns;
+ SortedSet<byte[]> mutatedIndexedColumns = null;
+ for (byte[] column : cfs.getIndexedColumns())
+ {
+ if (columnFamily.getColumnNames().contains(column))
+ {
+ if (mutatedIndexedColumns == null)
+ mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
+ mutatedIndexedColumns.add(column);
+ }
+ }
+
+ if (mutatedIndexedColumns == null)
+ {
+ // just update the actual value, no extra synchronization
+ applyCF(cfs, key, columnFamily, memtablesToFlush);
}
else
{
- if ((memtableToFlush=cfs.apply(key, columnFamily)) != null)
- memtablesToFlush.put(cfs, memtableToFlush);
-
- ColumnFamily cachedRow = cfs.getRawCachedRow(key);
- if (cachedRow != null)
- cachedRow.addAll(columnFamily);
+ synchronized (indexLocks[Math.abs(Arrays.hashCode(mutation.key()) % indexLocks.length)]) // Math.abs: hashCode may be negative and % keeps the dividend's sign, which would index out of bounds
+ {
+ // read old indexed values (before the mutation overwrites them) so their index entries can be removed below
+ QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
+ oldIndexedColumns = cfs.getColumnFamily(filter);
+
+ // apply the mutation to the base column family
+ applyCF(cfs, key, columnFamily, memtablesToFlush);
+
+ // add new index entries: in the column's index store, row = indexed value, column name = mutated row key
+ for (byte[] columnName : mutatedIndexedColumns)
+ {
+ IColumn column = columnFamily.getColumn(columnName);
+ DecoratedKey valueKey = cfs.getIndexKeyFor(columnName, column.value());
+ ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
+ cf.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+ }
+
+ // remove the old index entries, tombstoned with the clock of the old column value
+ if (oldIndexedColumns != null)
+ {
+ int localDeletionTime = (int)(System.currentTimeMillis() / 1000);
+ for (Map.Entry<byte[], IColumn> entry : oldIndexedColumns.getColumnsMap().entrySet())
+ {
+ byte[] columnName = entry.getKey();
+ IColumn column = entry.getValue();
+ DecoratedKey valueKey = cfs.getIndexKeyFor(columnName, column.value());
+ ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
+ cf.deleteColumn(mutation.key(), localDeletionTime, column.clock());
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush); // the tombstone goes to the index store (same target as the add above), not to the base store
+ }
+ }
+ }
}
+
+ ColumnFamily cachedRow = cfs.getRawCachedRow(key);
+ if (cachedRow != null)
+ cachedRow.addAll(columnFamily);
}
}
finally
@@ -346,6 +413,13 @@ public class Table
entry.getKey().maybeSwitchMemtable(entry.getValue(), writeCommitLog);
}
+ private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> memtablesToFlush) // applies columnFamily to cfs under key and records any memtable the store hands back
+ {
+ Memtable memtableToFlush = cfs.apply(key, columnFamily); // presumably non-null only when the store wants that memtable flushed - confirm in ColumnFamilyStore.apply
+ if (memtableToFlush != null)
+ memtablesToFlush.put(cfs, memtableToFlush); // keyed by store, so a later result for the same store replaces an earlier one
+ }
+
public List<Future<?>> flush() throws IOException
{
List<Future<?>> futures = new ArrayList<Future<?>>();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Jul 8 15:17:20 2010
@@ -31,9 +31,13 @@ import org.apache.cassandra.utils.Reduci
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class QueryFilter
{
+ private static Logger logger = LoggerFactory.getLogger(QueryFilter.class);
+
public final DecoratedKey key;
public final QueryPath path;
private final IFilter filter;
Added: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java Thu Jul 8 15:17:20 2010
@@ -0,0 +1,26 @@
+package org.apache.cassandra.db.marshal;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class LocalByPartionerType<T extends Token> extends AbstractType
+{
+ private final IPartitioner<T> partitioner; // partitioner whose decorated-key ordering compare() reproduces
+
+ public LocalByPartionerType(IPartitioner<T> partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ public String getString(byte[] bytes) // hex rendering of the raw bytes, never null, so the result is safe to log or concatenate
+ {
+ return org.apache.cassandra.utils.FBUtilities.bytesToHex(bytes); // fully qualified so that no import line has to be added to this file
+ }
+
+ public int compare(byte[] o1, byte[] o2) // orders two byte arrays by comparing their keys as decorated by the partitioner
+ {
+ return partitioner.decorateKey(o1).compareTo(partitioner.decorateKey(o2));
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Thu Jul 8 15:17:20 2010
@@ -92,10 +92,11 @@ public class AddColumnFamily extends Mig
{
throw new IOException(ex);
}
+ Table.open(cfm.tableName); // make sure it's init-ed w/ the old definitions first, since we're going to call initCf on the new one manually
+ DatabaseDescriptor.setTableDefinition(ksm, newVersion);
if (!clientMode)
Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
- DatabaseDescriptor.setTableDefinition(ksm, newVersion);
-
+
if (!clientMode)
// force creation of a new commit log segment.
CommitLog.instance().forceNewSegment();
Added: cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/LocalPartitioner.java Thu Jul 8 15:17:20 2010
@@ -0,0 +1,61 @@
+package org.apache.cassandra.dht;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class LocalPartitioner implements IPartitioner<LocalToken> // partitioner whose token is the key's own bytes, ordered by the supplied comparator
+{
+ private final AbstractType comparator; // handed to every LocalToken this partitioner creates
+
+ public LocalPartitioner(AbstractType comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ public DecoratedKey<LocalToken> convertFromDiskFormat(byte[] key) // the bytes passed in are decorated directly as the key
+ {
+ return decorateKey(key);
+ }
+
+ public byte[] convertToDiskFormat(DecoratedKey<LocalToken> key) // returns the token bytes, which decorateKey() sets to the key bytes
+ {
+ return key.token.token;
+ }
+
+ public DecoratedKey<LocalToken> decorateKey(byte[] key)
+ {
+ return new DecoratedKey<LocalToken>(getToken(key), key);
+ }
+
+ public LocalToken midpoint(LocalToken left, LocalToken right) // not supported by this partitioner
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public LocalToken getMinimumToken() // token over the empty byte array
+ {
+ return new LocalToken(comparator, ArrayUtils.EMPTY_BYTE_ARRAY);
+ }
+
+ public LocalToken getToken(byte[] key) // wraps the key bytes unchanged; no hashing
+ {
+ return new LocalToken(comparator, key);
+ }
+
+ public LocalToken getRandomToken() // not supported by this partitioner
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Token.TokenFactory getTokenFactory() // not supported by this partitioner
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean preservesOrder()
+ {
+ return true;
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java?rev=961795&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/LocalToken.java Thu Jul 8 15:17:20 2010
@@ -0,0 +1,66 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.dht;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class LocalToken extends Token<byte[]> // token over raw bytes whose ordering and string form are delegated to an AbstractType
+{
+ private final AbstractType comparator; // used by toString() and compareTo(); ignored by equals() and hashCode()
+
+ public LocalToken(AbstractType comparator, byte... token)
+ {
+ super(token);
+ this.comparator = comparator;
+ }
+
+ @Override
+ public String toString() // renders the token bytes with the comparator's getString()
+ {
+ return comparator.getString(token);
+ }
+
+ @Override
+ public int compareTo(Token<byte[]> o) // ordering comes from this token's comparator, not from o's
+ {
+ return comparator.compare(token, o.token);
+ }
+
+ @Override
+ public int hashCode() // derived from the token bytes only
+ {
+ final int prime = 31;
+ return prime + Arrays.hashCode(token);
+ }
+
+ @Override
+ public boolean equals(Object obj) // byte-wise equality; NOTE(review): may disagree with compareTo() if the comparator treats distinct byte arrays as equal - confirm
+ {
+ if (this == obj)
+ return true;
+ if (!(obj instanceof LocalToken))
+ return false;
+ LocalToken other = (LocalToken) obj;
+ return Arrays.equals(token, other.token);
+ }
+
+}
Copied: cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (from r961780, cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java&r1=961780&r2=961795&rev=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Thu Jul 8 15:17:20 2010
@@ -18,10 +18,7 @@
package org.apache.cassandra.service;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -29,21 +26,17 @@ import org.apache.cassandra.net.Messagin
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RangeSliceVerbHandler implements IVerbHandler
+public class IndexScanVerbHandler implements IVerbHandler
{
-
- private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
public void doVerb(Message message)
{
try
{
- RangeSliceCommand command = RangeSliceCommand.read(message);
+ IndexScanCommand command = IndexScanCommand.read(message);
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- RangeSliceReply reply = new RangeSliceReply(cfs.getRangeSlice(command.super_column,
- command.range,
- command.max_keys,
- QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+ RangeSliceReply reply = new RangeSliceReply(cfs.scan(command.index_clause, QueryFilter.getFilter(command.predicate, cfs.getComparator())));
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Thu Jul 8 15:17:20 2010
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
public class RangeSliceVerbHandler implements IVerbHandler
{
- private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
public void doVerb(Message message)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Jul 8 15:17:20 2010
@@ -28,19 +28,19 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Multimap;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Multimap;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
@@ -54,7 +54,6 @@ import org.apache.cassandra.thrift.Consi
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
public class StorageProxy implements StorageProxyMBean
@@ -757,6 +756,28 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyMicros();
}
+ /**
+ * Sends an index scan to one live replica and returns what the response handler yields.
+ *
+ * The candidate replicas are the live natural endpoints for the token of
+ * command.index_clause.start_key, or for the partitioner's minimum token when
+ * start_key is null. Only the first endpoint in that list receives the message.
+ *
+ * @param command the scan to run; supplies the keyspace, the index clause and the outgoing message
+ * @param consistency_level handed to the replication strategy when it builds the response handler
+ * @return the rows returned by the response handler
+ * @throws IOException declared here; presumably propagated from handler.get() -- TODO confirm
+ * @throws TimeoutException declared here; presumably raised when no reply arrives in time -- TODO confirm
+ */
+ public static List<Row> scan(IndexScanCommand command, ConsistencyLevel consistency_level)
+ throws IOException, TimeoutException
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ Token startToken = command.index_clause.start_key == null ? p.getMinimumToken() : p.getToken(command.index_clause.start_key);
+ List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, startToken);
+ // TODO iterate through endpoints in token order like getRangeSlice
+ Message message = command.getMessage();
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
+ AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.keyspace);
+ QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, command.keyspace);
+ // NOTE(review): endpoints.get(0) throws IndexOutOfBoundsException on an empty list -- confirm whether
+ // getLiveNaturalEndpoints can return no endpoints here.
+ // NOTE(review): a single replica is contacted while the handler is built from consistency_level --
+ // confirm that levels needing more than one response can be satisfied.
+ MessagingService.instance.sendRR(message, endpoints.get(0), handler);
+ try
+ {
+ return handler.get();
+ }
+ catch (DigestMismatchException e)
+ {
+ // a digest mismatch is not recovered from here; it is rethrown unchecked with the original as cause
+ throw new RuntimeException(e);
+ }
+ }
+
static class weakReadLocalCallable implements Callable<Object>
{
private ReadCommand command;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 8 15:17:20 2010
@@ -115,7 +115,7 @@ public class StorageService implements I
DEFINITIONS_UPDATE_RESPONSE,
TRUNCATE,
SCHEMA_CHECK,
- ;
+ INDEX_SCAN;
// remember to add new verbs at the end, since we serialize by ordinal
}
public static final Verb[] VERBS = Verb.values();
@@ -223,6 +223,7 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.READ, new ReadVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.INDEX_SCAN, new IndexScanVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler() );
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Jul 8 15:17:20 2010
@@ -490,6 +490,12 @@ public class CassandraServer implements
String keyspace = keySpace.get();
checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
+ return getRangeSlicesInternal(keyspace, column_parent, range, predicate, consistency_level);
+ }
+
+ private List<KeySlice> getRangeSlicesInternal(String keyspace, ColumnParent column_parent, KeyRange range, SlicePredicate predicate, ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException
+ {
ThriftValidation.validateColumnParent(keyspace, column_parent);
ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
ThriftValidation.validateKeyRange(range);
@@ -522,6 +528,11 @@ public class CassandraServer implements
throw new RuntimeException(e);
}
+ return thriftifyKeySlices(rows, column_parent, predicate);
+ }
+
+ private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
+ {
List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
for (Row row : rows)
@@ -533,6 +544,71 @@ public class CassandraServer implements
return keySlices;
}
+ /**
+ * Selects rows using the first non-null member of row_predicate, tested in this order:
+ * keys, key_range, index_clause. Read-only access is checked before any lookup.
+ *
+ * @throws InvalidRequestException if none of the three members is set
+ */
+ public List<KeySlice> scan(ColumnParent column_parent, RowPredicate row_predicate, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("scan");
+
+ checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
+
+ if (row_predicate.keys != null)
+ {
+ // explicit key list: go through the multiget path, then repackage every map entry as a KeySlice
+ Map<byte[], List<ColumnOrSuperColumn>> columnsByKey = multigetSliceInternal(keySpace.get(), row_predicate.keys, column_parent, column_predicate, consistency_level);
+ List<KeySlice> slices = new ArrayList<KeySlice>(columnsByKey.size());
+ for (Map.Entry<byte[], List<ColumnOrSuperColumn>> keyAndColumns : columnsByKey.entrySet())
+ slices.add(new KeySlice(keyAndColumns.getKey(), keyAndColumns.getValue()));
+ return slices;
+ }
+ else if (row_predicate.key_range != null)
+ {
+ return getRangeSlicesInternal(keySpace.get(), column_parent, row_predicate.key_range, column_predicate, consistency_level);
+ }
+ else if (row_predicate.index_clause != null)
+ {
+ return scanIndexInternal(keySpace.get(), column_parent, row_predicate.index_clause, column_predicate, consistency_level);
+ }
+ else
+ {
+ throw new InvalidRequestException("row predicate must specify keys, key_range, or index_clause");
+ }
+ }
+
+ private List<KeySlice> scanIndexInternal(String keyspace, ColumnParent column_parent, IndexClause index_clause, SlicePredicate predicate, ConsistencyLevel consistency_level)
+ throws InvalidRequestException, TimedOutException
+ {
+ ThriftValidation.validateColumnParent(keyspace, column_parent);
+ ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
+ ThriftValidation.validateIndexClauses(keyspace, column_parent.column_family, index_clause);
+
+ List<Row> rows = null;
+ try
+ {
+ rows = StorageProxy.scan(new IndexScanCommand(keyspace, column_parent.column_family, index_clause, predicate), consistency_level);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
+ return thriftifyKeySlices(rows, column_parent, predicate);
+ }
+
+ /**
+ * Runs scan() with the same arguments and reports, for each returned row,
+ * its key together with the number of columns that came back for it.
+ */
+ public List<KeyCount> scan_count(ColumnParent column_parent, RowPredicate row_predicate, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ {
+ List<KeySlice> slices = scan(column_parent, row_predicate, column_predicate, consistency_level);
+ List<KeyCount> counts = new ArrayList<KeyCount>(slices.size());
+ for (KeySlice each : slices)
+ counts.add(new KeyCount(each.key, each.columns.size()));
+ return counts;
+ }
+
public Set<String> describe_keyspaces() throws TException
{
return DatabaseDescriptor.getTables();
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Thu Jul 8 15:17:20 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.thrift;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -370,4 +371,17 @@ public class ThriftValidation
throw new InvalidRequestException("maxRows must be positive");
}
}
+
+ /**
+ * Rejects an index clause that is empty or that names a column the column family has no index on.
+ *
+ * @throws InvalidRequestException if the clause has no expressions, or if any expression's column
+ * is missing from the column family store's indexed columns
+ */
+ public static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause)
+ throws InvalidRequestException
+ {
+ if (index_clause.expressions.isEmpty())
+ throw new InvalidRequestException("index clause list may not be empty");
+
+ // NOTE(review): membership is tested on byte[] elements; presumably the returned set compares
+ // arrays by content rather than by identity -- confirm against getIndexedColumns().
+ Set<byte[]> indexed = Table.open(keyspace).getColumnFamilyStore(columnFamily).getIndexedColumns();
+ for (IndexExpression expr : index_clause.expressions)
+ {
+ boolean isIndexed = indexed.contains(expr.column_name);
+ if (!isIndexed)
+ throw new InvalidRequestException("Unable to scan unindexed column");
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jul 8 15:17:20 2010
@@ -511,4 +511,11 @@ public class FBUtilities
return scpurl.getFile();
}
+
+ /**
+ * Returns the current time in microseconds, computed as System.currentTimeMillis() scaled by 1000.
+ * The value therefore always ends in three zero digits.
+ */
+ public static long timestampMicros()
+ {
+ // we use microsecond resolution for compatibility with other client libraries, even though
+ // we can't actually get microsecond precision.
+ return System.currentTimeMillis() * 1000;
+ }
}
Modified: cassandra/trunk/test/system/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/__init__.py?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/system/__init__.py (original)
+++ cassandra/trunk/test/system/__init__.py Thu Jul 8 15:17:20 2010
@@ -151,7 +151,8 @@ class ThriftTester(BaseTester):
Cassandra.CfDef('Keyspace1', 'Super1', column_type='Super', subcomparator_type='LongType', row_cache_size=1000, key_cache_size=0),
Cassandra.CfDef('Keyspace1', 'Super2', column_type='Super', subcomparator_type='LongType'),
Cassandra.CfDef('Keyspace1', 'Super3', column_type='Super', subcomparator_type='LongType'),
- Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type')
+ Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type'),
+ Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
])
keyspace2 = Cassandra.KsDef('Keyspace2', 'org.apache.cassandra.locator.RackUnawareStrategy', 1,
@@ -174,10 +175,8 @@ class ThriftTester(BaseTester):
Cassandra.CfDef('Keyspace4', 'Super3', column_type='Super', subcomparator_type='BytesType'),
Cassandra.CfDef('Keyspace4', 'Super4', column_type='Super', subcomparator_type='TimeUUIDType')
])
- self.client.system_add_keyspace(keyspace1)
- self.client.system_add_keyspace(keyspace2)
- self.client.system_add_keyspace(keyspace3)
- self.client.system_add_keyspace(keyspace4)
+ for ks in [keyspace1, keyspace2, keyspace3, keyspace4]:
+ self.client.system_add_keyspace(ks)
class AvroTester(BaseTester):
client = None
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Thu Jul 8 15:17:20 2010
@@ -1054,7 +1054,7 @@ class TestMutations(ThriftTester):
kspaces = client.describe_keyspaces()
assert len(kspaces) == 5, kspaces # ['system', 'Keyspace2', 'Keyspace3', 'Keyspace1', 'Keyspace4']
ks1 = client.describe_keyspace("Keyspace1")
- assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4'])
+ assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4', 'Indexed1'])
sysks = client.describe_keyspace("system")
def test_describe(self):
@@ -1205,7 +1205,26 @@ class TestMutations(ThriftTester):
def req():
client.describe_ring('system')
_expect_exception(req, InvalidRequestException)
+
+ def test_index_scan(self):
+ """Scan Indexed1 by an EQ expression on birthdate, then check that scanning on column 'b' is rejected."""
+ _set_keyspace('Keyspace1')
+ client.insert('key1', ColumnParent('Indexed1'), Column('birthdate', _i64(1), Clock(0)), ConsistencyLevel.ONE)
+ client.insert('key2', ColumnParent('Indexed1'), Column('birthdate', _i64(2), Clock(0)), ConsistencyLevel.ONE)
+ # key3 gets only column 'b', so it carries no birthdate value at all
+ client.insert('key3', ColumnParent('Indexed1'), Column('b', _i64(3), Clock(0)), ConsistencyLevel.ONE)
+
+ cp = ColumnParent('Indexed1')
+ expr = IndexExpression('birthdate', IndexOperator.EQ, _i64(1))
+ rp = RowPredicate(index_clause=IndexClause([expr]))
+ sp = SlicePredicate(slice_range=SliceRange('', ''))
+ result = client.scan(cp, rp, sp, ConsistencyLevel.ONE)
+ # key1 is the only row written with birthdate == 1, and it holds a single column
+ assert len(result) == 1, result
+ assert result[0].key == 'key1'
+ assert len(result[0].columns) == 1, result[0].columns
+
+ # expr is the same object held inside rp, so the next scan asks for column 'b' instead
+ expr.column_name = 'b'
+ _expect_exception(lambda: client.scan(cp, rp, sp, ConsistencyLevel.ONE), InvalidRequestException)
+
class TestTruncate(ThriftTester):
def test_truncate(self):
_set_keyspace('Keyspace1')
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Thu Jul 8 15:17:20 2010
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,10 @@ import org.apache.cassandra.CleanupHelpe
import org.apache.cassandra.Util;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import java.net.InetAddress;
@@ -159,6 +164,21 @@ public class ColumnFamilyStoreTest exten
assert Arrays.equals(result.get(0).key.key, "key2".getBytes());
}
+ /**
+ * Writes one row with a birthdate column into Indexed1, then checks that an EQ scan
+ * on that birthdate value returns at least one row.
+ */
+ @Test
+ public void testIndexScan() throws IOException
+ {
+ RowMutation mutation = new RowMutation("Keyspace1", "k".getBytes());
+ QueryPath birthdatePath = new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8"));
+ mutation.add(birthdatePath, FBUtilities.toByteArray(1L), new TimestampClock(0));
+ mutation.apply();
+
+ IndexExpression birthdateIsOne = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+ IndexClause clause = new IndexClause(Arrays.asList(birthdateIsOne), 100);
+ IFilter identity = new IdentityQueryFilter();
+ List<Row> matches = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause, identity);
+ assert matches != null && matches.size() > 0;
+ }
+
private ColumnFamilyStore insertKey1Key2() throws IOException, ExecutionException, InterruptedException
{
List<RowMutation> rms = new LinkedList<RowMutation>();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=961795&r1=961794&r2=961795&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Thu Jul 8 15:17:20 2010
@@ -195,17 +195,18 @@ public class DefsTest extends CleanupHel
// any write should fail.
rm = new RowMutation(ks.name, dk.key);
+ boolean success = true;
try
{
rm.add(new QueryPath("Standard1", null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
rm.apply();
- assert false : "This mutation should have failed since the CF no longer exists.";
}
catch (Throwable th)
{
- assert th instanceof IllegalArgumentException;
+ success = false;
}
-
+ assert !success : "This mutation should have failed since the CF no longer exists.";
+
// verify that the files are gone.
assert DefsTable.getFiles(cfm.tableName, cfm.cfName).size() == 0;
}
@@ -308,17 +309,18 @@ public class DefsTest extends CleanupHel
// write should fail.
rm = new RowMutation(ks.name, dk.key);
+ boolean success = true;
try
{
rm.add(new QueryPath("Standard1", null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
rm.apply();
- throw new AssertionError("This mutation should have failed since the CF no longer exists.");
}
catch (Throwable th)
{
- assert th instanceof IllegalArgumentException;
+ success = false;
}
-
+ assert !success : "This mutation should have failed since the CF no longer exists.";
+
// reads should fail too.
try
{
@@ -375,16 +377,17 @@ public class DefsTest extends CleanupHel
// write on old should fail.
rm = new RowMutation(oldKs.name, "any key will do".getBytes());
+ boolean success = true;
try
{
rm.add(new QueryPath(cfName, null, "col0".getBytes()), "value0".getBytes(), new TimestampClock(1L));
rm.apply();
- throw new AssertionError("This mutation should have failed since the CF/Table no longer exists.");
}
catch (Throwable th)
{
- assert th instanceof IllegalArgumentException;
+ success = false;
}
+ assert !success : "This mutation should have failed since the CF/Table no longer exists.";
// write on new should work.
rm = new RowMutation(newKsName, dk.key);