You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 15:43:23 UTC

svn commit: r1132406 [2/2] - in /cassandra/trunk: ./ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ debian/ doc/cql/ drivers/java/src/org/apache/cassandra/cql/jdbc/ drivers/java/test/org/apache/cassandra/cql/ drivers/py/cql/ interface/t...

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Jun  5 13:43:22 2011
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeoutExcep
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.db.CounterColumn;
+import org.apache.cassandra.db.context.CounterContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +47,8 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.migration.*;
-import org.apache.cassandra.db.migration.avro.CfDef;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -67,7 +67,7 @@ public class QueryProcessor
     throws InvalidRequestException, TimedOutException, UnavailableException
     {
         QueryPath queryPath = new QueryPath(select.getColumnFamily());
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
 
         // ...of a list of column names
@@ -161,7 +161,7 @@ public class QueryProcessor
         }
         AbstractBounds bounds = new Bounds(startToken, finishToken);
         
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         // XXX: Our use of Thrift structs internally makes me Sad. :(
         SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
         validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -214,7 +214,7 @@ public class QueryProcessor
     private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select)
     throws TimedOutException, UnavailableException, InvalidRequestException
     {
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         // XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
         SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
         validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -261,7 +261,7 @@ public class QueryProcessor
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         String keyspace = clientState.getKeyspace();
-        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        List<IMutation> rowMutations = new ArrayList<IMutation>();
         List<String> cfamsSeen = new ArrayList<String>();
 
         for (UpdateStatement update : updateStatements)
@@ -491,7 +491,7 @@ public class QueryProcessor
             case SELECT:
                 SelectStatement select = (SelectStatement)statement.statement;
                 clientState.hasColumnFamilyAccess(select.getColumnFamily(), Permission.READ);
-                metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+                metadata = validateColumnFamily(keyspace, select.getColumnFamily());
                 validateSelect(keyspace, select);
                 
                 List<org.apache.cassandra.db.Row> rows;
@@ -640,7 +640,7 @@ public class QueryProcessor
                 {
                     KsDef ksd = new KsDef(create.getName(),
                                           create.getStrategyClass(),
-                                          Collections.<org.apache.cassandra.thrift.CfDef>emptyList())
+                                          Collections.<CfDef>emptyList())
                                 .setStrategy_options(create.getStrategyOptions());
                     ThriftValidation.validateKsDef(ksd);
                     applyMigrationOnStage(new AddKeyspace(KSMetaData.fromThrift(ksd)));
@@ -694,37 +694,58 @@ public class QueryProcessor
                                                                                       createIdx.getColumnFamily()));
                 if (oldCfm == null)
                     throw new InvalidRequestException("No such column family: " + createIdx.getColumnFamily());
-                
+
+                boolean columnExists = false;
                 ByteBuffer columnName = createIdx.getColumnName().getByteBuffer();
-                ColumnDefinition columnDef = oldCfm.getColumn_metadata().get(columnName);
-                
-                // Meta-data for this column already exists
-                if (columnDef != null)
+                // mutating oldCfm directly would be bad, but mutating a Thrift copy is fine.  This also
+                // sets us up to use validateCfDef to check for index name collisions.
+                CfDef cf_def = CFMetaData.convertToThrift(oldCfm);
+                for (ColumnDef cd : cf_def.column_metadata)
                 {
-                    // This column is already indexed, stop, drop, and roll.
-                    if (columnDef.getIndexType() != null)
-                        throw new InvalidRequestException("Index exists");
-                    // Add index attrs to the existing definition
-                    columnDef.setIndexName(createIdx.getIndexName());
-                    columnDef.setIndexType(org.apache.cassandra.thrift.IndexType.KEYS);
+                    if (cd.name.equals(columnName))
+                    {
+                        if (cd.index_type != null)
+                            throw new InvalidRequestException("Index already exists");
+                        logger.debug("Updating column {} definition for index {}", oldCfm.comparator.getString(columnName), createIdx.getIndexName());
+                        cd.setIndex_type(IndexType.KEYS);
+                        cd.setIndex_name(createIdx.getIndexName());
+                        columnExists = true;
+                        break;
+                    }
                 }
-                // No meta-data, create a new column definition from scratch.
-                else
+                if (!columnExists)
+                    throw new InvalidRequestException("No column definition found for column " + oldCfm.comparator.getString(columnName));
+
+                CFMetaData.addDefaultIndexNames(cf_def);
+                ThriftValidation.validateCfDef(cf_def, oldCfm);
+                try
                 {
-                    columnDef = new ColumnDefinition(columnName,
-                                                     DatabaseDescriptor.getValueValidator(keyspace,
-                                                                                          createIdx.getColumnFamily(),
-                                                                                          columnName),
-                                                     org.apache.cassandra.thrift.IndexType.KEYS,
-                                                     createIdx.getIndexName());
+                    applyMigrationOnStage(new UpdateColumnFamily(CFMetaData.convertToAvro(cf_def)));
+                }
+                catch (ConfigurationException e)
+                {
+                    InvalidRequestException ex = new InvalidRequestException(e.toString());
+                    ex.initCause(e);
+                    throw ex;
+                }
+                catch (IOException e)
+                {
+                    InvalidRequestException ex = new InvalidRequestException(e.toString());
+                    ex.initCause(e);
+                    throw ex;
                 }
                 
-                CfDef cfamilyDef = CFMetaData.convertToAvro(oldCfm);
-                cfamilyDef.column_metadata.add(columnDef.deflate());
-                
+                result.type = CqlResultType.VOID;
+                return result;
+
+            case DROP_INDEX:
+                DropIndexStatement dropIdx = (DropIndexStatement)statement.statement;
+                clientState.hasColumnFamilyListAccess(Permission.WRITE);
+                validateSchemaAgreement();
+
                 try
                 {
-                    applyMigrationOnStage(new UpdateColumnFamily(cfamilyDef));
+                    applyMigrationOnStage(dropIdx.generateMutation(clientState.getKeyspace()));
                 }
                 catch (ConfigurationException e)
                 {
@@ -738,10 +759,10 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                
+
                 result.type = CqlResultType.VOID;
                 return result;
-                
+
             case DROP_KEYSPACE:
                 String deleteKeyspace = (String)statement.statement;
                 clientState.hasKeyspaceListAccess(Permission.WRITE);
@@ -791,7 +812,35 @@ public class QueryProcessor
                 
                 result.type = CqlResultType.VOID;
                 return result;
-                
+
+            case ALTER_TABLE:
+                AlterTableStatement alterTable = (AlterTableStatement) statement.statement;
+
+                System.out.println(alterTable);
+
+                validateColumnFamily(keyspace, alterTable.columnFamily);
+                clientState.hasColumnFamilyAccess(alterTable.columnFamily, Permission.WRITE);
+                validateSchemaAgreement();
+
+                try
+                {
+                    applyMigrationOnStage(new UpdateColumnFamily(alterTable.getCfDef(keyspace)));
+                }
+                catch (ConfigurationException e)
+                {
+                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
+                    ex.initCause(e);
+                    throw ex;
+                }
+                catch (IOException e)
+                {
+                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
+                    ex.initCause(e);
+                    throw ex;
+                }
+
+                result.type = CqlResultType.VOID;
+                return result;
         }
         
         return null;    // We should never get here.
@@ -813,7 +862,7 @@ public class QueryProcessor
             {
                 if (c.isMarkedForDelete())
                     continue;
-                thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+                thriftColumns.add(thriftify(c));
             }
         }
         else
@@ -840,16 +889,25 @@ public class QueryProcessor
                 {
                     throw new AssertionError(e);
                 }
+
                 IColumn c = row.cf.getColumn(name);
                 if (c == null || c.isMarkedForDelete())
                     thriftColumns.add(new Column().setName(name));
                 else
-                    thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+                    thriftColumns.add(thriftify(c));
             }
         }
         return thriftColumns;
     }
 
+    private static Column thriftify(IColumn c)
+    {
+        ByteBuffer value = (c instanceof CounterColumn)
+                           ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
+                           : c.value();
+        return new Column(c.name()).setValue(value).setTimestamp(c.timestamp());
+    }
+
     private static String getKeyString(CFMetaData metadata)
     {
         String keyString;

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Sun Jun  5 13:43:22 2011
@@ -24,8 +24,8 @@ import java.util.EnumSet;
 
 public enum StatementType
 {
-    SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
-        DROP_KEYSPACE, DROP_COLUMNFAMILY;
+    SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX, DROP_INDEX,
+        DROP_KEYSPACE, DROP_COLUMNFAMILY, ALTER_TABLE;
     
     // Statement types that don't require a keyspace to be set.
     private static final EnumSet<StatementType> topLevel = EnumSet.of(USE, CREATE_KEYSPACE, DROP_KEYSPACE);

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Jun  5 13:43:22 2011
@@ -26,6 +26,8 @@ import java.util.*;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -35,6 +37,7 @@ import org.apache.cassandra.thrift.Inval
 
 import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
 
+import static org.apache.cassandra.cql.Operation.OperationType;
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 /**
@@ -43,7 +46,7 @@ import static org.apache.cassandra.thrif
  */
 public class UpdateStatement extends AbstractModification
 {
-    private Map<Term, Term> columns;
+    private Map<Term, Operation> columns;
     private List<Term> columnNames, columnValues;
     private List<Term> keys;
     
@@ -57,7 +60,7 @@ public class UpdateStatement extends Abs
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
     public UpdateStatement(String columnFamily,
-                           Map<Term, Term> columns,
+                           Map<Term, Operation> columns,
                            List<Term> keys,
                            Attributes attrs)
     {
@@ -113,17 +116,28 @@ public class UpdateStatement extends Abs
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
     {
         return prepareRowMutations(keyspace, clientState, null);
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
     {
         List<String> cfamsSeen = new ArrayList<String>();
 
-        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+        boolean hasCommutativeOperation = false;
+
+        for (Map.Entry<Term, Operation> column : getColumns().entrySet())
+        {
+            if (!column.getValue().isUnary())
+                hasCommutativeOperation = true;
+
+            if (hasCommutativeOperation && column.getValue().isUnary())
+                throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+        }
+
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, hasCommutativeOperation);
 
         // Avoid unnecessary authorizations.
         if (!(cfamsSeen.contains(columnFamily)))
@@ -132,7 +146,7 @@ public class UpdateStatement extends Abs
             cfamsSeen.add(columnFamily);
         }
 
-        List<RowMutation> rowMutations = new LinkedList<RowMutation>();
+        List<IMutation> rowMutations = new LinkedList<IMutation>();
 
         for (Term key: keys)
         {
@@ -154,43 +168,61 @@ public class UpdateStatement extends Abs
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private RowMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
-    {
-        RowMutation rm = new RowMutation(keyspace, key);
-
-        mutationForKey(rm, keyspace, metadata, timestamp);
-
-        return rm;
-    }
-
-    /** {@inheritDoc} */
-    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp) throws InvalidRequestException
-    {
-        return mutationForKey(keyspace, key, validateColumnFamily(keyspace, columnFamily, false), timestamp);
-    }
-
-    /** {@inheritDoc} */
-    public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
-    {
-        mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, columnFamily, false), timestamp);
-    }
-
-    private void mutationForKey(RowMutation mutation, String keyspace, CFMetaData metadata, Long timestamp) throws InvalidRequestException
+    private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
     {
         AbstractType<?> comparator = getComparator(keyspace);
 
-        for (Map.Entry<Term, Term> column : getColumns().entrySet())
+        // if true we need to wrap RowMutation into CounterMutation
+        boolean hasCounterColumn = false;
+        RowMutation rm = new RowMutation(keyspace, key);
+
+        for (Map.Entry<Term, Operation> column : getColumns().entrySet())
         {
             ByteBuffer colName = column.getKey().getByteBuffer(comparator);
-            ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+            Operation op = column.getValue();
 
-            validateColumn(metadata, colName, colValue);
+            if (op.isUnary())
+            {
+                if (hasCounterColumn)
+                    throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+
+                ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName));
+
+                validateColumn(metadata, colName, colValue);
+                rm.add(new QueryPath(columnFamily, null, colName),
+                       colValue,
+                       (timestamp == null) ? getTimestamp() : timestamp,
+                       getTimeToLive());
+            }
+            else
+            {
+                hasCounterColumn = true;
+
+                if (!column.getKey().getText().equals(op.a.getText()))
+                    throw new InvalidRequestException("Only expressions like X = X + <long> are supported.");
+
+                long value;
+
+                try
+                {
+                    value = Long.parseLong(op.b.getText());
+
+                    if (op.type == OperationType.MINUS)
+                    {
+                        value *= -1;
+                    }
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
+                                                      op.b.getText()));
+                }
 
-            mutation.add(new QueryPath(columnFamily, null, colName),
-                         colValue,
-                         (timestamp == null) ? getTimestamp() : timestamp,
-                         getTimeToLive());
+                rm.addCounter(new QueryPath(columnFamily, null, colName), value);
+            }
         }
+
+        return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
     }
 
     public String getColumnFamily()
@@ -203,8 +235,8 @@ public class UpdateStatement extends Abs
     {
         return keys;
     }
-
-    public Map<Term, Term> getColumns() throws InvalidRequestException
+    
+    public Map<Term, Operation> getColumns() throws InvalidRequestException
     {
         // Created from an UPDATE
         if (columns != null)
@@ -218,11 +250,11 @@ public class UpdateStatement extends Abs
         if (columnNames.size() < 1)
             throw new InvalidRequestException("no columns specified for INSERT");
         
-        columns = new HashMap<Term, Term>();
+        columns = new HashMap<Term, Operation>();
         
         for (int i = 0; i < columnNames.size(); i++)
-            columns.put(columnNames.get(i), columnValues.get(i));
-        
+            columns.put(columnNames.get(i), new Operation(columnValues.get(i)));
+
         return columns;
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Jun  5 13:43:22 2011
@@ -511,7 +511,7 @@ public class ColumnFamilyStore implement
         if (cfm != null) // secondary indexes aren't stored in DD.
         {
             for (ColumnDefinition def : cfm.getColumn_metadata().values())
-                scrubDataDirectories(table, cfm.indexName(def));
+                scrubDataDirectories(table, cfm.indexColumnFamilyName(def));
         }
     }
 
@@ -1802,8 +1802,12 @@ public class ColumnFamilyStore implement
      */
     public Future<?> truncate() throws IOException
     {
-        // snapshot will also flush, but we want to truncate the most possible, and anything in a flush written
-        // after truncateAt won't be truncated.
+        // We have two goals here:
+        // - truncate should delete everything written before truncate was invoked
+        // - but not delete anything that isn't part of the snapshot we create.
+        // We accomplish this by first flushing manually, then snapshotting, and
+        // recording the timestamp IN BETWEEN those actions. Any sstables created
+        // with this timestamp or greater time, will not be marked for delete.
         try
         {
             forceBlockingFlush();
@@ -1812,33 +1816,20 @@ public class ColumnFamilyStore implement
         {
             throw new RuntimeException(e);
         }
-
-        final long truncatedAt = System.currentTimeMillis();
-        snapshot(Table.getTimestampedSnapshotName("before-truncate"));
-
-        Runnable runnable = new WrappedRunnable()
+        // sleep a little to make sure that our truncatedAt comes after any sstable
+        // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
+        try
         {
-            public void runMayThrow() throws InterruptedException, IOException
-            {
-                // putting markCompacted on the commitlogUpdater thread ensures it will run
-                // after any compactions that were in progress when truncate was called, are finished
-                for (ColumnFamilyStore cfs : concatWithIndexes())
-                {
-                    List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
-                    for (SSTableReader sstable : cfs.getSSTables())
-                    {
-                        if (!sstable.newSince(truncatedAt))
-                            truncatedSSTables.add(sstable);
-                    }
-                    cfs.data.markCompacted(truncatedSSTables);
-                }
-
-                // Invalidate row cache
-                invalidateRowCache();
-            }
-        };
+            Thread.sleep(100);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        long truncatedAt = System.currentTimeMillis();
+        snapshot(Table.getTimestampedSnapshotName("before-truncate"));
 
-        return postFlushExecutor.submit(runnable);
+        return CompactionManager.instance.submitTruncate(this, truncatedAt);
     }
 
     // if this errors out, we are in a world of hurt.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sun Jun  5 13:43:22 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
@@ -1149,6 +1150,30 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
+    public Future<?> submitTruncate(final ColumnFamilyStore main, final long truncatedAt)
+    {
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws InterruptedException, IOException
+            {
+                for (ColumnFamilyStore cfs : main.concatWithIndexes())
+                {
+                    List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
+                    for (SSTableReader sstable : cfs.getSSTables())
+                    {
+                        if (!sstable.newSince(truncatedAt))
+                            truncatedSSTables.add(sstable);
+                    }
+                    cfs.markCompacted(truncatedSSTables);
+                }
+
+                main.invalidateRowCache();
+            }
+        };
+
+        return executor.submit(runnable);
+    }
+
     private static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
         return cfs.isIndex()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Sun Jun  5 13:43:22 2011
@@ -22,20 +22,17 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.collect.Iterables;
-import org.apache.commons.collections.set.UnmodifiableSet;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
-
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
@@ -451,18 +448,17 @@ public class DataTracker
         public final Set<SSTableReader> sstables;
         public final Set<SSTableReader> compacting;
 
-        public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
+        View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
         {
             this.memtable = memtable;
-            this.memtablesPendingFlush = pendingFlush instanceof UnmodifiableSet ? pendingFlush : Collections.unmodifiableSet(pendingFlush);
-            this.sstables = sstables instanceof UnmodifiableSet ? sstables : Collections.unmodifiableSet(sstables);
-            this.compacting = compacting instanceof UnmodifiableSet ? compacting : Collections.unmodifiableSet(compacting);
+            this.memtablesPendingFlush = pendingFlush;
+            this.sstables = sstables;
+            this.compacting = compacting;
         }
 
         public View switchMemtable(Memtable newMemtable)
         {
-            Set<Memtable> newPending = new HashSet<Memtable>(memtablesPendingFlush);
-            newPending.add(memtable);
+            Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
             return new View(newMemtable, newPending, sstables, compacting);
         }
 
@@ -473,32 +469,27 @@ public class DataTracker
 
         public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
         {
-            Set<Memtable> newPendings = new HashSet<Memtable>(memtablesPendingFlush);
-            Set<SSTableReader> newSSTables = new HashSet<SSTableReader>(sstables);
-            newPendings.remove(flushedMemtable);
-            newSSTables.add(newSSTable);
-            return new View(memtable, newPendings, newSSTables, compacting);
+            Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable)));
+            Set<SSTableReader> newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
+            return new View(memtable, newPending, newSSTables, compacting);
         }
 
         public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
         {
-            Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
-            Iterables.addAll(sstablesNew, replacements);
-            sstablesNew.removeAll(oldSSTables);
-            return new View(memtable, memtablesPendingFlush, sstablesNew, compacting);
+            Sets.SetView<SSTableReader> remaining = Sets.difference(sstables, ImmutableSet.copyOf(oldSSTables));
+            Set<SSTableReader> newSSTables = ImmutableSet.<SSTableReader>builder().addAll(remaining).addAll(replacements).build();
+            return new View(memtable, memtablesPendingFlush, newSSTables, compacting);
         }
 
         public View markCompacting(Collection<SSTableReader> tomark)
         {
-            Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
-            compactingNew.addAll(tomark);
+            Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(sstables).addAll(tomark).build();
             return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
         }
 
         public View unmarkCompacting(Collection<SSTableReader> tounmark)
         {
-            Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
-            compactingNew.removeAll(tounmark);
+            Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
             return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sun Jun  5 13:43:22 2011
@@ -731,23 +731,6 @@ public class Table
         return Iterables.transform(DatabaseDescriptor.getTables(), transformer);
     }
 
-    /**
-     * Performs a synchronous truncate operation, effectively deleting all data
-     * from the column family cfname
-     * @param cfname
-     * @throws IOException
-     * @throws ExecutionException
-     * @throws InterruptedException
-     */
-    public void truncate(String cfname) throws InterruptedException, ExecutionException, IOException
-    {
-        logger.debug("Truncating...");
-        ColumnFamilyStore cfs = getColumnFamilyStore(cfname);
-        // truncate, blocking
-        cfs.truncate().get();
-        logger.debug("Truncation done.");
-    }
-
     @Override
     public String toString()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Sun Jun  5 13:43:22 2011
@@ -22,7 +22,6 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +46,8 @@ public class TruncateVerbHandler impleme
 
             try
             {
-                Table.open(t.keyspace).truncate(t.columnFamily);
+                ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
+                cfs.truncate().get();
             }
             catch (Exception e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Sun Jun  5 13:43:22 2011
@@ -38,6 +38,8 @@ public class IncomingTcpConnection exten
 {
     private static Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
 
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    
     private Socket socket;
 
     public IncomingTcpConnection(Socket socket)
@@ -97,8 +99,13 @@ public class IncomingTcpConnection exten
                 {
                     int size = input.readInt();
                     byte[] contentBytes = new byte[size];
-                    input.readFully(contentBytes);
-                    
+                    // readFully allocates a direct buffer the size of the chunk it is asked to read,
+                    // so we cap that at CHUNK_SIZE.  See https://issues.apache.org/jira/browse/CASSANDRA-2654
+                    int remainder = size % CHUNK_SIZE;
+                    for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
+                        input.readFully(contentBytes, offset, CHUNK_SIZE);
+                    input.readFully(contentBytes, size - remainder, remainder);
+
                     if (version > MessagingService.version_)
                         logger.info("Received connection from newer protocol version. Ignorning message.");
                     else

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sun Jun  5 13:43:22 2011
@@ -799,7 +799,8 @@ public class CassandraServer implements 
     {
         logger.debug("add_column_family");
         state().hasColumnFamilyListAccess(Permission.WRITE);
-        ThriftValidation.validateCfDef(cf_def);
+        CFMetaData.addDefaultIndexNames(cf_def);
+        ThriftValidation.validateCfDef(cf_def, null);
         validateSchemaAgreement();
 
         try
@@ -866,10 +867,11 @@ public class CassandraServer implements 
         try
         {
             Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
-            for (CfDef cfDef : ks_def.cf_defs)
+            for (CfDef cf_def : ks_def.cf_defs)
             {
-                ThriftValidation.validateCfDef(cfDef);
-                cfDefs.add(CFMetaData.fromThrift(cfDef));
+                CFMetaData.addDefaultIndexNames(cf_def);
+                ThriftValidation.validateCfDef(cf_def, null);
+                cfDefs.add(CFMetaData.fromThrift(cf_def));
             }
 
             ThriftValidation.validateKsDef(ks_def);
@@ -953,11 +955,10 @@ public class CassandraServer implements 
     {
         logger.debug("update_column_family");
         state().hasColumnFamilyListAccess(Permission.WRITE);
-        ThriftValidation.validateCfDef(cf_def);
         if (cf_def.keyspace == null || cf_def.name == null)
             throw new InvalidRequestException("Keyspace and CF name must be set.");
         CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace, cf_def.name));
-        if (oldCfm == null) 
+        if (oldCfm == null)
             throw new InvalidRequestException("Could not find column family definition to modify.");
         validateSchemaAgreement();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Sun Jun  5 13:43:22 2011
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
@@ -513,7 +514,7 @@ public class ThriftValidation
             throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
     }
 
-    public static void validateCfDef(CfDef cf_def) throws InvalidRequestException
+    public static void validateCfDef(CfDef cf_def, CFMetaData old) throws InvalidRequestException
     {
         try
         {
@@ -533,6 +534,22 @@ public class ThriftValidation
                 }
             }
 
+            if (cf_def.key_alias != null)
+            {
+                if (!cf_def.key_alias.hasRemaining())
+                    throw new InvalidRequestException("key_alias may not be empty");
+                try
+                {
+                    // it's hard to use a key in a select statement if we can't type it.
+                    // for now let's keep it simple and require ascii.
+                    AsciiType.instance.validate(cf_def.key_alias);
+                }
+                catch (MarshalException e)
+                {
+                    throw new InvalidRequestException("Key aliases must be ascii");
+                }
+            }
+
             ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
             if (cfType == null)
                 throw new InvalidRequestException("invalid column type " + cf_def.column_type);
@@ -550,16 +567,17 @@ public class ThriftValidation
                                     ? TypeParser.parse(cf_def.comparator_type)
                                     : TypeParser.parse(cf_def.subcomparator_type);
 
+            // initialize a set of names NOT in the CF under consideration
             Set<String> indexNames = new HashSet<String>();
-            for (ColumnDef c : cf_def.column_metadata)
+            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
-                // Ensure that given idx_names and auto_generated idx_names cannot collide
-                CFMetaData cfm = CFMetaData.fromThrift(cf_def);
-                String idxName = cfm.indexName(ColumnDefinition.fromColumnDef(c));
-                if (indexNames.contains(idxName))
-                    throw new InvalidRequestException("Duplicate index names " + idxName);
-                indexNames.add(idxName);
+                if (!cfs.getColumnFamilyName().equals(cf_def.name))
+                    for (ColumnDefinition cd : cfs.metadata.getColumn_metadata().values())
+                        indexNames.add(cd.getIndexName());
+            }
 
+            for (ColumnDef c : cf_def.column_metadata)
+            {
                 TypeParser.parse(c.validation_class);
 
                 try
@@ -572,11 +590,31 @@ public class ThriftValidation
                                                                     ByteBufferUtil.bytesToHex(c.name), cf_def.comparator_type));
                 }
 
-                if ((c.index_name != null) && (c.index_type == null))
-                    throw new ConfigurationException("index_name cannot be set without index_type");
-
-                if (cfType == ColumnFamilyType.Super && c.index_type != null)
-                    throw new InvalidRequestException("Secondary indexes are not supported on supercolumns");
+                if (c.index_type == null)
+                {
+                    if (c.index_name != null)
+                        throw new ConfigurationException("index_name cannot be set without index_type");
+                }
+                else
+                {
+                    if (cfType == ColumnFamilyType.Super)
+                        throw new InvalidRequestException("Secondary indexes are not supported on supercolumns");
+                    assert c.index_name != null; // should have a default set by now if none was provided
+                    if (!Migration.isLegalName(c.index_name))
+                        throw new InvalidRequestException("Illegal index name " + c.index_name);
+                    // check index names against this CF _and_ globally
+                    if (indexNames.contains(c.index_name))
+                        throw new InvalidRequestException("Duplicate index name " + c.index_name);
+                    indexNames.add(c.index_name);
+
+                    ColumnDefinition oldCd = old == null ? null : old.getColumnDefinition(c.name);
+                    if (oldCd != null && oldCd.getIndexType() != null)
+                    {
+                        assert oldCd.getIndexName() != null;
+                        if (!oldCd.getIndexName().equals(c.index_name))
+                            throw new InvalidRequestException("Cannot modify index name");
+                    }
+                }
             }
             validateMinMaxCompactionThresholds(cf_def);
             validateMemtableSettings(cf_def);

Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Sun Jun  5 13:43:22 2011
@@ -75,6 +75,10 @@ def load_sample(dbconn):
         CREATE COLUMNFAMILY IndexedA (KEY text PRIMARY KEY, birthdate int)
             WITH comparator = ascii AND default_validation = ascii;
     """)
+    dbconn.execute("""
+        CREATE COLUMNFAMILY CounterCF (KEY text PRIMARY KEY, count_me counter)
+            WITH comparator = ascii AND default_validation = counter;
+    """)
     dbconn.execute("CREATE INDEX ON IndexedA (birthdate)")
 
     query = "UPDATE StandardString1 SET :c1 = :v1, :c2 = :v2 WHERE KEY = :key"
@@ -526,7 +530,7 @@ class TestCql(ThriftTester):
         "creating column indexes"
         cursor = init()
         cursor.execute("USE Keyspace1")
-        cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY)")
+        cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY, items text, stuff int)")
         cursor.execute("CREATE INDEX namedIndex ON CreateIndex1 (items)")
         cursor.execute("CREATE INDEX ON CreateIndex1 (stuff)")
 
@@ -535,10 +539,9 @@ class TestCql(ThriftTester):
         cfam = [i for i in ksdef.cf_defs if i.name == "CreateIndex1"][0]
         items = [i for i in cfam.column_metadata if i.name == "items"][0]
         stuff = [i for i in cfam.column_metadata if i.name == "stuff"][0]
-        assert items.index_name == "namedIndex", "missing index (or name)"
+        assert items.index_name == "namedIndex", items.index_name
         assert items.index_type == 0, "missing index"
-        assert not stuff.index_name, \
-            "index_name should be unset, not %s" % stuff.index_name
+        assert stuff.index_name != None, "index_name should be set"
         assert stuff.index_type == 0, "missing index"
 
         # already indexed
@@ -546,6 +549,34 @@ class TestCql(ThriftTester):
                       cursor.execute,
                       "CREATE INDEX ON CreateIndex1 (stuff)")
 
+    def test_drop_indexes(self):
+        "droping indexes on columns"
+        cursor = init()
+        cursor.execute("""CREATE KEYSPACE DropIndexTests WITH strategy_options:replication_factor = '1'
+                                                            AND strategy_class = 'SimpleStrategy';""")
+        cursor.execute("USE DropIndexTests")
+        cursor.execute("CREATE COLUMNFAMILY IndexedCF (KEY text PRIMARY KEY, n text)")
+        cursor.execute("CREATE INDEX namedIndex ON IndexedCF (n)")
+
+        ksdef = thrift_client.describe_keyspace("DropIndexTests")
+        columns = ksdef.cf_defs[0].column_metadata
+
+        assert columns[0].index_name == "namedIndex"
+        assert columns[0].index_type == 0
+
+        # testing "DROP INDEX <INDEX_NAME>"
+        cursor.execute("DROP INDEX namedIndex")
+
+        ksdef = thrift_client.describe_keyspace("DropIndexTests")
+        columns = ksdef.cf_defs[0].column_metadata
+
+        assert columns[0].index_type == None
+        assert columns[0].index_name == None
+
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "DROP INDEX undefIndex")
+
     def test_time_uuid(self):
         "store and retrieve time-based (type 1) uuids"
         cursor = init()
@@ -1006,3 +1037,151 @@ class TestCql(ThriftTester):
 
         r = cursor.fetchone()
         assert len(r) == 1, "expected 0 results, got %d" % len(r)
+
+    def test_alter_table_statement(self):
+        "test ALTER TABLE statement"
+        cursor = init()
+        cursor.execute("""
+               CREATE KEYSPACE AlterTableKS WITH strategy_options:replication_factor = '1'
+                   AND strategy_class = 'SimpleStrategy';
+        """)
+        cursor.execute("USE AlterTableKS;")
+
+        cursor.execute("""
+            CREATE COLUMNFAMILY NewCf1 (KEY varint PRIMARY KEY) WITH default_validation = ascii;
+        """)
+
+        # TODO: temporary (until this can be done with CQL).
+        ksdef = thrift_client.describe_keyspace("AlterTableKS")
+        assert len(ksdef.cf_defs) == 1, \
+            "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+        cfam = ksdef.cf_defs[0]
+
+        assert len(cfam.column_metadata) == 0
+
+        # testing "add a new column"
+        cursor.execute("ALTER TABLE NewCf1 ADD name varchar")
+
+        ksdef = thrift_client.describe_keyspace("AlterTableKS")
+        assert len(ksdef.cf_defs) == 1, \
+            "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+        columns = ksdef.cf_defs[0].column_metadata
+
+        assert len(columns) == 1
+        assert columns[0].name == 'name'
+        assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.UTF8Type'
+
+        # testing "alter a column type"
+        cursor.execute("ALTER TABLE NewCf1 ALTER name TYPE ascii")
+
+        ksdef = thrift_client.describe_keyspace("AlterTableKS")
+        assert len(ksdef.cf_defs) == 1, \
+            "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+        columns = ksdef.cf_defs[0].column_metadata
+
+        assert len(columns) == 1
+        assert columns[0].name == 'name'
+        assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.AsciiType'
+
+        # alter column with unknown validator
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "ALTER TABLE NewCf1 ADD name utf8")
+
+        # testing 'drop an existing column'
+        cursor.execute("ALTER TABLE NewCf1 DROP name")
+
+        ksdef = thrift_client.describe_keyspace("AlterTableKS")
+        assert len(ksdef.cf_defs) == 1, \
+            "expected 1 column family total, found %d" % len(ksdef.cf_defs)
+        columns = ksdef.cf_defs[0].column_metadata
+
+        assert len(columns) == 0
+
+        # add column with unknown validator
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "ALTER TABLE NewCf1 ADD name utf8")
+
+        # alter not existing column
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "ALTER TABLE NewCf1 ALTER name TYPE uuid")
+
+        # drop not existing column
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "ALTER TABLE NewCf1 DROP name")
+    
+    def test_counter_column_support(self):
+        "update statement should be able to work with counter columns"
+        cursor = init()
+
+        # increment counter
+        cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 2, \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 4, \
+               "unrecognized value '%s'" % r[1]
+
+        # decrement counter
+        cursor.execute("UPDATE CounterCF SET count_me = count_me - 4 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 0, \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("SELECT * FROM CounterCF")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 0, \
+               "unrecognized value '%s'" % r[1]
+
+        # deleting a counter column
+        cursor.execute("DELETE count_me FROM CounterCF WHERE KEY = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+        assert len(colnames) == 1
+
+        r = cursor.fetchone()
+        assert len(r) == 1
+
+        # can't mix counter and normal statements
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "UPDATE CounterCF SET count_me = count_me + 2, x = 'a' WHERE key = 'counter1'")
+
+        # column names must match
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "UPDATE CounterCF SET count_me = count_not_me + 2 WHERE key = 'counter1'")

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Sun Jun  5 13:43:22 2011
@@ -1421,7 +1421,7 @@ class TestMutations(ThriftTester):
         client.system_update_column_family(modified_cf)
 
         # Add a second indexed CF ...
-        birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate_index')
+        birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate2_index')
         age_coldef = ColumnDef('age', 'BytesType', IndexType.KEYS, 'age_index')
         cfdef = CfDef('Keyspace1', 'BlankCF2', column_metadata=[birthdate_coldef, age_coldef])
         client.system_add_column_family(cfdef)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Sun Jun  5 13:43:22 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang.NotImplementedException;
 
 import com.google.common.base.Charsets;
@@ -249,7 +250,7 @@ public class SchemaLoader
                     {{
                         ByteBuffer cName = ByteBuffer.wrap("birthdate".getBytes(Charsets.UTF_8));
                         IndexType keys = withIdxType ? IndexType.KEYS : null;
-                        put(cName, new ColumnDefinition(cName, LongType.instance, keys, null));
+                        put(cName, new ColumnDefinition(cName, LongType.instance, keys, ByteBufferUtil.bytesToHex(cName)));
                     }});
     }
     private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Sun Jun  5 13:43:22 2011
@@ -18,6 +18,8 @@
 package org.apache.cassandra.stress;
 
 import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 
-public class Session
+public class Session implements Serializable
 {
     // command line options
     public static final Options availableOptions = new Options();
@@ -74,6 +76,7 @@ public class Session
         availableOptions.addOption("O",  "strategy-properties",  true,   "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
         availableOptions.addOption("W",  "no-replicate-on-write",false,  "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
         availableOptions.addOption("V",  "average-size-values",  false,  "Generate column values of average rather than specific size");
+        availableOptions.addOption("T",  "send-to",              true,   "Send this as a request to the stress daemon at specified address.");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -95,7 +98,7 @@ public class Session
     private boolean replicateOnWrite = true;
     private boolean ignoreErrors  = false;
 
-    private PrintStream out = System.out;
+    private final String outFileName;
 
     private IndexType indexType = null;
     private Stress.Operations operation = Stress.Operations.INSERT;
@@ -110,6 +113,8 @@ public class Session
     protected int   mean;
     protected float sigma;
 
+    public final InetAddress sendToDaemon;
+
     public Session(String[] arguments) throws IllegalArgumentException
     {
         float STDev = 0.1f;
@@ -181,17 +186,7 @@ public class Session
             if (cmd.hasOption("r"))
                 random = true;
 
-            if (cmd.hasOption("f"))
-            {
-                try
-                {
-                    out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f")));
-                }
-                catch (FileNotFoundException e)
-                {
-                    System.out.println(e.getMessage());
-                }
-            }
+            outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
 
             if (cmd.hasOption("p"))
                 port = Integer.parseInt(cmd.getOptionValue("p"));
@@ -264,6 +259,17 @@ public class Session
                 replicateOnWrite = false;
 
             averageSizeValues = cmd.hasOption("V");
+
+            try
+            {
+                sendToDaemon = cmd.hasOption("send-to")
+                                ? InetAddress.getByName(cmd.getOptionValue("send-to"))
+                                : null;
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
         catch (ParseException e)
         {
@@ -360,7 +366,14 @@ public class Session
 
     public PrintStream getOutputStream()
     {
-        return out;
+        try
+        {
+            return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 
     public int getProgressInterval()
@@ -432,16 +445,16 @@ public class Session
         try
         {
             client.system_add_keyspace(keyspace);
-            out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
+            System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
             Thread.sleep(nodes.length * 1000); // seconds
         }
         catch (InvalidRequestException e)
         {
-            out.println("Unable to create stress keyspace: " + e.getWhy());
+            System.err.println("Unable to create stress keyspace: " + e.getWhy());
         }
         catch (Exception e)
         {
-            out.println(e.getMessage());
+            System.err.println(e.getMessage());
         }
     }
 

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java Sun Jun  5 13:43:22 2011
@@ -17,15 +17,12 @@
  */
 package org.apache.cassandra.stress;
 
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.commons.cli.Option;
 
-import java.io.PrintStream;
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketException;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
 
 public final class Stress
 {
@@ -36,17 +33,10 @@ public final class Stress
 
     public static Session session;
     public static Random randomizer = new Random();
-
-    /**
-     * Producer-Consumer model: 1 producer, N consumers
-     */
-    private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+    private static volatile boolean stopped = false;
 
     public static void main(String[] arguments) throws Exception
     {
-        long latency, oldLatency;
-        int epoch, total, oldTotal, keyCount, oldKeyCount;
-
         try
         {
             session = new Session(arguments);
@@ -57,111 +47,49 @@ public final class Stress
             return;
         }
 
-        // creating keyspace and column families
-        if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
-        {
-            session.createKeySpaces();
-        }
-
-        int threadCount  = session.getThreads();
-        Thread[] consumers = new Thread[threadCount];
-        PrintStream out = session.getOutputStream();
-
-        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+        PrintStream outStream = session.getOutputStream();
 
-        int itemsPerThread = session.getKeysPerThread();
-        int modulo = session.getNumKeys() % threadCount;
-
-        // creating required type of the threads for the test
-        for (int i = 0; i < threadCount; i++)
+        if (session.sendToDaemon != null)
         {
-            if (i == threadCount - 1)
-                itemsPerThread += modulo; // last one is going to handle N + modulo items
-
-            consumers[i] = new Consumer(itemsPerThread);
-        }
-
-        new Producer().start();
-
-        // starting worker threads
-        for (int i = 0; i < threadCount; i++)
-        {
-            consumers[i].start();
-        }
+            Socket socket = new Socket(session.sendToDaemon, 2159);
 
-        // initialization of the values
-        boolean terminate = false;
-        latency = 0;
-        epoch = total = keyCount = 0;
+            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
+            BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
 
-        int interval = session.getProgressInterval();
-        int epochIntervals = session.getProgressInterval() * 10;
-        long testStartTime = System.currentTimeMillis();
+            Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
 
-        while (!terminate)
-        {
-            Thread.sleep(100);
-
-            int alive = 0;
-            for (Thread thread : consumers)
-                if (thread.isAlive()) alive++;
+            out.writeObject(session);
 
-            if (alive == 0)
-                terminate = true;
+            String line;
 
-            epoch++;
-
-            if (terminate || epoch > epochIntervals)
+            try
             {
-                epoch = 0;
-
-                oldTotal    = total;
-                oldLatency  = latency;
-                oldKeyCount = keyCount;
-
-                total    = session.operations.get();
-                keyCount = session.keys.get();
-                latency  = session.latency.get();
+                while (!socket.isClosed() && (line = inp.readLine()) != null)
+                {
+                    if (line.equals("END"))
+                    {
+                        out.writeInt(1);
+                        break;
+                    }
 
-                int opDelta  = total - oldTotal;
-                int keyDelta = keyCount - oldKeyCount;
-                double latencyDelta = latency - oldLatency;
+                    outStream.println(line);
+                }
+            }
+            catch (SocketException e)
+            {
+                if (!stopped)
+                    e.printStackTrace();
+            }
 
-                long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
-                String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
+            out.close();
+            inp.close();
 
-                out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
-            }
+            socket.close();
         }
-    }
-
-    private static Operation createOperation(int index)
-    {
-        switch (session.getOperation())
+        else
         {
-            case READ:
-                return new Reader(index);
-
-            case COUNTER_GET:
-                return new CounterGetter(index);
-
-            case INSERT:
-                return new Inserter(index);
-
-            case COUNTER_ADD:
-                return new CounterAdder(index);
-
-            case RANGE_SLICE:
-                return new RangeSlicer(index);
-
-            case INDEXED_RANGE_SLICE:
-                return new IndexedRangeSlicer(index);
-
-            case MULTI_GET:
-                return new MultiGetter(index);
+            new StressAction(session, outStream).run();
         }
-
-        throw new UnsupportedOperationException();
     }
 
     /**
@@ -180,56 +108,35 @@ public final class Stress
         }
     }
 
-    /**
-     * Produces exactly N items (awaits each to be consumed)
-     */
-    private static class Producer extends Thread
+    private static class ShutDown extends Thread
     {
-        public void run()
-        {
-            for (int i = 0; i < session.getNumKeys(); i++)
-            {
-                try
-                {
-                    operations.put(createOperation(i % session.getNumDifferentKeys()));
-                }
-                catch (InterruptedException e)
-                {
-                    System.err.println("Producer error - " + e.getMessage());
-                    return;
-                }
-            }
-        }
-    }
-
-    /**
-     * Each consumes exactly N items from queue
-     */
-    private static class Consumer extends Thread
-    {
-        private final int items;
+        private final Socket socket;
+        private final ObjectOutputStream out;
 
-        public Consumer(int toConsume)
+        public ShutDown(Socket socket, ObjectOutputStream out)
         {
-            items = toConsume;
+            this.out = out;
+            this.socket = socket;
         }
 
         public void run()
         {
-            Cassandra.Client client = session.getClient();
-
-            for (int i = 0; i < items; i++)
+            try
             {
-                try
-                {
-                    operations.take().run(client); // running job
-                }
-                catch (Exception e)
+                if (!socket.isClosed())
                 {
-                    System.err.println(e.getMessage());
-                    System.exit(-1);
+                    System.out.println("Control-C caught. Canceling running action and shutting down...");
+
+                    out.writeInt(1);
+                    out.close();
+
+                    stopped = true;
                 }
             }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
         }
     }
 

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
 
 public class CounterAdder extends Operation
 {
-    public CounterAdder(int index)
+    public CounterAdder(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -27,9 +28,9 @@ import java.util.List;
 
 public class CounterGetter extends Operation
 {
-    public CounterGetter(int index)
+    public CounterGetter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends 
 {
     private static List<ByteBuffer> values = null;
 
-    public IndexedRangeSlicer(int index)
+    public IndexedRangeSlicer(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -33,9 +34,9 @@ public class Inserter extends Operation
 {
     private static List<ByteBuffer> values;
 
-    public Inserter(int index)
+    public Inserter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
 
 public class MultiGetter extends Operation
 {
-    public MultiGetter(int index)
+    public MultiGetter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -30,9 +31,9 @@ import java.util.List;
 public class RangeSlicer extends Operation
 {
 
-    public RangeSlicer(int index)
+    public RangeSlicer(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Sun Jun  5 13:43:22 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -29,9 +30,9 @@ import static com.google.common.base.Cha
 
 public class Reader extends Operation
 {
-    public Reader(int index)
+    public Reader(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1132406&r1=1132405&r2=1132406&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Sun Jun  5 13:43:22 2011
@@ -46,6 +46,12 @@ public abstract class Operation
         session = Stress.session;
     }
 
+    public Operation(Session client, int idx)
+    {
+        index = idx;
+        session = client;
+    }
+
     /**
      * Run operation
      * @param client Cassandra Thrift client connection
@@ -101,18 +107,18 @@ public abstract class Operation
      * key generator using Gauss or Random algorithm
      * @return byte[] representation of the key string
      */
-    protected static byte[] generateKey()
+    protected byte[] generateKey()
     {
-        return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
+        return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
     }
 
     /**
      * Random key generator
      * @return byte[] representation of the key string
      */
-    private static byte[] generateRandomKey()
+    private byte[] generateRandomKey()
     {
-        String format = "%0" + Stress.session.getTotalKeysLength() + "d";
+        String format = "%0" + session.getTotalKeysLength() + "d";
         return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
     }
 
@@ -120,9 +126,8 @@ public abstract class Operation
      * Gauss key generator
      * @return byte[] representation of the key string
      */
-    private static byte[] generateGaussKey()
+    private byte[] generateGaussKey()
     {
-        Session session = Stress.session;
         String format = "%0" + session.getTotalKeysLength() + "d";
 
         for (;;)