Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/24 23:33:05 UTC

svn commit: r1074329 [1/2] - in /cassandra/trunk: ./ bin/ contrib/ contrib/stress/src/org/apache/cassandra/contrib/stress/ contrib/stress/src/org/apache/cassandra/contrib/stress/operations/ contrib/stress/src/org/apache/cassandra/contrib/stress/util/ i...

Author: jbellis
Date: Thu Feb 24 22:33:03 2011
New Revision: 1074329

URL: http://svn.apache.org/viewvc?rev=1074329&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Operation.java
      - copied unchanged from r1074322, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Operation.java
Removed:
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/bin/cassandra.bat
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/LatencyTracker.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1073884
+/cassandra/branches/cassandra-0.7:1026516-1074322
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Feb 24 22:33:03 2011
@@ -37,7 +37,14 @@
    modification operations (CASSANDRA-2222)
  * fix for reversed slice queries on large rows (CASSANDRA-2212)
  * fat clients were writing local data (CASSANDRA-2223)
- * update memtable_throughput to be a long (CASSANDRA-2158)
+ * turn off string interning in json2sstable (CASSANDRA-2189)
+ * set DEFAULT_MEMTABLE_LIFETIME_IN_MINS to 24h
+ * improve detection and cleanup of partially-written sstables 
+   (CASSANDRA-2206)
+ * fix supercolumn de/serialization when subcolumn comparator is different
+   from supercolumn's (CASSANDRA-2104)
+ * fix starting up on Windows when CASSANDRA_HOME contains whitespace
+   (CASSANDRA-2237)
 
 
 0.7.2

Modified: cassandra/trunk/bin/cassandra.bat
URL: http://svn.apache.org/viewvc/cassandra/trunk/bin/cassandra.bat?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/bin/cassandra.bat (original)
+++ cassandra/trunk/bin/cassandra.bat Thu Feb 24 22:33:03 2011
@@ -43,7 +43,7 @@ set JAVA_OPTS=^
 REM ***** CLASSPATH library setting *****
 
 REM Ensure that any user defined CLASSPATH variables are not used on startup
-set CLASSPATH=%CASSANDRA_HOME%\conf
+set CLASSPATH="%CASSANDRA_HOME%\conf"
 
 REM For each jar in the CASSANDRA_HOME lib directory call append to build the CLASSPATH variable.
 for %%i in ("%CASSANDRA_HOME%\lib\*.jar") do call :append "%%i"

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1073884
+/cassandra/branches/cassandra-0.7/contrib:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Thu Feb 24 22:33:03 2011
@@ -20,8 +20,8 @@ package org.apache.cassandra.contrib.str
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.cli.*;
 
@@ -38,9 +38,9 @@ public class Session
     // command line options
     public static final Options availableOptions = new Options();
 
-    public final AtomicIntegerArray operationCount;
-    public final AtomicIntegerArray keyCount;
-    public final AtomicLongArray latencies;
+    public final AtomicInteger operations;
+    public final AtomicInteger keys;
+    public final AtomicLong    latency;
 
     static
     {
@@ -93,7 +93,7 @@ public class Session
     private PrintStream out = System.out;
 
     private IndexType indexType = null;
-    private Stress.Operation operation = Stress.Operation.INSERT;
+    private Stress.Operations operation = Stress.Operations.INSERT;
     private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
     private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
     private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
@@ -183,7 +183,7 @@ public class Session
                 unframed = Boolean.parseBoolean(cmd.getOptionValue("m"));
 
             if (cmd.hasOption("o"))
-                operation = Stress.Operation.valueOf(cmd.getOptionValue("o").toUpperCase());
+                operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
 
             if (cmd.hasOption("u"))
                 superColumns = Integer.parseInt(cmd.getOptionValue("u"));
@@ -248,9 +248,9 @@ public class Session
         mean  = numKeys / 2;
         sigma = numKeys * STDev;
 
-        operationCount = new AtomicIntegerArray(threads);
-        keyCount = new AtomicIntegerArray(threads);
-        latencies = new AtomicLongArray(threads);
+        operations = new AtomicInteger();
+        keys = new AtomicInteger();
+        latency = new AtomicLong();
     }
 
     public int getCardinality()
@@ -323,7 +323,7 @@ public class Session
         return ignoreErrors;
     }
 
-    public Stress.Operation getOperation()
+    public Stress.Operations getOperation()
     {
         return operation;
     }

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java Thu Feb 24 22:33:03 2011
@@ -18,15 +18,18 @@
 package org.apache.cassandra.contrib.stress;
 
 import org.apache.cassandra.contrib.stress.operations.*;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
 import org.apache.commons.cli.Option;
 
 import java.io.PrintStream;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 
 public final class Stress
 {
-    public static enum Operation
+    public static enum Operations
     {
         INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET
     }
@@ -34,9 +37,15 @@ public final class Stress
     public static Session session;
     public static Random randomizer = new Random();
 
+    /**
+     * Producer-Consumer model: 1 producer, N consumers
+     */
+    private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
     public static void main(String[] arguments) throws Exception
     {
-        int epoch, total, oldTotal, latency, keyCount, oldKeyCount, oldLatency;
+        long latency, oldLatency;
+        int epoch, total, oldTotal, keyCount, oldKeyCount;
 
         try
         {
@@ -49,51 +58,52 @@ public final class Stress
         }
 
         // creating keyspace and column families
-        if (session.getOperation() == Stress.Operation.INSERT)
+        if (session.getOperation() == Stress.Operations.INSERT)
         {
             session.createKeySpaces();
         }
 
         int threadCount  = session.getThreads();
-        Thread[] threads = new Thread[threadCount];
-        PrintStream out  = session.getOutputStream();
+        Thread[] consumers = new Thread[threadCount];
+        PrintStream out = session.getOutputStream();
+
+        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+
+        int itemsPerThread = session.getKeysPerThread();
+        int modulo = session.getNumKeys() % threadCount;
 
         // creating required type of the threads for the test
-        try
-        {
-            for (int i = 0; i < threadCount; i++)
-            {
-                threads[i] = createOperation(i);
-            }
-        }
-        catch (Exception e)
+        for (int i = 0; i < threadCount; i++)
         {
-            System.err.println(e.getMessage());
-            return;
+            if (i == threadCount - 1)
+                itemsPerThread += modulo; // last one is going to handle N + modulo items
+
+            consumers[i] = new Consumer(itemsPerThread);
         }
 
+        new Producer().start();
+
         // starting worker threads
         for (int i = 0; i < threadCount; i++)
         {
-            threads[i].start();
+            consumers[i].start();
         }
 
         // initialization of the values
         boolean terminate = false;
-        epoch = total = latency = keyCount = 0;
+        latency = 0;
+        epoch = total = keyCount = 0;
 
         int interval = session.getProgressInterval();
         int epochIntervals = session.getProgressInterval() * 10;
         long testStartTime = System.currentTimeMillis();
 
-        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
-
         while (!terminate)
         {
             Thread.sleep(100);
 
             int alive = 0;
-            for (Thread thread : threads)
+            for (Thread thread : consumers)
                 if (thread.isAlive()) alive++;
 
             if (alive == 0)
@@ -109,20 +119,9 @@ public final class Stress
                 oldLatency  = latency;
                 oldKeyCount = keyCount;
 
-                int currentTotal = 0, currentKeyCount = 0, currentLatency = 0;
-
-                for (Thread t : threads)
-                {
-                    OperationThread thread = (OperationThread) t;
-
-                    currentTotal    += session.operationCount.get(thread.index);
-                    currentKeyCount += session.keyCount.get(thread.index);
-                    currentLatency  += session.latencies.get(thread.index);
-                }
-
-                total    = currentTotal;
-                keyCount = currentKeyCount;
-                latency  = currentLatency;
+                total    = session.operations.get();
+                keyCount = session.keys.get();
+                latency  = session.latency.get();
 
                 int opDelta  = total - oldTotal;
                 int keyDelta = keyCount - oldKeyCount;
@@ -136,7 +135,7 @@ public final class Stress
         }
     }
 
-    private static Thread createOperation(int index)
+    private static Operation createOperation(int index)
     {
         switch (session.getOperation())
         {
@@ -174,4 +173,58 @@ public final class Stress
                                                             option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
         }
     }
+
+    /**
+     * Produces exactly N items (awaits each to be consumed)
+     */
+    private static class Producer extends Thread
+    {
+        public void run()
+        {
+            for (int i = 0; i < session.getNumKeys(); i++)
+            {
+                try
+                {
+                    operations.put(createOperation(i));
+                }
+                catch (InterruptedException e)
+                {
+                    System.err.println("Producer error - " + e.getMessage());
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Each consumes exactly N items from queue
+     */
+    private static class Consumer extends Thread
+    {
+        private final int items;
+
+        public Consumer(int toConsume)
+        {
+            items = toConsume;
+        }
+
+        public void run()
+        {
+            Cassandra.Client client = session.getClient();
+
+            for (int i = 0; i < items; i++)
+            {
+                try
+                {
+                    operations.take().run(client); // running job
+                }
+                catch (Exception e)
+                {
+                    System.err.println(e.getMessage());
+                    System.exit(-1);
+                }
+            }
+        }
+    }
+
 }
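
For readers following the Stress.java refactor above, here is a minimal, self-contained sketch (not the committed code; the class ProducerConsumerSketch and its names are invented for illustration) of the 1-producer / N-consumer hand-off on a fair SynchronousQueue that the new code relies on. A SynchronousQueue has no capacity, so put() blocks until some consumer calls take(), which keeps the single producer in step with the worker threads; the last consumer also picks up the remainder when the key count does not divide evenly by the thread count, mirroring the itemsPerThread + modulo logic in the diff.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerSketch
{
    // fair hand-off queue: put() blocks until a consumer is ready to take()
    private static final BlockingQueue<Runnable> operations = new SynchronousQueue<Runnable>(true);
    private static final AtomicInteger completed = new AtomicInteger();

    public static void main(String[] args) throws Exception
    {
        int numKeys = 100;
        int threadCount = 4;
        int itemsPerThread = numKeys / threadCount;
        int modulo = numKeys % threadCount;

        Thread[] consumers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            // the last consumer also handles the remainder, as in the commit
            final int items = (i == threadCount - 1) ? itemsPerThread + modulo : itemsPerThread;
            consumers[i] = new Thread()
            {
                public void run()
                {
                    for (int n = 0; n < items; n++)
                    {
                        try
                        {
                            operations.take().run(); // block until the producer hands over work
                            completed.getAndIncrement();
                        }
                        catch (InterruptedException e)
                        {
                            return;
                        }
                    }
                }
            };
            consumers[i].start();
        }

        // single producer: produces exactly numKeys items, each awaited by a consumer
        for (int i = 0; i < numKeys; i++)
        {
            operations.put(new Runnable()
            {
                public void run() { /* a real stress Operation would talk to the cluster here */ }
            });
        }

        for (Thread consumer : consumers)
            consumer.join();

        System.out.println("completed operations: " + completed.get());
    }
}

The shared AtomicInteger/AtomicLong counter in the sketch plays the same role as the session.operations, session.keys and session.latency fields that replace the per-thread AtomicIntegerArray/AtomicLongArray in the Session.java diff earlier in this commit.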

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Thu Feb 24 22:33:03 2011
@@ -17,23 +17,23 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-public class IndexedRangeSlicer extends OperationThread
+public class IndexedRangeSlicer extends Operation
 {
     public IndexedRangeSlicer(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         String format = "%0" + session.getTotalKeysLength() + "d";
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
@@ -46,64 +46,59 @@ public class IndexedRangeSlicer extends 
 
         ByteBuffer columnName = ByteBuffer.wrap("C1".getBytes());
 
-        for (int i = range.begins(); i < range.size(); i++)
-        {
-            int received = 0;
+        int received = 0;
 
-            String startOffset = "0";
-            ByteBuffer value = ByteBuffer.wrap(values.get(i % values.size()).getBytes());
+        String startOffset = "0";
+        ByteBuffer value = ByteBuffer.wrap(values.get(index % values.size()).getBytes());
 
-            IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
+        IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
 
-            while (received < expectedPerValue)
-            {
-                IndexClause clause = new IndexClause(Arrays.asList(expression), ByteBuffer.wrap(startOffset.getBytes()),
-                                                                                session.getKeysPerCall());
+        while (received < expectedPerValue)
+        {
+            IndexClause clause = new IndexClause(Arrays.asList(expression),
+                                                 ByteBuffer.wrap(startOffset.getBytes()),
+                                                 session.getKeysPerCall());
 
-                List<KeySlice> results = null;
-                long start = System.currentTimeMillis();
+            List<KeySlice> results = null;
+            long start = System.currentTimeMillis();
 
-                boolean success = false;
-                String exceptionMessage = null;
+            boolean success = false;
+            String exceptionMessage = null;
 
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    if (success)
-                        break;
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
 
-                    try
-                    {
-                        results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
-                        success = (results.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                        success = false;
-                    }
+                try
+                {
+                    results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                    success = (results.size() != 0);
                 }
-
-                if (!success)
+                catch (Exception e)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              startOffset,
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
+            }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    startOffset,
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
 
-                received += results.size();
+            received += results.size();
 
-                // convert max key found back to an integer, and increment it
-                startOffset = String.format(format, (1 + getMaxKey(results)));
+            // convert max key found back to an integer, and increment it
+            startOffset = String.format(format, (1 + getMaxKey(results)));
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, results.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-            }
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(results.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
         }
     }
 

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Thu Feb 24 22:33:03 2011
@@ -17,26 +17,27 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class Inserter extends OperationThread
+public class Inserter extends Operation
 {
+
     public Inserter(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         List<String> values  = generateValues();
         List<Column> columns = new ArrayList<Column>();
@@ -48,8 +49,10 @@ public class Inserter extends OperationT
         // columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
         for (int i = 0; i < session.getColumnsPerKey(); i++)
         {
-            byte[] columnName = ("C" + Integer.toString(i)).getBytes();
-            columns.add(new Column(ByteBuffer.wrap(columnName), ByteBuffer.wrap(new byte[] {}), System.currentTimeMillis()));
+            String columnName  = ("C" + Integer.toString(i));
+            String columnValue = values.get(index % values.size());
+
+            columns.add(new Column(ByteBufferUtil.bytes(columnName), ByteBufferUtil.bytes(columnValue), System.currentTimeMillis()));
         }
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
@@ -62,58 +65,47 @@ public class Inserter extends OperationT
             }
         }
 
-        for (int i : range)
-        {
-            String rawKey = String.format(format, i);
-            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
-            Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
-            record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
-                                                          ? getSuperColumnsMutationMap(superColumns)
-                                                          : getColumnsMutationMap(columns));
+        String rawKey = String.format(format, index);
+        Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
 
-            String value = values.get(i % values.size());
+        record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
+                                                                                ? getSuperColumnsMutationMap(superColumns)
+                                                                                : getColumnsMutationMap(columns));
 
-            for (Column c : columns)
-                c.value = ByteBuffer.wrap(value.getBytes());
+        long start = System.currentTimeMillis();
 
-            long start = System.currentTimeMillis();
+        boolean success = false;
+        String exceptionMessage = null;
 
-            boolean success = false;
-            String exceptionMessage = null;
+        for (int t = 0; t < session.getRetryTimes(); t++)
+        {
+            if (success)
+                break;
 
-            for (int t = 0; t < session.getRetryTimes(); t++)
+            try
             {
-                if (success)
-                    break;
-
-                try
-                {
-                    client.batch_mutate(record, session.getConsistencyLevel());
-                    success = true;
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
+                client.batch_mutate(record, session.getConsistencyLevel());
+                success = true;
             }
-
-            if (!success)
+            catch (Exception e)
             {
-                System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
-                                                                                                session.getRetryTimes(),
-                                                                                                rawKey,
-                                                                                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                if (!session.ignoreErrors())
-                    break;
+                exceptionMessage = getExceptionMessage(e);
+                success = false;
             }
+        }
 
-            session.operationCount.getAndIncrement(index);
-            session.keyCount.getAndIncrement(index);
-            session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+        if (!success)
+        {
+            error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
+                                index,
+                                session.getRetryTimes(),
+                                rawKey,
+                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
         }
+
+        session.operations.getAndIncrement();
+        session.keys.getAndIncrement();
+        session.latency.getAndAdd(System.currentTimeMillis() - start);
     }
 
     private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)
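
As an aside, the rewritten operations (Inserter above, and Reader and the slicers below) all share the same retry-and-report shape: attempt the Thrift call up to getRetryTimes() times, remember the last exception message, and report once via error() if every attempt failed. The following is a hedged, self-contained sketch of that pattern only; the RetryLoopSketch class, Call interface and runWithRetries method are invented for the example and are not part of the commit.

public final class RetryLoopSketch
{
    // hypothetical stand-in for one Thrift call, e.g. client.batch_mutate(...)
    interface Call
    {
        boolean attempt() throws Exception; // true when the call yielded a usable result
    }

    static boolean runWithRetries(int retryTimes, Call call)
    {
        boolean success = false;
        String exceptionMessage = null;

        for (int t = 0; t < retryTimes; t++)
        {
            if (success)
                break;

            try
            {
                success = call.attempt();
            }
            catch (Exception e)
            {
                exceptionMessage = e.getMessage();
                success = false;
            }
        }

        if (!success)
            System.err.printf("retried %d times - giving up %s%n",
                              retryTimes,
                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");

        return success;
    }

    public static void main(String[] args)
    {
        // toy usage: a call that fails twice and then succeeds on the third attempt
        final int[] attempts = { 0 };

        boolean ok = runWithRetries(3, new Call()
        {
            public boolean attempt() throws Exception
            {
                if (++attempts[0] < 3)
                    throw new Exception("simulated failure");
                return true;
            }
        });

        System.out.println("succeeded = " + ok);
    }
}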

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Thu Feb 24 22:33:03 2011
@@ -17,90 +17,39 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class MultiGetter extends OperationThread
+public class MultiGetter extends Operation
 {
     public MultiGetter(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
                                                                                       ByteBuffer.wrap(new byte[] {}),
                                                                                       false, session.getColumnsPerKey()));
 
         int offset = index * session.getKeysPerThread();
-        Map<ByteBuffer,List<ColumnOrSuperColumn>> results = null;
-        int count  = (((index + 1) * session.getKeysPerThread()) - offset) / session.getKeysPerCall();
+        Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            for (int i = 0; i < count; i++)
-            {
-                List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
-                for (int j = 0; j < session.getSuperColumns(); j++)
-                {
-                    ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
-
-                    long start = System.currentTimeMillis();
-
-                    boolean success = false;
-                    String exceptionMessage = null;
-
-                    for (int t = 0; t < session.getRetryTimes(); t++)
-                    {
-                        if (success)
-                            break;
-
-                        try
-                        {
-                            results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
-                            success = (results.size() != 0);
-                        }
-                        catch (Exception e)
-                        {
-                            exceptionMessage = getExceptionMessage(e);
-                        }
-                    }
-
-                    if (!success)
-                    {
-                        System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              keys,
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                        if (!session.ignoreErrors())
-                            return;
-                    }
-
-                    session.operationCount.getAndIncrement(index);
-                    session.keyCount.getAndAdd(index, keys.size());
-                    session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-
-                    offset += session.getKeysPerCall();
-                }
-            }
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
+            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
 
-            for (int i = 0; i < count; i++)
+            for (int j = 0; j < session.getSuperColumns(); j++)
             {
-                List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+                ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
 
                 long start = System.currentTimeMillis();
 
@@ -120,29 +69,68 @@ public class MultiGetter extends Operati
                     catch (Exception e)
                     {
                         exceptionMessage = getExceptionMessage(e);
-                        success = false;
                     }
                 }
 
                 if (!success)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                                                                        index,
-                                                                                        session.getRetryTimes(),
-                                                                                        keys,
-                                                                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                        index,
+                                        session.getRetryTimes(),
+                                        keys,
+                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
                 }
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, keys.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+                session.operations.getAndIncrement();
+                session.keys.getAndAdd(keys.size());
+                session.latency.getAndAdd(System.currentTimeMillis() - start);
 
                 offset += session.getKeysPerCall();
             }
         }
+        else
+        {
+            ColumnParent parent = new ColumnParent("Standard1");
+
+            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+
+            long start = System.currentTimeMillis();
+
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
+
+                try
+                {
+                    results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                    success = (results.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
+                }
+            }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    keys,
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(keys.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
+
+            offset += session.getKeysPerCall();
+        }
     }
 
     private List<ByteBuffer> generateKeys(int start, int limit)

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Thu Feb 24 22:33:03 2011
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class RangeSlicer extends OperationThread
+public class RangeSlicer extends Operation
 {
 
     public RangeSlicer(int index)
@@ -33,87 +35,29 @@ public class RangeSlicer extends Operati
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         String format = "%0" + session.getTotalKeysLength() + "d";
 
         // initial values
-        int current = range.begins();
-        int limit   = range.limit();
-        int count   = session.getColumnsPerKey();
-        int last    = current + session.getKeysPerCall();
+        int count = session.getColumnsPerKey();
 
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[] {}),
                                                                                       ByteBuffer.wrap(new byte[] {}),
-                                                                                      false, count));
+                                                                                      false,
+                                                                                      count));
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            while (current < limit)
-            {
-                byte[] start = String.format(format, current).getBytes();
-                byte[] end   = String.format(format, last).getBytes();
-
-                List<KeySlice> slices = new ArrayList<KeySlice>();
-                KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
-
-                for (int i = 0; i < session.getSuperColumns(); i++)
-                {
-                    String superColumnName = "S" + Integer.toString(i);
-                    ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
-
-                    long startTime = System.currentTimeMillis();
-
-                    boolean success = false;
-                    String exceptionMessage = null;
-
-                    for (int t = 0; t < session.getRetryTimes(); t++)
-                    {
-                        try
-                        {
-                            slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
-                            success = (slices.size() != 0);
-                        }
-                        catch (Exception e)
-                        {
-                            exceptionMessage = getExceptionMessage(e);
-                            success = false;
-                        }
-                    }
-
-                    if (!success)
-                    {
-                        System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
-                                                                                            index,
-                                                                                            session.getRetryTimes(),
-                                                                                            new String(start),
-                                                                                            new String(end),
-                                                                                            (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+            byte[] start = String.format(format, index).getBytes();
 
-                        if (!session.ignoreErrors())
-                            return;
-                    }
-
-                    session.operationCount.getAndIncrement(index);
-                    session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
-                }
+            List<KeySlice> slices = new ArrayList<KeySlice>();
+            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
-                current += slices.size() + 1;
-                last = current + slices.size() + 1;
-                session.keyCount.getAndAdd(index, slices.size());
-            }
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
-
-            while (current < limit)
+            for (int i = 0; i < session.getSuperColumns(); i++)
             {
-                byte[] start = String.format(format, current).getBytes();
-                byte[] end   = String.format(format, last).getBytes();
-
-                List<KeySlice> slices = new ArrayList<KeySlice>();
-                KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
+                String superColumnName = "S" + Integer.toString(i);
+                ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
 
                 long startTime = System.currentTimeMillis();
 
@@ -122,9 +66,6 @@ public class RangeSlicer extends Operati
 
                 for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    if (success)
-                        break;
-
                     try
                     {
                         slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
@@ -139,24 +80,62 @@ public class RangeSlicer extends Operati
 
                 if (!success)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              new String(start),
-                                                                                              new String(end),
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
+                                        index,
+                                        session.getRetryTimes(),
+                                        new String(start),
+                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
                 }
 
-                current += slices.size() + 1;
-                last = current + slices.size() + 1;
+                session.operations.getAndIncrement();
+                session.latency.getAndAdd(System.currentTimeMillis() - startTime);
+            }
+
+            session.keys.getAndAdd(slices.size());
+        }
+        else
+        {
+            ColumnParent parent = new ColumnParent("Standard1");
+
+            byte[] start = String.format(format, index).getBytes();
+
+            List<KeySlice> slices = new ArrayList<KeySlice>();
+            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+
+            long startTime = System.currentTimeMillis();
+
+            boolean success = false;
+            String exceptionMessage = null;
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, slices.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
+
+                try
+                {
+                    slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    success = (slices.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
+                }
             }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    new String(start),
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(slices.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - startTime);
         }
     }
 }

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Thu Feb 24 22:33:03 2011
@@ -17,25 +17,22 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.io.IOException;
-import java.lang.AssertionError;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.List;
 
-public class Reader extends OperationThread
+public class Reader extends Operation
 {
     public Reader(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         SliceRange sliceRange = new SliceRange();
 
@@ -50,75 +47,23 @@ public class Reader extends OperationThr
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            runSuperColumnReader(predicate);
+            runSuperColumnReader(predicate, client);
         }
         else
         {
-            runColumnReader(predicate);
+            runColumnReader(predicate, client);
         }
     }
 
-    private void runSuperColumnReader(SlicePredicate predicate)
+    private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
     {
-        for (int i = 0; i < session.getKeysPerThread(); i++)
-        {
-            byte[] rawKey = generateKey();
-            ByteBuffer key = ByteBuffer.wrap(rawKey);
-
-            for (int j = 0; j < session.getSuperColumns(); j++)
-            {
-                String superColumn = 'S' + Integer.toString(j);
-                ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
-
-                long start = System.currentTimeMillis();
-
-                boolean success = false;
-                String exceptionMessage = null;
-
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    if (success)
-                        break;
-
-                    try
-                    {
-                        List<ColumnOrSuperColumn> columns;
-                        columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
-                        success = (columns.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                        success = false;
-                    }
-                }
-
-                if (!success)
-                {
-                    System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
-                                                                                                  session.getRetryTimes(),
-                                                                                                  new String(rawKey),
-                                                                                                  (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
-                }
-
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndIncrement(index);
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-            }
-        }
-    }
-
-    private void runColumnReader(SlicePredicate predicate)
-    {
-        ColumnParent parent = new ColumnParent("Standard1");
+        byte[] rawKey = generateKey();
+        ByteBuffer key = ByteBuffer.wrap(rawKey);
 
-        for (int i = 0; i < session.getKeysPerThread(); i++)
+        for (int j = 0; j < session.getSuperColumns(); j++)
         {
-            byte[] key = generateKey();
-            ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+            String superColumn = 'S' + Integer.toString(j);
+            ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
 
             long start = System.currentTimeMillis();
 
@@ -133,7 +78,7 @@ public class Reader extends OperationThr
                 try
                 {
                     List<ColumnOrSuperColumn> columns;
-                    columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                    columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
                     success = (columns.size() != 0);
                 }
                 catch (Exception e)
@@ -145,18 +90,61 @@ public class Reader extends OperationThr
 
             if (!success)
             {
-                System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
-                                                                                              session.getRetryTimes(),
-                                                                                              new String(key),
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+                error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    new String(rawKey),
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndIncrement();
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
+        }
+    }
+
+    private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
+    {
+        ColumnParent parent = new ColumnParent("Standard1");
+
+        byte[] key = generateKey();
+        ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+
+        long start = System.currentTimeMillis();
+
+        boolean success = false;
+        String exceptionMessage = null;
 
-                if (!session.ignoreErrors())
-                    return;
+        for (int t = 0; t < session.getRetryTimes(); t++)
+        {
+            if (success)
+                break;
+
+            try
+            {
+                List<ColumnOrSuperColumn> columns;
+                columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                success = (columns.size() != 0);
+            }
+            catch (Exception e)
+            {
+                exceptionMessage = getExceptionMessage(e);
+                success = false;
             }
+        }
 
-            session.operationCount.getAndIncrement(index);
-            session.keyCount.getAndIncrement(index);
-            session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+        if (!success)
+        {
+            error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+                                index,
+                                session.getRetryTimes(),
+                                new String(key),
+                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
         }
+
+        session.operations.getAndIncrement();
+        session.keys.getAndIncrement();
+        session.latency.getAndAdd(System.currentTimeMillis() - start);
     }
+
 }

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1074322
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Feb 24 22:33:03 2011
@@ -57,7 +57,7 @@ public final class CFMetaData
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
-    public final static int DEFAULT_MEMTABLE_LIFETIME_IN_MINS = 60;
+    public final static int DEFAULT_MEMTABLE_LIFETIME_IN_MINS = 60 * 24;
     public final static int DEFAULT_MEMTABLE_THROUGHPUT_IN_MB = sizeMemtableThroughput();
     public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS = sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
 

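For scale: the new CFMetaData default above works out to 60 * 24 = 1,440 minutes, so (if I read the constant's use correctly) a memtable that is never pushed out by throughput or operation count is now flushed on age roughly once a day instead of once an hour.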
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Thu Feb 24 22:33:03 2011
@@ -125,8 +125,7 @@ public class BinaryMemtable implements I
     private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
     {
         logger.info("Writing " + this);
-        String path = cfs.getFlushPath();
-        SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), cfs.metadata, cfs.partitioner);
+        SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size());
 
         for (DecoratedKey key : sortedKeys)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Feb 24 22:33:03 2011
@@ -450,7 +450,7 @@ public class ColumnFamilyStore implement
         long count = 0;
         for (SSTableReader sstable : ssTables)
         {
-            sum += sstable.getEstimatedRowSize().median();
+            sum += sstable.getEstimatedRowSize().mean();
             count++;
         }
         return count > 0 ? sum / count : 0;
@@ -462,7 +462,7 @@ public class ColumnFamilyStore implement
         int count = 0;
         for (SSTableReader sstable : ssTables)
         {
-            sum += sstable.getEstimatedColumnCount().median();
+            sum += sstable.getEstimatedColumnCount().mean();
             count++;
         }
         return count > 0 ? (int) (sum / count) : 0;
@@ -1073,12 +1073,12 @@ public class ColumnFamilyStore implement
 
     public long[] getRecentSSTablesPerReadHistogram()
     {
-        return recentSSTablesPerRead.get(true);
+        return recentSSTablesPerRead.getBuckets(true);
     }
 
     public long[] getSSTablesPerReadHistogram()
     {
-        return sstablesPerRead.get(false);
+        return sstablesPerRead.getBuckets(false);
     }
 
     public long getReadCount()
@@ -2052,7 +2052,7 @@ public class ColumnFamilyStore implement
 
         for (SSTableReader sstable : ssTables)
         {
-            long[] rowSize = sstable.getEstimatedRowSize().get(false);
+            long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
 
             for (int i = 0; i < histogram.length; i++)
                 histogram[i] += rowSize[i];
@@ -2067,7 +2067,7 @@ public class ColumnFamilyStore implement
 
         for (SSTableReader sstable : ssTables)
         {
-            long[] columnSize = sstable.getEstimatedColumnCount().get(false);
+            long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
 
             for (int i = 0; i < histogram.length; i++)
                 histogram[i] += columnSize[i];
@@ -2160,4 +2160,14 @@ public class ColumnFamilyStore implement
 
         return intern(name);
     }
+
+    public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
+    {
+        return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
+    }
+
+    public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
+    {
+        return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner);
+    }
 }
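
The two factory methods added at the end of ColumnFamilyStore above centralize the temp-path construction and SSTableWriter wiring that the BinaryMemtable hunk earlier, and the CompactionManager and Memtable hunks below, previously did inline. A rough sketch of the flush-side call pattern; the ColumnFamilyStore and the sorted row map are assumed to be in hand (as they are in Memtable), and the closeAndOpenReader() finish step comes from the existing flush code rather than from this hunk.

    import java.io.IOException;
    import java.util.Map;
    import java.util.SortedMap;

    import org.apache.cassandra.db.ColumnFamily;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.io.sstable.SSTableReader;
    import org.apache.cassandra.io.sstable.SSTableWriter;

    public class FlushWriterSketch
    {
        // a minimal sketch of the flush path after this refactor
        public static SSTableReader flush(ColumnFamilyStore cfs,
                                          SortedMap<DecoratedKey, ColumnFamily> rows) throws IOException
        {
            // flush path + metadata + partitioner are now chosen by the store itself
            SSTableWriter writer = cfs.createFlushWriter(rows.size());

            for (Map.Entry<DecoratedKey, ColumnFamily> entry : rows.entrySet())
                writer.append(entry.getKey(), entry.getValue());

            return writer.closeAndOpenReader();
        }
    }

The compaction side is analogous: callers still pick the destination data directory, but call cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation) instead of assembling the temp filename and constructing the SSTableWriter by hand.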

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu Feb 24 22:33:03 2011
@@ -445,8 +445,7 @@ public class CompactionManager implement
                 return 0;
             }
 
-            String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();
@@ -507,15 +506,13 @@ public class CompactionManager implement
     private void doScrub(ColumnFamilyStore cfs) throws IOException
     {
         assert !cfs.isIndex();
-        Table table = cfs.table;
-        Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
 
         for (final SSTableReader sstable : cfs.getSSTables())
         {
             logger.info("Scrubbing " + sstable);
 
             // Calculate the expected compacted filesize
-            String compactionFileLocation = table.getDataFileLocation(sstable.length());
+            String compactionFileLocation = cfs.table.getDataFileLocation(sstable.length());
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
@@ -707,8 +704,7 @@ public class CompactionManager implement
         if (writer == null)
         {
             FileUtils.createDirectory(compactionFileLocation);
-            String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
         }
         return writer;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Feb 24 22:33:03 2011
@@ -155,7 +155,7 @@ public class Memtable implements Compara
     private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
-        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), cfs.metadata, cfs.partitioner);
+        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size());
 
         for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
             writer.append(entry.getKey(), entry.getValue());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Thu Feb 24 22:33:03 2011
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Comparator;
 
 import org.apache.cassandra.db.IColumn;
+import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 
 /**
  * Specifies a Comparator for a specific type of ByteBuffer.
@@ -37,6 +38,28 @@ import org.apache.cassandra.db.IColumn;
  */
 public abstract class AbstractType implements Comparator<ByteBuffer>
 {
+    public final Comparator<IndexInfo> indexComparator;
+    public final Comparator<IndexInfo> indexReverseComparator;
+
+    protected AbstractType()
+    {
+        final AbstractType that = this;
+        indexComparator = new Comparator<IndexInfo>()
+        {
+            public int compare(IndexInfo o1, IndexInfo o2)
+            {
+                return that.compare(o1.lastName, o2.lastName);
+            }
+        };
+        indexReverseComparator = new Comparator<IndexInfo>()
+        {
+            public int compare(IndexInfo o1, IndexInfo o2)
+            {
+                return that.compare(o1.firstName, o2.firstName);
+            }
+        };
+    }
+
     /** get a string representation of the bytes suitable for log messages */
     public abstract String getString(ByteBuffer bytes);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Thu Feb 24 22:33:03 2011
@@ -151,21 +151,7 @@ public class IndexHelper
 
     public static Comparator<IndexInfo> getComparator(final AbstractType nameComparator, boolean reversed)
     {
-        return reversed
-              ? new Comparator<IndexInfo>()
-                {
-                    public int compare(IndexInfo o1, IndexInfo o2)
-                    {
-                        return nameComparator.compare(o1.firstName, o2.firstName);
-                    }
-                }
-              : new Comparator<IndexInfo>()
-                {
-                    public int compare(IndexInfo o1, IndexInfo o2)
-                    {
-                        return nameComparator.compare(o1.lastName, o2.lastName);
-                    }
-                };
+        return reversed ? nameComparator.indexReverseComparator : nameComparator.indexComparator;
     }
 
     public static class IndexInfo
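
Together with the AbstractType hunk above, getComparator() now returns one of two comparators built once per comparator instance instead of allocating a fresh anonymous Comparator on every call. A small sketch of how a read-path caller might use it; the index-block list and the target IndexInfo are assumed to come from the row's column index, and BytesType merely stands in for whatever column comparator the column family actually uses.

    import java.util.Collections;
    import java.util.List;

    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.BytesType;
    import org.apache.cassandra.io.sstable.IndexHelper;
    import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;

    public class IndexComparatorSketch
    {
        // binary-search a row's column index blocks without allocating a comparator per lookup
        public static int findBlock(List<IndexInfo> indexBlocks, IndexInfo target, boolean reversed)
        {
            AbstractType comparator = BytesType.instance;   // placeholder for the CF's real comparator
            return Collections.binarySearch(indexBlocks,
                                            target,
                                            IndexHelper.getComparator(comparator, reversed));
        }
    }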

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Feb 24 22:33:03 2011
@@ -587,7 +587,7 @@ public class SSTableReader extends SSTab
     {
         return metadata.cfType == ColumnFamilyType.Standard
                ? Column.serializer()
-               : SuperColumn.serializer(getColumnComparator());
+               : SuperColumn.serializer(metadata.subcolumnComparator);
     }
 
     /**
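
The one-line serializer change above looks like a correctness fix for super column families: the columns nested inside each SuperColumn are ordered by the subcolumn comparator, so building the serializer from getColumnComparator() (which orders the super column names themselves) would reconstruct subcolumns under the wrong ordering. Standard column families are unaffected, since they still take the plain Column serializer branch.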

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Feb 24 22:33:03 2011
@@ -22,9 +22,12 @@ package org.apache.cassandra.io.sstable;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -206,8 +209,10 @@ public class SSTableWriter extends SSTab
         Descriptor newdesc = tmpdesc.asTemporary(false);
         try
         {
-            for (Component component : components)
+            // do -Data last because -Data present should mean the sstable was completely renamed before crash
+            for (Component component : Sets.difference(components, Collections.singleton(Component.DATA)))
                 FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+            FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Thu Feb 24 22:33:03 2011
@@ -58,11 +58,6 @@ public class BufferedRandomAccessFile ex
     // `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
     private long bufferOffset, bufferEnd, current = 0;
 
-    // max buffer size is set according to (int size) parameter in the
-    // constructor
-    // or in directIO() method to the DEFAULT_DIRECT_BUFFER_SIZE
-    private long maxBufferSize;
-
     // constant, used for caching purpose, -1 if file is open in "rw" mode
     // otherwise this will hold cached file length
     private final long fileLength;
@@ -88,7 +83,7 @@ public class BufferedRandomAccessFile ex
      */
     public BufferedRandomAccessFile(String name, String mode) throws IOException
     {
-        this(new File(name), mode, 0);
+        this(new File(name), mode, DEFAULT_BUFFER_SIZE);
     }
 
     public BufferedRandomAccessFile(String name, String mode, int bufferSize) throws IOException
@@ -103,7 +98,7 @@ public class BufferedRandomAccessFile ex
      */
     public BufferedRandomAccessFile(File file, String mode) throws IOException
     {
-        this(file, mode, 0);
+        this(file, mode, DEFAULT_BUFFER_SIZE);
     }
 
     public BufferedRandomAccessFile(File file, String mode, int bufferSize) throws IOException
@@ -120,10 +115,10 @@ public class BufferedRandomAccessFile ex
         channel = super.getChannel();
         filePath = file.getAbsolutePath();
 
-        maxBufferSize = Math.max(bufferSize, DEFAULT_BUFFER_SIZE);
-
         // allocating required size of the buffer
-        buffer = ByteBuffer.allocate((int) maxBufferSize);
+        if (bufferSize <= 0)
+            throw new IllegalArgumentException("bufferSize must be positive");
+        buffer = ByteBuffer.allocate(bufferSize);
 
         // if in read-only mode, caching file size
         fileLength = (mode.equals("r")) ? this.channel.size() : -1;
@@ -211,7 +206,7 @@ public class BufferedRandomAccessFile ex
         channel.position(bufferOffset); // setting channel position
         long bytesRead = channel.read(buffer); // reading from that position
 
-        hitEOF = (bytesRead < maxBufferSize); // buffer is not fully loaded with
+        hitEOF = (bytesRead < buffer.capacity()); // buffer is not fully loaded with
                                               // data
         bufferEnd = bufferOffset + bytesRead;
 
@@ -284,13 +279,11 @@ public class BufferedRandomAccessFile ex
         if (length > bufferEnd && hitEOF)
             return -1;
 
-        final int left = (int) maxBufferSize - buffer.position();
+        final int left = buffer.capacity() - buffer.position();
         if (current < bufferOffset || left < length)
-        {
             reBuffer();
-        }
 
-        length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+        length = Math.min(length, buffer.capacity() - buffer.position());
         buffer.get(buff, offset, length);
         current += length;
 
@@ -341,15 +334,13 @@ public class BufferedRandomAccessFile ex
      */
     private int writeAtMost(byte[] buff, int offset, int length) throws IOException
     {
-        final int left = (int) maxBufferSize - buffer.position();
+        final int left = buffer.capacity() - buffer.position();
         if (current < bufferOffset || left < length)
-        {
             reBuffer();
-        }
 
         // logic is the following: we need to add bytes to the end of the buffer
         // starting from current buffer position and return this length
-        length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+        length = Math.min(length, buffer.capacity() - buffer.position());
 
         buffer.put(buff, offset, length);
         current += length;
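
Two behavioral points fall out of the BufferedRandomAccessFile hunk above. First, the two-argument constructors now allocate DEFAULT_BUFFER_SIZE up front instead of passing 0 and relying on the removed Math.max(bufferSize, DEFAULT_BUFFER_SIZE) adjustment. Second, with that adjustment gone, an explicit bufferSize is honored exactly rather than being silently bumped up to the default, and it must now be positive. A short usage sketch; the path is a placeholder and the 64 KB size is illustrative.

    import java.io.IOException;

    import org.apache.cassandra.io.util.BufferedRandomAccessFile;

    public class BufferSizeSketch
    {
        public static void main(String[] args) throws IOException
        {
            // two-argument form: buffer defaults to DEFAULT_BUFFER_SIZE (its value is not shown in this hunk)
            BufferedRandomAccessFile defaulted = new BufferedRandomAccessFile("/tmp/example.db", "r");

            // explicit size: used as given, and must be positive
            BufferedRandomAccessFile sized = new BufferedRandomAccessFile("/tmp/example.db", "r", 64 * 1024);

            // new BufferedRandomAccessFile("/tmp/example.db", "r", 0);  // would now throw IllegalArgumentException

            defaulted.close();
            sized.close();
        }
    }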

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 22:33:03 2011
@@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran
 
 /**
  * Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
  * This allows passing an executor so you have more control over the actual
  * behaviour of the tasks being run.
- *
+ * <p/>
  * Newer version of Thrift should make this obsolete.
  */
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
 
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
 
-// Executor service for handling client connections
-private ExecutorService executorService_;
+    // Executor service for handling client connections
+    private ExecutorService executorService_;
 
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
-	public int minWorkerThreads = 5;
-	public int maxWorkerThreads = Integer.MAX_VALUE;
-	public int stopTimeoutVal = 60;
-	public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
-        TServerSocket tServerSocket,
-        TTransportFactory inTransportFactory,
-        TTransportFactory outTransportFactory,
-        TProtocolFactory tProtocolFactory,
-        TProtocolFactory tProtocolFactory2,
-        Options options,
-        ExecutorService executorService) {
-    
-    super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
-            tProtocolFactory, tProtocolFactory2);
-    options_ = options;
-    executorService_ = executorService;
-}
-
-
-public void serve() {
-	try {
-	serverTransport_.listen();
-	} catch (TTransportException ttx) {
-	LOGGER.error("Error occurred during listening.", ttx);
-	return;
-	}
-
-	stopped_ = false;
-	while (!stopped_) {
-	int failureCount = 0;
-	try {
-		TTransport client = serverTransport_.accept();
-		WorkerProcess wp = new WorkerProcess(client);
-		executorService_.execute(wp);
-	} catch (TTransportException ttx) {
-		if (!stopped_) {
-		++failureCount;
-		LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
-		}
-	}
-	}
-
-	executorService_.shutdown();
-
-	// Loop until awaitTermination finally does return without a interrupted
-	// exception. If we don't do this, then we'll shut down prematurely. We want
-	// to let the executorService clear it's task queue, closing client sockets
-	// appropriately.
-	long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
-	long now = System.currentTimeMillis();
-	while (timeoutMS >= 0) {
-	try {
-		executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-		break;
-	} catch (InterruptedException ix) {
-		long newnow = System.currentTimeMillis();
-		timeoutMS -= (newnow - now);
-		now = newnow;
-	}
-	}
-}
-
-public void stop() {
-	stopped_ = true;
-	serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
-	/**
-	 * Client that this services.
-	 */
-	private TTransport client_;
-
-	/**
-	 * Default constructor.
-	 *
-	 * @param client Transport to process
-	 */
-	private WorkerProcess(TTransport client) {
-	client_ = client;
-	}
-
-	/**
-	 * Loops on processing a client forever
-	 */
-	public void run() {
-	TProcessor processor = null;
-	TTransport inputTransport = null;
-	TTransport outputTransport = null;
-	TProtocol inputProtocol = null;
-	TProtocol outputProtocol = null;
-	try {
-		processor = processorFactory_.getProcessor(client_);
-		inputTransport = inputTransportFactory_.getTransport(client_);
-		outputTransport = outputTransportFactory_.getTransport(client_);
-		inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-		outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-		// we check stopped_ first to make sure we're not supposed to be shutting
-		// down. this is necessary for graceful shutdown.
-		while (!stopped_ && processor.process(inputProtocol, outputProtocol)) 
-		{
-		    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-		    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-		}
-	} catch (TTransportException ttx) {
-		// Assume the client died and continue silently
-	} catch (TException tx) {
-		LOGGER.error("Thrift error occurred during processing of message.", tx);
-	} catch (Exception x) {
-		LOGGER.error("Error occurred during processing of message.", x);
-	}
-
-	if (inputTransport != null) {
-		inputTransport.close();
-	}
-
-	if (outputTransport != null) {
-		outputTransport.close();
-	}
-	}
-}
+    // Flag for stopping the server
+    private volatile boolean stopped_;
+
+    // Server options
+    private Options options_;
+
+    // Customizable server options
+    public static class Options
+    {
+        public int minWorkerThreads = 5;
+        public int maxWorkerThreads = Integer.MAX_VALUE;
+        public int stopTimeoutVal = 60;
+        public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    }
+
+
+    public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+                                   TServerSocket tServerSocket,
+                                   TTransportFactory inTransportFactory,
+                                   TTransportFactory outTransportFactory,
+                                   TProtocolFactory tProtocolFactory,
+                                   TProtocolFactory tProtocolFactory2,
+                                   Options options,
+                                   ExecutorService executorService)
+    {
+
+        super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+              tProtocolFactory, tProtocolFactory2);
+        options_ = options;
+        executorService_ = executorService;
+    }
+
+
+    public void serve()
+    {
+        try
+        {
+            serverTransport_.listen();
+        }
+        catch (TTransportException ttx)
+        {
+            LOGGER.error("Error occurred during listening.", ttx);
+            return;
+        }
+
+        stopped_ = false;
+        while (!stopped_)
+        {
+            int failureCount = 0;
+            try
+            {
+                TTransport client = serverTransport_.accept();
+                WorkerProcess wp = new WorkerProcess(client);
+                executorService_.execute(wp);
+            }
+            catch (TTransportException ttx)
+            {
+                if (!stopped_)
+                {
+                    ++failureCount;
+                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+                }
+            }
+        }
+
+        executorService_.shutdown();
+
+        // Loop until awaitTermination finally does return without an interrupted
+        // exception. If we don't do this, then we'll shut down prematurely. We want
+        // to let the executorService clear its task queue, closing client sockets
+        // appropriately.
+        long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+        long now = System.currentTimeMillis();
+        while (timeoutMS >= 0)
+        {
+            try
+            {
+                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+                break;
+            }
+            catch (InterruptedException ix)
+            {
+                long newnow = System.currentTimeMillis();
+                timeoutMS -= (newnow - now);
+                now = newnow;
+            }
+        }
+    }
+
+    public void stop()
+    {
+        stopped_ = true;
+        serverTransport_.interrupt();
+    }
+
+    private class WorkerProcess implements Runnable
+    {
+
+        /**
+         * Client that this services.
+         */
+        private TTransport client_;
+
+        /**
+         * Default constructor.
+         *
+         * @param client Transport to process
+         */
+        private WorkerProcess(TTransport client)
+        {
+            client_ = client;
+        }
+
+        /**
+         * Loops on processing a client forever
+         */
+        public void run()
+        {
+            TProcessor processor = null;
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            try
+            {
+                processor = processorFactory_.getProcessor(client_);
+                inputTransport = inputTransportFactory_.getTransport(client_);
+                outputTransport = outputTransportFactory_.getTransport(client_);
+                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                // we check stopped_ first to make sure we're not supposed to be shutting
+                // down. this is necessary for graceful shutdown.
+                while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+                {
+                    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                }
+            }
+            catch (TTransportException ttx)
+            {
+                // Assume the client died and continue silently
+            }
+            catch (TException tx)
+            {
+                LOGGER.error("Thrift error occurred during processing of message.", tx);
+            }
+            catch (Exception x)
+            {
+                LOGGER.error("Error occurred during processing of message.", x);
+            }
+
+            if (inputTransport != null)
+            {
+                inputTransport.close();
+            }
+
+            if (outputTransport != null)
+            {
+                outputTransport.close();
+            }
+        }
+    }
 }
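
As far as this hunk shows, the CustomTThreadPoolServer change is a re-indent from tab-based formatting to the project's four-space, braces-on-their-own-line style, with the logic left intact. For completeness, a hedged sketch of the public Options holder the class exposes; the values below are illustrative, not recommendations.

    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.thrift.CustomTThreadPoolServer;

    public class ThriftServerOptionsSketch
    {
        public static CustomTThreadPoolServer.Options options()
        {
            CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
            options.minWorkerThreads = 16;                  // default is 5
            options.maxWorkerThreads = 128;                 // default is Integer.MAX_VALUE
            options.stopTimeoutVal = 30;                    // default is 60...
            options.stopTimeoutUnit = TimeUnit.SECONDS;     // ...seconds
            return options;
        }
    }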