You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 15:43:23 UTC
svn commit: r1132406 [2/2] - in /cassandra/trunk: ./ contrib/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ debian/ doc/cql/
drivers/java/src/org/apache/cassandra/cql/jdbc/
drivers/java/test/org/apache/cassandra/cql/ drivers/py/cql/ interface/t...
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Jun 5 13:43:22 2011
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeoutExcep
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
+import org.apache.cassandra.db.CounterColumn;
+import org.apache.cassandra.db.context.CounterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +47,8 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.migration.*;
-import org.apache.cassandra.db.migration.avro.CfDef;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
@@ -67,7 +67,7 @@ public class QueryProcessor
throws InvalidRequestException, TimedOutException, UnavailableException
{
QueryPath queryPath = new QueryPath(select.getColumnFamily());
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
List<ReadCommand> commands = new ArrayList<ReadCommand>();
// ...of a list of column names
@@ -161,7 +161,7 @@ public class QueryProcessor
}
AbstractBounds bounds = new Bounds(startToken, finishToken);
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
// XXX: Our use of Thrift structs internally makes me Sad. :(
SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -214,7 +214,7 @@ public class QueryProcessor
private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select)
throws TimedOutException, UnavailableException, InvalidRequestException
{
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
// XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -261,7 +261,7 @@ public class QueryProcessor
throws InvalidRequestException, UnavailableException, TimedOutException
{
String keyspace = clientState.getKeyspace();
- List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+ List<IMutation> rowMutations = new ArrayList<IMutation>();
List<String> cfamsSeen = new ArrayList<String>();
for (UpdateStatement update : updateStatements)
@@ -491,7 +491,7 @@ public class QueryProcessor
case SELECT:
SelectStatement select = (SelectStatement)statement.statement;
clientState.hasColumnFamilyAccess(select.getColumnFamily(), Permission.READ);
- metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ metadata = validateColumnFamily(keyspace, select.getColumnFamily());
validateSelect(keyspace, select);
List<org.apache.cassandra.db.Row> rows;
@@ -640,7 +640,7 @@ public class QueryProcessor
{
KsDef ksd = new KsDef(create.getName(),
create.getStrategyClass(),
- Collections.<org.apache.cassandra.thrift.CfDef>emptyList())
+ Collections.<CfDef>emptyList())
.setStrategy_options(create.getStrategyOptions());
ThriftValidation.validateKsDef(ksd);
applyMigrationOnStage(new AddKeyspace(KSMetaData.fromThrift(ksd)));
@@ -694,37 +694,58 @@ public class QueryProcessor
createIdx.getColumnFamily()));
if (oldCfm == null)
throw new InvalidRequestException("No such column family: " + createIdx.getColumnFamily());
-
+
+ boolean columnExists = false;
ByteBuffer columnName = createIdx.getColumnName().getByteBuffer();
- ColumnDefinition columnDef = oldCfm.getColumn_metadata().get(columnName);
-
- // Meta-data for this column already exists
- if (columnDef != null)
+ // mutating oldCfm directly would be bad, but mutating a Thrift copy is fine. This also
+ // sets us up to use validateCfDef to check for index name collisions.
+ CfDef cf_def = CFMetaData.convertToThrift(oldCfm);
+ for (ColumnDef cd : cf_def.column_metadata)
{
- // This column is already indexed, stop, drop, and roll.
- if (columnDef.getIndexType() != null)
- throw new InvalidRequestException("Index exists");
- // Add index attrs to the existing definition
- columnDef.setIndexName(createIdx.getIndexName());
- columnDef.setIndexType(org.apache.cassandra.thrift.IndexType.KEYS);
+ if (cd.name.equals(columnName))
+ {
+ if (cd.index_type != null)
+ throw new InvalidRequestException("Index already exists");
+ logger.debug("Updating column {} definition for index {}", oldCfm.comparator.getString(columnName), createIdx.getIndexName());
+ cd.setIndex_type(IndexType.KEYS);
+ cd.setIndex_name(createIdx.getIndexName());
+ columnExists = true;
+ break;
+ }
}
- // No meta-data, create a new column definition from scratch.
- else
+ if (!columnExists)
+ throw new InvalidRequestException("No column definition found for column " + oldCfm.comparator.getString(columnName));
+
+ CFMetaData.addDefaultIndexNames(cf_def);
+ ThriftValidation.validateCfDef(cf_def, oldCfm);
+ try
{
- columnDef = new ColumnDefinition(columnName,
- DatabaseDescriptor.getValueValidator(keyspace,
- createIdx.getColumnFamily(),
- columnName),
- org.apache.cassandra.thrift.IndexType.KEYS,
- createIdx.getIndexName());
+ applyMigrationOnStage(new UpdateColumnFamily(CFMetaData.convertToAvro(cf_def)));
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex = new InvalidRequestException(e.toString());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex = new InvalidRequestException(e.toString());
+ ex.initCause(e);
+ throw ex;
}
- CfDef cfamilyDef = CFMetaData.convertToAvro(oldCfm);
- cfamilyDef.column_metadata.add(columnDef.deflate());
-
+ result.type = CqlResultType.VOID;
+ return result;
+
+ case DROP_INDEX:
+ DropIndexStatement dropIdx = (DropIndexStatement)statement.statement;
+ clientState.hasColumnFamilyListAccess(Permission.WRITE);
+ validateSchemaAgreement();
+
try
{
- applyMigrationOnStage(new UpdateColumnFamily(cfamilyDef));
+ applyMigrationOnStage(dropIdx.generateMutation(clientState.getKeyspace()));
}
catch (ConfigurationException e)
{
@@ -738,10 +759,10 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
-
+
result.type = CqlResultType.VOID;
return result;
-
+
case DROP_KEYSPACE:
String deleteKeyspace = (String)statement.statement;
clientState.hasKeyspaceListAccess(Permission.WRITE);
@@ -791,7 +812,35 @@ public class QueryProcessor
result.type = CqlResultType.VOID;
return result;
-
+
+ case ALTER_TABLE:
+ AlterTableStatement alterTable = (AlterTableStatement) statement.statement;
+
+ System.out.println(alterTable);
+
+ validateColumnFamily(keyspace, alterTable.columnFamily);
+ clientState.hasColumnFamilyAccess(alterTable.columnFamily, Permission.WRITE);
+ validateSchemaAgreement();
+
+ try
+ {
+ applyMigrationOnStage(new UpdateColumnFamily(alterTable.getCfDef(keyspace)));
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex = new InvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex = new InvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+
+ result.type = CqlResultType.VOID;
+ return result;
}
return null; // We should never get here.
@@ -813,7 +862,7 @@ public class QueryProcessor
{
if (c.isMarkedForDelete())
continue;
- thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+ thriftColumns.add(thriftify(c));
}
}
else
@@ -840,16 +889,25 @@ public class QueryProcessor
{
throw new AssertionError(e);
}
+
IColumn c = row.cf.getColumn(name);
if (c == null || c.isMarkedForDelete())
thriftColumns.add(new Column().setName(name));
else
- thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+ thriftColumns.add(thriftify(c));
}
}
return thriftColumns;
}
+ private static Column thriftify(IColumn c)
+ {
+ ByteBuffer value = (c instanceof CounterColumn)
+ ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
+ : c.value();
+ return new Column(c.name()).setValue(value).setTimestamp(c.timestamp());
+ }
+
private static String getKeyString(CFMetaData metadata)
{
String keyString;
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Sun Jun 5 13:43:22 2011
@@ -24,8 +24,8 @@ import java.util.EnumSet;
public enum StatementType
{
- SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
- DROP_KEYSPACE, DROP_COLUMNFAMILY;
+ SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX, DROP_INDEX,
+ DROP_KEYSPACE, DROP_COLUMNFAMILY, ALTER_TABLE;
// Statement types that don't require a keyspace to be set.
private static final EnumSet<StatementType> topLevel = EnumSet.of(USE, CREATE_KEYSPACE, DROP_KEYSPACE);
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Jun 5 13:43:22 2011
@@ -26,6 +26,8 @@ import java.util.*;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -35,6 +37,7 @@ import org.apache.cassandra.thrift.Inval
import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
+import static org.apache.cassandra.cql.Operation.OperationType;
import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
/**
@@ -43,7 +46,7 @@ import static org.apache.cassandra.thrif
*/
public class UpdateStatement extends AbstractModification
{
- private Map<Term, Term> columns;
+ private Map<Term, Operation> columns;
private List<Term> columnNames, columnValues;
private List<Term> keys;
@@ -57,7 +60,7 @@ public class UpdateStatement extends Abs
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
public UpdateStatement(String columnFamily,
- Map<Term, Term> columns,
+ Map<Term, Operation> columns,
List<Term> keys,
Attributes attrs)
{
@@ -113,17 +116,28 @@ public class UpdateStatement extends Abs
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
{
return prepareRowMutations(keyspace, clientState, null);
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
{
List<String> cfamsSeen = new ArrayList<String>();
- CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+ boolean hasCommutativeOperation = false;
+
+ for (Map.Entry<Term, Operation> column : getColumns().entrySet())
+ {
+ if (!column.getValue().isUnary())
+ hasCommutativeOperation = true;
+
+ if (hasCommutativeOperation && column.getValue().isUnary())
+ throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+ }
+
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, hasCommutativeOperation);
// Avoid unnecessary authorizations.
if (!(cfamsSeen.contains(columnFamily)))
@@ -132,7 +146,7 @@ public class UpdateStatement extends Abs
cfamsSeen.add(columnFamily);
}
- List<RowMutation> rowMutations = new LinkedList<RowMutation>();
+ List<IMutation> rowMutations = new LinkedList<IMutation>();
for (Term key: keys)
{
@@ -154,43 +168,61 @@ public class UpdateStatement extends Abs
*
* @throws InvalidRequestException on the wrong request
*/
- private RowMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
- {
- RowMutation rm = new RowMutation(keyspace, key);
-
- mutationForKey(rm, keyspace, metadata, timestamp);
-
- return rm;
- }
-
- /** {@inheritDoc} */
- public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp) throws InvalidRequestException
- {
- return mutationForKey(keyspace, key, validateColumnFamily(keyspace, columnFamily, false), timestamp);
- }
-
- /** {@inheritDoc} */
- public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
- {
- mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, columnFamily, false), timestamp);
- }
-
- private void mutationForKey(RowMutation mutation, String keyspace, CFMetaData metadata, Long timestamp) throws InvalidRequestException
+ private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
{
AbstractType<?> comparator = getComparator(keyspace);
- for (Map.Entry<Term, Term> column : getColumns().entrySet())
+ // if true we need to wrap RowMutation into CounterMutation
+ boolean hasCounterColumn = false;
+ RowMutation rm = new RowMutation(keyspace, key);
+
+ for (Map.Entry<Term, Operation> column : getColumns().entrySet())
{
ByteBuffer colName = column.getKey().getByteBuffer(comparator);
- ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+ Operation op = column.getValue();
- validateColumn(metadata, colName, colValue);
+ if (op.isUnary())
+ {
+ if (hasCounterColumn)
+ throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+
+ ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName));
+
+ validateColumn(metadata, colName, colValue);
+ rm.add(new QueryPath(columnFamily, null, colName),
+ colValue,
+ (timestamp == null) ? getTimestamp() : timestamp,
+ getTimeToLive());
+ }
+ else
+ {
+ hasCounterColumn = true;
+
+ if (!column.getKey().getText().equals(op.a.getText()))
+ throw new InvalidRequestException("Only expressions like X = X + <long> are supported.");
+
+ long value;
+
+ try
+ {
+ value = Long.parseLong(op.b.getText());
+
+ if (op.type == OperationType.MINUS)
+ {
+ value *= -1;
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
+ op.b.getText()));
+ }
- mutation.add(new QueryPath(columnFamily, null, colName),
- colValue,
- (timestamp == null) ? getTimestamp() : timestamp,
- getTimeToLive());
+ rm.addCounter(new QueryPath(columnFamily, null, colName), value);
+ }
}
+
+ return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
}
public String getColumnFamily()
@@ -203,8 +235,8 @@ public class UpdateStatement extends Abs
{
return keys;
}
-
- public Map<Term, Term> getColumns() throws InvalidRequestException
+
+ public Map<Term, Operation> getColumns() throws InvalidRequestException
{
// Created from an UPDATE
if (columns != null)
@@ -218,11 +250,11 @@ public class UpdateStatement extends Abs
if (columnNames.size() < 1)
throw new InvalidRequestException("no columns specified for INSERT");
- columns = new HashMap<Term, Term>();
+ columns = new HashMap<Term, Operation>();
for (int i = 0; i < columnNames.size(); i++)
- columns.put(columnNames.get(i), columnValues.get(i));
-
+ columns.put(columnNames.get(i), new Operation(columnValues.get(i)));
+
return columns;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Jun 5 13:43:22 2011
@@ -511,7 +511,7 @@ public class ColumnFamilyStore implement
if (cfm != null) // secondary indexes aren't stored in DD.
{
for (ColumnDefinition def : cfm.getColumn_metadata().values())
- scrubDataDirectories(table, cfm.indexName(def));
+ scrubDataDirectories(table, cfm.indexColumnFamilyName(def));
}
}
@@ -1802,8 +1802,12 @@ public class ColumnFamilyStore implement
*/
public Future<?> truncate() throws IOException
{
- // snapshot will also flush, but we want to truncate the most possible, and anything in a flush written
- // after truncateAt won't be truncated.
+ // We have two goals here:
+ // - truncate should delete everything written before truncate was invoked
+ // - but not delete anything that isn't part of the snapshot we create.
+ // We accomplish this by first flushing manually, then snapshotting, and
+ // recording the timestamp IN BETWEEN those actions. Any sstables created
+ // at or after this timestamp will not be marked for deletion.
try
{
forceBlockingFlush();
@@ -1812,33 +1816,20 @@ public class ColumnFamilyStore implement
{
throw new RuntimeException(e);
}
-
- final long truncatedAt = System.currentTimeMillis();
- snapshot(Table.getTimestampedSnapshotName("before-truncate"));
-
- Runnable runnable = new WrappedRunnable()
+ // sleep a little to make sure that our truncatedAt comes after any sstable
+ // that was part of the flush we forced; otherwise, on a tie, it won't get deleted.
+ try
{
- public void runMayThrow() throws InterruptedException, IOException
- {
- // putting markCompacted on the commitlogUpdater thread ensures it will run
- // after any compactions that were in progress when truncate was called, are finished
- for (ColumnFamilyStore cfs : concatWithIndexes())
- {
- List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
- for (SSTableReader sstable : cfs.getSSTables())
- {
- if (!sstable.newSince(truncatedAt))
- truncatedSSTables.add(sstable);
- }
- cfs.data.markCompacted(truncatedSSTables);
- }
-
- // Invalidate row cache
- invalidateRowCache();
- }
- };
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ long truncatedAt = System.currentTimeMillis();
+ snapshot(Table.getTimestampedSnapshotName("before-truncate"));
- return postFlushExecutor.submit(runnable);
+ return CompactionManager.instance.submitTruncate(this, truncatedAt);
}
// if this errors out, we are in a world of hurt.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sun Jun 5 13:43:22 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.io.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
@@ -1149,6 +1150,30 @@ public class CompactionManager implement
return executor.submit(runnable);
}
+ public Future<?> submitTruncate(final ColumnFamilyStore main, final long truncatedAt)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException, IOException
+ {
+ for (ColumnFamilyStore cfs : main.concatWithIndexes())
+ {
+ List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (!sstable.newSince(truncatedAt))
+ truncatedSSTables.add(sstable);
+ }
+ cfs.markCompacted(truncatedSSTables);
+ }
+
+ main.invalidateRowCache();
+ }
+ };
+
+ return executor.submit(runnable);
+ }
+
private static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
return cfs.isIndex()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Sun Jun 5 13:43:22 2011
@@ -22,20 +22,17 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.Iterables;
-import org.apache.commons.collections.set.UnmodifiableSet;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.config.DatabaseDescriptor;
-
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
@@ -451,18 +448,17 @@ public class DataTracker
public final Set<SSTableReader> sstables;
public final Set<SSTableReader> compacting;
- public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
+ View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
{
this.memtable = memtable;
- this.memtablesPendingFlush = pendingFlush instanceof UnmodifiableSet ? pendingFlush : Collections.unmodifiableSet(pendingFlush);
- this.sstables = sstables instanceof UnmodifiableSet ? sstables : Collections.unmodifiableSet(sstables);
- this.compacting = compacting instanceof UnmodifiableSet ? compacting : Collections.unmodifiableSet(compacting);
+ this.memtablesPendingFlush = pendingFlush;
+ this.sstables = sstables;
+ this.compacting = compacting;
}
public View switchMemtable(Memtable newMemtable)
{
- Set<Memtable> newPending = new HashSet<Memtable>(memtablesPendingFlush);
- newPending.add(memtable);
+ Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
return new View(newMemtable, newPending, sstables, compacting);
}
@@ -473,32 +469,27 @@ public class DataTracker
public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
{
- Set<Memtable> newPendings = new HashSet<Memtable>(memtablesPendingFlush);
- Set<SSTableReader> newSSTables = new HashSet<SSTableReader>(sstables);
- newPendings.remove(flushedMemtable);
- newSSTables.add(newSSTable);
- return new View(memtable, newPendings, newSSTables, compacting);
+ Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable)));
+ Set<SSTableReader> newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
+ return new View(memtable, newPending, newSSTables, compacting);
}
public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
- Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
- Iterables.addAll(sstablesNew, replacements);
- sstablesNew.removeAll(oldSSTables);
- return new View(memtable, memtablesPendingFlush, sstablesNew, compacting);
+ Sets.SetView<SSTableReader> remaining = Sets.difference(sstables, ImmutableSet.copyOf(oldSSTables));
+ Set<SSTableReader> newSSTables = ImmutableSet.<SSTableReader>builder().addAll(remaining).addAll(replacements).build();
+ return new View(memtable, memtablesPendingFlush, newSSTables, compacting);
}
public View markCompacting(Collection<SSTableReader> tomark)
{
- Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
- compactingNew.addAll(tomark);
+ Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build(); // extend the current compacting set (as the removed lines did), not the live sstable set
return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
}
public View unmarkCompacting(Collection<SSTableReader> tounmark)
{
- Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
- compactingNew.removeAll(tounmark);
+ Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sun Jun 5 13:43:22 2011
@@ -731,23 +731,6 @@ public class Table
return Iterables.transform(DatabaseDescriptor.getTables(), transformer);
}
- /**
- * Performs a synchronous truncate operation, effectively deleting all data
- * from the column family cfname
- * @param cfname
- * @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
- */
- public void truncate(String cfname) throws InterruptedException, ExecutionException, IOException
- {
- logger.debug("Truncating...");
- ColumnFamilyStore cfs = getColumnFamilyStore(cfname);
- // truncate, blocking
- cfs.truncate().get();
- logger.debug("Truncation done.");
- }
-
@Override
public String toString()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Sun Jun 5 13:43:22 2011
@@ -22,7 +22,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,8 @@ public class TruncateVerbHandler impleme
try
{
- Table.open(t.keyspace).truncate(t.columnFamily);
+ ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
+ cfs.truncate().get();
}
catch (Exception e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Sun Jun 5 13:43:22 2011
@@ -38,6 +38,8 @@ public class IncomingTcpConnection exten
{
private static Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
+ private static final int CHUNK_SIZE = 1024 * 1024;
+
private Socket socket;
public IncomingTcpConnection(Socket socket)
@@ -97,8 +99,13 @@ public class IncomingTcpConnection exten
{
int size = input.readInt();
byte[] contentBytes = new byte[size];
- input.readFully(contentBytes);
-
+ // readFully allocates a direct buffer the size of the chunk it is asked to read,
+ // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654
+ int remainder = size % CHUNK_SIZE;
+ for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
+ input.readFully(contentBytes, offset, CHUNK_SIZE);
+ input.readFully(contentBytes, size - remainder, remainder);
+
if (version > MessagingService.version_)
logger.info("Received connection from newer protocol version. Ignorning message.");
else
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sun Jun 5 13:43:22 2011
@@ -799,7 +799,8 @@ public class CassandraServer implements
{
logger.debug("add_column_family");
state().hasColumnFamilyListAccess(Permission.WRITE);
- ThriftValidation.validateCfDef(cf_def);
+ CFMetaData.addDefaultIndexNames(cf_def);
+ ThriftValidation.validateCfDef(cf_def, null);
validateSchemaAgreement();
try
@@ -866,10 +867,11 @@ public class CassandraServer implements
try
{
Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
- for (CfDef cfDef : ks_def.cf_defs)
+ for (CfDef cf_def : ks_def.cf_defs)
{
- ThriftValidation.validateCfDef(cfDef);
- cfDefs.add(CFMetaData.fromThrift(cfDef));
+ CFMetaData.addDefaultIndexNames(cf_def);
+ ThriftValidation.validateCfDef(cf_def, null);
+ cfDefs.add(CFMetaData.fromThrift(cf_def));
}
ThriftValidation.validateKsDef(ks_def);
@@ -953,11 +955,10 @@ public class CassandraServer implements
{
logger.debug("update_column_family");
state().hasColumnFamilyListAccess(Permission.WRITE);
- ThriftValidation.validateCfDef(cf_def);
if (cf_def.keyspace == null || cf_def.name == null)
throw new InvalidRequestException("Keyspace and CF name must be set.");
CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace, cf_def.name));
- if (oldCfm == null)
+ if (oldCfm == null)
throw new InvalidRequestException("Could not find column family definition to modify.");
validateSchemaAgreement();
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Sun Jun 5 13:43:22 2011
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
@@ -513,7 +514,7 @@ public class ThriftValidation
throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
}
- public static void validateCfDef(CfDef cf_def) throws InvalidRequestException
+ public static void validateCfDef(CfDef cf_def, CFMetaData old) throws InvalidRequestException
{
try
{
@@ -533,6 +534,22 @@ public class ThriftValidation
}
}
+ if (cf_def.key_alias != null)
+ {
+ if (!cf_def.key_alias.hasRemaining())
+ throw new InvalidRequestException("key_alias may not be empty");
+ try
+ {
+ // it's hard to use a key in a select statement if we can't type it.
+ // for now let's keep it simple and require ascii.
+ AsciiType.instance.validate(cf_def.key_alias);
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException("Key aliases must be ascii");
+ }
+ }
+
ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
if (cfType == null)
throw new InvalidRequestException("invalid column type " + cf_def.column_type);
@@ -550,16 +567,17 @@ public class ThriftValidation
? TypeParser.parse(cf_def.comparator_type)
: TypeParser.parse(cf_def.subcomparator_type);
+ // seed the set with the index names of every column family OTHER than the one under consideration
Set<String> indexNames = new HashSet<String>();
- for (ColumnDef c : cf_def.column_metadata)
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- // Ensure that given idx_names and auto_generated idx_names cannot collide
- CFMetaData cfm = CFMetaData.fromThrift(cf_def);
- String idxName = cfm.indexName(ColumnDefinition.fromColumnDef(c));
- if (indexNames.contains(idxName))
- throw new InvalidRequestException("Duplicate index names " + idxName);
- indexNames.add(idxName);
+ if (!cfs.getColumnFamilyName().equals(cf_def.name))
+ for (ColumnDefinition cd : cfs.metadata.getColumn_metadata().values())
+ indexNames.add(cd.getIndexName());
+ }
+ for (ColumnDef c : cf_def.column_metadata)
+ {
TypeParser.parse(c.validation_class);
try
@@ -572,11 +590,31 @@ public class ThriftValidation
ByteBufferUtil.bytesToHex(c.name), cf_def.comparator_type));
}
- if ((c.index_name != null) && (c.index_type == null))
- throw new ConfigurationException("index_name cannot be set without index_type");
-
- if (cfType == ColumnFamilyType.Super && c.index_type != null)
- throw new InvalidRequestException("Secondary indexes are not supported on supercolumns");
+ if (c.index_type == null)
+ {
+ if (c.index_name != null)
+ throw new ConfigurationException("index_name cannot be set without index_type");
+ }
+ else
+ {
+ if (cfType == ColumnFamilyType.Super)
+ throw new InvalidRequestException("Secondary indexes are not supported on supercolumns");
+ assert c.index_name != null; // should have a default set by now if none was provided
+ if (!Migration.isLegalName(c.index_name))
+ throw new InvalidRequestException("Illegal index name " + c.index_name);
+ // check index names against this CF _and_ globally
+ if (indexNames.contains(c.index_name))
+ throw new InvalidRequestException("Duplicate index name " + c.index_name);
+ indexNames.add(c.index_name);
+
+ ColumnDefinition oldCd = old == null ? null : old.getColumnDefinition(c.name);
+ if (oldCd != null && oldCd.getIndexType() != null)
+ {
+ assert oldCd.getIndexName() != null;
+ if (!oldCd.getIndexName().equals(c.index_name))
+ throw new InvalidRequestException("Cannot modify index name");
+ }
+ }
}
validateMinMaxCompactionThresholds(cf_def);
validateMemtableSettings(cf_def);
Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Sun Jun 5 13:43:22 2011
@@ -75,6 +75,10 @@ def load_sample(dbconn):
CREATE COLUMNFAMILY IndexedA (KEY text PRIMARY KEY, birthdate int)
WITH comparator = ascii AND default_validation = ascii;
""")
+ dbconn.execute("""
+ CREATE COLUMNFAMILY CounterCF (KEY text PRIMARY KEY, count_me counter)
+ WITH comparator = ascii AND default_validation = counter;
+ """)
dbconn.execute("CREATE INDEX ON IndexedA (birthdate)")
query = "UPDATE StandardString1 SET :c1 = :v1, :c2 = :v2 WHERE KEY = :key"
@@ -526,7 +530,7 @@ class TestCql(ThriftTester):
"creating column indexes"
cursor = init()
cursor.execute("USE Keyspace1")
- cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY)")
+ cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY, items text, stuff int)")
cursor.execute("CREATE INDEX namedIndex ON CreateIndex1 (items)")
cursor.execute("CREATE INDEX ON CreateIndex1 (stuff)")
@@ -535,10 +539,9 @@ class TestCql(ThriftTester):
cfam = [i for i in ksdef.cf_defs if i.name == "CreateIndex1"][0]
items = [i for i in cfam.column_metadata if i.name == "items"][0]
stuff = [i for i in cfam.column_metadata if i.name == "stuff"][0]
- assert items.index_name == "namedIndex", "missing index (or name)"
+ assert items.index_name == "namedIndex", items.index_name
assert items.index_type == 0, "missing index"
- assert not stuff.index_name, \
- "index_name should be unset, not %s" % stuff.index_name
+ assert stuff.index_name != None, "index_name should be set"
assert stuff.index_type == 0, "missing index"
# already indexed
@@ -546,6 +549,34 @@ class TestCql(ThriftTester):
cursor.execute,
"CREATE INDEX ON CreateIndex1 (stuff)")
+ def test_drop_indexes(self):
+ "dropping indexes on columns"
+ cursor = init()
+ cursor.execute("""CREATE KEYSPACE DropIndexTests WITH strategy_options:replication_factor = '1'
+ AND strategy_class = 'SimpleStrategy';""")
+ cursor.execute("USE DropIndexTests")
+ cursor.execute("CREATE COLUMNFAMILY IndexedCF (KEY text PRIMARY KEY, n text)")
+ cursor.execute("CREATE INDEX namedIndex ON IndexedCF (n)")
+
+ ksdef = thrift_client.describe_keyspace("DropIndexTests")
+ columns = ksdef.cf_defs[0].column_metadata
+
+ assert columns[0].index_name == "namedIndex"
+ assert columns[0].index_type == 0
+
+ # testing "DROP INDEX <INDEX_NAME>"
+ cursor.execute("DROP INDEX namedIndex")
+
+ ksdef = thrift_client.describe_keyspace("DropIndexTests")
+ columns = ksdef.cf_defs[0].column_metadata
+
+ assert columns[0].index_type == None
+ assert columns[0].index_name == None
+
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "DROP INDEX undefIndex")
+
def test_time_uuid(self):
"store and retrieve time-based (type 1) uuids"
cursor = init()
@@ -1006,3 +1037,151 @@ class TestCql(ThriftTester):
r = cursor.fetchone()
assert len(r) == 1, "expected 0 results, got %d" % len(r)
+
+ def test_alter_table_statement(self):
+ "test ALTER TABLE statement"
+ cursor = init()
+ cursor.execute("""
+ CREATE KEYSPACE AlterTableKS WITH strategy_options:replication_factor = '1'
+ AND strategy_class = 'SimpleStrategy';
+ """)
+ cursor.execute("USE AlterTableKS;")
+
+ cursor.execute("""
+ CREATE COLUMNFAMILY NewCf1 (KEY varint PRIMARY KEY) WITH default_validation = ascii;
+ """)
+
+ # TODO: temporary (until this can be done with CQL).
+ ksdef = thrift_client.describe_keyspace("AlterTableKS")
+ assert len(ksdef.cf_defs) == 1, \
+ "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+ cfam = ksdef.cf_defs[0]
+
+ assert len(cfam.column_metadata) == 0
+
+ # testing "add a new column"
+ cursor.execute("ALTER TABLE NewCf1 ADD name varchar")
+
+ ksdef = thrift_client.describe_keyspace("AlterTableKS")
+ assert len(ksdef.cf_defs) == 1, \
+ "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+ columns = ksdef.cf_defs[0].column_metadata
+
+ assert len(columns) == 1
+ assert columns[0].name == 'name'
+ assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.UTF8Type'
+
+ # testing "alter a column type"
+ cursor.execute("ALTER TABLE NewCf1 ALTER name TYPE ascii")
+
+ ksdef = thrift_client.describe_keyspace("AlterTableKS")
+ assert len(ksdef.cf_defs) == 1, \
+ "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+ columns = ksdef.cf_defs[0].column_metadata
+
+ assert len(columns) == 1
+ assert columns[0].name == 'name'
+ assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.AsciiType'
+
+ # re-adding the existing column 'name' (and with an unknown validator) must fail
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "ALTER TABLE NewCf1 ADD name utf8")
+
+ # testing 'drop an existing column'
+ cursor.execute("ALTER TABLE NewCf1 DROP name")
+
+ ksdef = thrift_client.describe_keyspace("AlterTableKS")
+ assert len(ksdef.cf_defs) == 1, \
+ "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+ columns = ksdef.cf_defs[0].column_metadata
+
+ assert len(columns) == 0
+
+ # add column with unknown validator
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "ALTER TABLE NewCf1 ADD name utf8")
+
+ # alter not existing column
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "ALTER TABLE NewCf1 ALTER name TYPE uuid")
+
+ # drop not existing column
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "ALTER TABLE NewCf1 DROP name")
+
+ def test_counter_column_support(self):
+ "update statement should be able to work with counter columns"
+ cursor = init()
+
+ # increment counter
+ cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 2, \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 4, \
+ "unrecognized value '%s'" % r[1]
+
+ # decrement counter
+ cursor.execute("UPDATE CounterCF SET count_me = count_me - 4 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 0, \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("SELECT * FROM CounterCF")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 0, \
+ "unrecognized value '%s'" % r[1]
+
+ # deleting a counter column
+ cursor.execute("DELETE count_me FROM CounterCF WHERE KEY = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+ assert len(colnames) == 1
+
+ r = cursor.fetchone()
+ assert len(r) == 1
+
+ # can't mix counter and normal statements
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "UPDATE CounterCF SET count_me = count_me + 2, x = 'a' WHERE key = 'counter1'")
+
+ # column names must match
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "UPDATE CounterCF SET count_me = count_not_me + 2 WHERE key = 'counter1'")
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Sun Jun 5 13:43:22 2011
@@ -1421,7 +1421,7 @@ class TestMutations(ThriftTester):
client.system_update_column_family(modified_cf)
# Add a second indexed CF ...
- birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate_index')
+ birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate2_index')
age_coldef = ColumnDef('age', 'BytesType', IndexType.KEYS, 'age_index')
cfdef = CfDef('Keyspace1', 'BlankCF2', column_metadata=[birthdate_coldef, age_coldef])
client.system_add_column_family(cfdef)
Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Sun Jun 5 13:43:22 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang.NotImplementedException;
import com.google.common.base.Charsets;
@@ -249,7 +250,7 @@ public class SchemaLoader
{{
ByteBuffer cName = ByteBuffer.wrap("birthdate".getBytes(Charsets.UTF_8));
IndexType keys = withIdxType ? IndexType.KEYS : null;
- put(cName, new ColumnDefinition(cName, LongType.instance, keys, null));
+ put(cName, new ColumnDefinition(cName, LongType.instance, keys, ByteBufferUtil.bytesToHex(cName)));
}});
}
private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Sun Jun 5 13:43:22 2011
@@ -18,6 +18,8 @@
package org.apache.cassandra.stress;
import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-public class Session
+public class Session implements Serializable
{
// command line options
public static final Options availableOptions = new Options();
@@ -74,6 +76,7 @@ public class Session
availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
availableOptions.addOption("W", "no-replicate-on-write",false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size");
+ availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address.");
}
private int numKeys = 1000 * 1000;
@@ -95,7 +98,7 @@ public class Session
private boolean replicateOnWrite = true;
private boolean ignoreErrors = false;
- private PrintStream out = System.out;
+ private final String outFileName;
private IndexType indexType = null;
private Stress.Operations operation = Stress.Operations.INSERT;
@@ -110,6 +113,8 @@ public class Session
protected int mean;
protected float sigma;
+ public final InetAddress sendToDaemon;
+
public Session(String[] arguments) throws IllegalArgumentException
{
float STDev = 0.1f;
@@ -181,17 +186,7 @@ public class Session
if (cmd.hasOption("r"))
random = true;
- if (cmd.hasOption("f"))
- {
- try
- {
- out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f")));
- }
- catch (FileNotFoundException e)
- {
- System.out.println(e.getMessage());
- }
- }
+ outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
if (cmd.hasOption("p"))
port = Integer.parseInt(cmd.getOptionValue("p"));
@@ -264,6 +259,17 @@ public class Session
replicateOnWrite = false;
averageSizeValues = cmd.hasOption("V");
+
+ try
+ {
+ sendToDaemon = cmd.hasOption("send-to")
+ ? InetAddress.getByName(cmd.getOptionValue("send-to"))
+ : null;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
}
catch (ParseException e)
{
@@ -360,7 +366,14 @@ public class Session
public PrintStream getOutputStream()
{
- return out;
+ try
+ {
+ return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
public int getProgressInterval()
@@ -432,16 +445,16 @@ public class Session
try
{
client.system_add_keyspace(keyspace);
- out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
+ System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
Thread.sleep(nodes.length * 1000); // seconds
}
catch (InvalidRequestException e)
{
- out.println("Unable to create stress keyspace: " + e.getWhy());
+ System.err.println("Unable to create stress keyspace: " + e.getWhy());
}
catch (Exception e)
{
- out.println(e.getMessage());
+ System.err.println(e.getMessage());
}
}
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java Sun Jun 5 13:43:22 2011
@@ -17,15 +17,12 @@
*/
package org.apache.cassandra.stress;
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.cli.Option;
-import java.io.PrintStream;
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketException;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
public final class Stress
{
@@ -36,17 +33,10 @@ public final class Stress
public static Session session;
public static Random randomizer = new Random();
-
- /**
- * Producer-Consumer model: 1 producer, N consumers
- */
- private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+ private static volatile boolean stopped = false;
public static void main(String[] arguments) throws Exception
{
- long latency, oldLatency;
- int epoch, total, oldTotal, keyCount, oldKeyCount;
-
try
{
session = new Session(arguments);
@@ -57,111 +47,49 @@ public final class Stress
return;
}
- // creating keyspace and column families
- if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
- {
- session.createKeySpaces();
- }
-
- int threadCount = session.getThreads();
- Thread[] consumers = new Thread[threadCount];
- PrintStream out = session.getOutputStream();
-
- out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+ PrintStream outStream = session.getOutputStream();
- int itemsPerThread = session.getKeysPerThread();
- int modulo = session.getNumKeys() % threadCount;
-
- // creating required type of the threads for the test
- for (int i = 0; i < threadCount; i++)
+ if (session.sendToDaemon != null)
{
- if (i == threadCount - 1)
- itemsPerThread += modulo; // last one is going to handle N + modulo items
-
- consumers[i] = new Consumer(itemsPerThread);
- }
-
- new Producer().start();
-
- // starting worker threads
- for (int i = 0; i < threadCount; i++)
- {
- consumers[i].start();
- }
+ Socket socket = new Socket(session.sendToDaemon, 2159);
- // initialization of the values
- boolean terminate = false;
- latency = 0;
- epoch = total = keyCount = 0;
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
+ BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- int interval = session.getProgressInterval();
- int epochIntervals = session.getProgressInterval() * 10;
- long testStartTime = System.currentTimeMillis();
+ Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
- while (!terminate)
- {
- Thread.sleep(100);
-
- int alive = 0;
- for (Thread thread : consumers)
- if (thread.isAlive()) alive++;
+ out.writeObject(session);
- if (alive == 0)
- terminate = true;
+ String line;
- epoch++;
-
- if (terminate || epoch > epochIntervals)
+ try
{
- epoch = 0;
-
- oldTotal = total;
- oldLatency = latency;
- oldKeyCount = keyCount;
-
- total = session.operations.get();
- keyCount = session.keys.get();
- latency = session.latency.get();
+ while (!socket.isClosed() && (line = inp.readLine()) != null)
+ {
+ if (line.equals("END"))
+ {
+ out.writeInt(1);
+ break;
+ }
- int opDelta = total - oldTotal;
- int keyDelta = keyCount - oldKeyCount;
- double latencyDelta = latency - oldLatency;
+ outStream.println(line);
+ }
+ }
+ catch (SocketException e)
+ {
+ if (!stopped)
+ e.printStackTrace();
+ }
- long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
- String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
+ out.close();
+ inp.close();
- out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
- }
+ socket.close();
}
- }
-
- private static Operation createOperation(int index)
- {
- switch (session.getOperation())
+ else
{
- case READ:
- return new Reader(index);
-
- case COUNTER_GET:
- return new CounterGetter(index);
-
- case INSERT:
- return new Inserter(index);
-
- case COUNTER_ADD:
- return new CounterAdder(index);
-
- case RANGE_SLICE:
- return new RangeSlicer(index);
-
- case INDEXED_RANGE_SLICE:
- return new IndexedRangeSlicer(index);
-
- case MULTI_GET:
- return new MultiGetter(index);
+ new StressAction(session, outStream).run();
}
-
- throw new UnsupportedOperationException();
}
/**
@@ -180,56 +108,35 @@ public final class Stress
}
}
- /**
- * Produces exactly N items (awaits each to be consumed)
- */
- private static class Producer extends Thread
+ private static class ShutDown extends Thread
{
- public void run()
- {
- for (int i = 0; i < session.getNumKeys(); i++)
- {
- try
- {
- operations.put(createOperation(i % session.getNumDifferentKeys()));
- }
- catch (InterruptedException e)
- {
- System.err.println("Producer error - " + e.getMessage());
- return;
- }
- }
- }
- }
-
- /**
- * Each consumes exactly N items from queue
- */
- private static class Consumer extends Thread
- {
- private final int items;
+ private final Socket socket;
+ private final ObjectOutputStream out;
- public Consumer(int toConsume)
+ public ShutDown(Socket socket, ObjectOutputStream out)
{
- items = toConsume;
+ this.out = out;
+ this.socket = socket;
}
public void run()
{
- Cassandra.Client client = session.getClient();
-
- for (int i = 0; i < items; i++)
+ try
{
- try
- {
- operations.take().run(client); // running job
- }
- catch (Exception e)
+ if (!socket.isClosed())
{
- System.err.println(e.getMessage());
- System.exit(-1);
+ System.out.println("Control-C caught. Canceling running action and shutting down...");
+
+ out.writeInt(1);
+ out.close();
+
+ stopped = true;
}
}
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
}
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
public class CounterAdder extends Operation
{
- public CounterAdder(int index)
+ public CounterAdder(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -27,9 +28,9 @@ import java.util.List;
public class CounterGetter extends Operation
{
- public CounterGetter(int index)
+ public CounterGetter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends
{
private static List<ByteBuffer> values = null;
- public IndexedRangeSlicer(int index)
+ public IndexedRangeSlicer(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -33,9 +34,9 @@ public class Inserter extends Operation
{
private static List<ByteBuffer> values;
- public Inserter(int index)
+ public Inserter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
public class MultiGetter extends Operation
{
- public MultiGetter(int index)
+ public MultiGetter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -30,9 +31,9 @@ import java.util.List;
public class RangeSlicer extends Operation
{
- public RangeSlicer(int index)
+ public RangeSlicer(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Sun Jun 5 13:43:22 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -29,9 +30,9 @@ import static com.google.common.base.Cha
public class Reader extends Operation
{
- public Reader(int index)
+ public Reader(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Sun Jun 5 13:43:22 2011
@@ -46,6 +46,12 @@ public abstract class Operation
session = Stress.session;
}
+ public Operation(Session client, int idx)
+ {
+ index = idx;
+ session = client;
+ }
+
/**
* Run operation
* @param client Cassandra Thrift client connection
@@ -101,18 +107,18 @@ public abstract class Operation
* key generator using Gauss or Random algorithm
* @return byte[] representation of the key string
*/
- protected static byte[] generateKey()
+ protected byte[] generateKey()
{
- return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
+ return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
}
/**
* Random key generator
* @return byte[] representation of the key string
*/
- private static byte[] generateRandomKey()
+ private byte[] generateRandomKey()
{
- String format = "%0" + Stress.session.getTotalKeysLength() + "d";
+ String format = "%0" + session.getTotalKeysLength() + "d";
return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
}
@@ -120,9 +126,8 @@ public abstract class Operation
* Gauss key generator
* @return byte[] representation of the key string
*/
- private static byte[] generateGaussKey()
+ private byte[] generateGaussKey()
{
- Session session = Stress.session;
String format = "%0" + session.getTotalKeysLength() + "d";
for (;;)