You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/24 23:33:05 UTC
svn commit: r1074329 [1/2] - in /cassandra/trunk: ./ bin/ contrib/
contrib/stress/src/org/apache/cassandra/contrib/stress/
contrib/stress/src/org/apache/cassandra/contrib/stress/operations/
contrib/stress/src/org/apache/cassandra/contrib/stress/util/ i...
Author: jbellis
Date: Thu Feb 24 22:33:03 2011
New Revision: 1074329
URL: http://svn.apache.org/viewvc?rev=1074329&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Operation.java
- copied unchanged from r1074322, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Operation.java
Removed:
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/bin/cassandra.bat
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
cassandra/trunk/src/java/org/apache/cassandra/utils/LatencyTracker.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1073884
+/cassandra/branches/cassandra-0.7:1026516-1074322
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Feb 24 22:33:03 2011
@@ -37,7 +37,14 @@
modification operations (CASSANDRA-2222)
* fix for reversed slice queries on large rows (CASSANDRA-2212)
* fat clients were writing local data (CASSANDRA-2223)
- * update memtable_throughput to be a long (CASSANDRA-2158)
+ * turn off string interning in json2sstable (CASSANDRA-2189)
+ * set DEFAULT_MEMTABLE_LIFETIME_IN_MINS to 24h
+ * improve detection and cleanup of partially-written sstables
+ (CASSANDRA-2206)
+ * fix supercolumn de/serialization when subcolumn comparator is different
+ from supercolumn's (CASSANDRA-2104)
+ * fix starting up on Windows when CASSANDRA_HOME contains whitespace
+ (CASSANDRA-2237)
0.7.2
Modified: cassandra/trunk/bin/cassandra.bat
URL: http://svn.apache.org/viewvc/cassandra/trunk/bin/cassandra.bat?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/bin/cassandra.bat (original)
+++ cassandra/trunk/bin/cassandra.bat Thu Feb 24 22:33:03 2011
@@ -43,7 +43,7 @@ set JAVA_OPTS=^
REM ***** CLASSPATH library setting *****
REM Ensure that any user defined CLASSPATH variables are not used on startup
-set CLASSPATH=%CASSANDRA_HOME%\conf
+set CLASSPATH="%CASSANDRA_HOME%\conf"
REM For each jar in the CASSANDRA_HOME lib directory call append to build the CLASSPATH variable.
for %%i in ("%CASSANDRA_HOME%\lib\*.jar") do call :append "%%i"
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1073884
+/cassandra/branches/cassandra-0.7/contrib:1026516-1074322
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Thu Feb 24 22:33:03 2011
@@ -20,8 +20,8 @@ package org.apache.cassandra.contrib.str
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.*;
@@ -38,9 +38,9 @@ public class Session
// command line options
public static final Options availableOptions = new Options();
- public final AtomicIntegerArray operationCount;
- public final AtomicIntegerArray keyCount;
- public final AtomicLongArray latencies;
+ public final AtomicInteger operations;
+ public final AtomicInteger keys;
+ public final AtomicLong latency;
static
{
@@ -93,7 +93,7 @@ public class Session
private PrintStream out = System.out;
private IndexType indexType = null;
- private Stress.Operation operation = Stress.Operation.INSERT;
+ private Stress.Operations operation = Stress.Operations.INSERT;
private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
@@ -183,7 +183,7 @@ public class Session
unframed = Boolean.parseBoolean(cmd.getOptionValue("m"));
if (cmd.hasOption("o"))
- operation = Stress.Operation.valueOf(cmd.getOptionValue("o").toUpperCase());
+ operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
if (cmd.hasOption("u"))
superColumns = Integer.parseInt(cmd.getOptionValue("u"));
@@ -248,9 +248,9 @@ public class Session
mean = numKeys / 2;
sigma = numKeys * STDev;
- operationCount = new AtomicIntegerArray(threads);
- keyCount = new AtomicIntegerArray(threads);
- latencies = new AtomicLongArray(threads);
+ operations = new AtomicInteger();
+ keys = new AtomicInteger();
+ latency = new AtomicLong();
}
public int getCardinality()
@@ -323,7 +323,7 @@ public class Session
return ignoreErrors;
}
- public Stress.Operation getOperation()
+ public Stress.Operations getOperation()
{
return operation;
}
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java Thu Feb 24 22:33:03 2011
@@ -18,15 +18,18 @@
package org.apache.cassandra.contrib.stress;
import org.apache.cassandra.contrib.stress.operations.*;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.cli.Option;
import java.io.PrintStream;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
public final class Stress
{
- public static enum Operation
+ public static enum Operations
{
INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET
}
@@ -34,9 +37,15 @@ public final class Stress
public static Session session;
public static Random randomizer = new Random();
+ /**
+ * Producer-Consumer model: 1 producer, N consumers
+ */
+ private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
public static void main(String[] arguments) throws Exception
{
- int epoch, total, oldTotal, latency, keyCount, oldKeyCount, oldLatency;
+ long latency, oldLatency;
+ int epoch, total, oldTotal, keyCount, oldKeyCount;
try
{
@@ -49,51 +58,52 @@ public final class Stress
}
// creating keyspace and column families
- if (session.getOperation() == Stress.Operation.INSERT)
+ if (session.getOperation() == Stress.Operations.INSERT)
{
session.createKeySpaces();
}
int threadCount = session.getThreads();
- Thread[] threads = new Thread[threadCount];
- PrintStream out = session.getOutputStream();
+ Thread[] consumers = new Thread[threadCount];
+ PrintStream out = session.getOutputStream();
+
+ out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+
+ int itemsPerThread = session.getKeysPerThread();
+ int modulo = session.getNumKeys() % threadCount;
// creating required type of the threads for the test
- try
- {
- for (int i = 0; i < threadCount; i++)
- {
- threads[i] = createOperation(i);
- }
- }
- catch (Exception e)
+ for (int i = 0; i < threadCount; i++)
{
- System.err.println(e.getMessage());
- return;
+ if (i == threadCount - 1)
+ itemsPerThread += modulo; // last one is going to handle N + modulo items
+
+ consumers[i] = new Consumer(itemsPerThread);
}
+ new Producer().start();
+
// starting worker threads
for (int i = 0; i < threadCount; i++)
{
- threads[i].start();
+ consumers[i].start();
}
// initialization of the values
boolean terminate = false;
- epoch = total = latency = keyCount = 0;
+ latency = 0;
+ epoch = total = keyCount = 0;
int interval = session.getProgressInterval();
int epochIntervals = session.getProgressInterval() * 10;
long testStartTime = System.currentTimeMillis();
- out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
-
while (!terminate)
{
Thread.sleep(100);
int alive = 0;
- for (Thread thread : threads)
+ for (Thread thread : consumers)
if (thread.isAlive()) alive++;
if (alive == 0)
@@ -109,20 +119,9 @@ public final class Stress
oldLatency = latency;
oldKeyCount = keyCount;
- int currentTotal = 0, currentKeyCount = 0, currentLatency = 0;
-
- for (Thread t : threads)
- {
- OperationThread thread = (OperationThread) t;
-
- currentTotal += session.operationCount.get(thread.index);
- currentKeyCount += session.keyCount.get(thread.index);
- currentLatency += session.latencies.get(thread.index);
- }
-
- total = currentTotal;
- keyCount = currentKeyCount;
- latency = currentLatency;
+ total = session.operations.get();
+ keyCount = session.keys.get();
+ latency = session.latency.get();
int opDelta = total - oldTotal;
int keyDelta = keyCount - oldKeyCount;
@@ -136,7 +135,7 @@ public final class Stress
}
}
- private static Thread createOperation(int index)
+ private static Operation createOperation(int index)
{
switch (session.getOperation())
{
@@ -174,4 +173,58 @@ public final class Stress
option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
}
}
+
+ /**
+ * Produces exactly N items (awaits each to be consumed)
+ */
+ private static class Producer extends Thread
+ {
+ public void run()
+ {
+ for (int i = 0; i < session.getNumKeys(); i++)
+ {
+ try
+ {
+ operations.put(createOperation(i));
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println("Producer error - " + e.getMessage());
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Each consumes exactly N items from queue
+ */
+ private static class Consumer extends Thread
+ {
+ private final int items;
+
+ public Consumer(int toConsume)
+ {
+ items = toConsume;
+ }
+
+ public void run()
+ {
+ Cassandra.Client client = session.getClient();
+
+ for (int i = 0; i < items; i++)
+ {
+ try
+ {
+ operations.take().run(client); // running job
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+ }
+
}
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Thu Feb 24 22:33:03 2011
@@ -17,23 +17,23 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-public class IndexedRangeSlicer extends OperationThread
+public class IndexedRangeSlicer extends Operation
{
public IndexedRangeSlicer(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
String format = "%0" + session.getTotalKeysLength() + "d";
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
@@ -46,64 +46,59 @@ public class IndexedRangeSlicer extends
ByteBuffer columnName = ByteBuffer.wrap("C1".getBytes());
- for (int i = range.begins(); i < range.size(); i++)
- {
- int received = 0;
+ int received = 0;
- String startOffset = "0";
- ByteBuffer value = ByteBuffer.wrap(values.get(i % values.size()).getBytes());
+ String startOffset = "0";
+ ByteBuffer value = ByteBuffer.wrap(values.get(index % values.size()).getBytes());
- IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
+ IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
- while (received < expectedPerValue)
- {
- IndexClause clause = new IndexClause(Arrays.asList(expression), ByteBuffer.wrap(startOffset.getBytes()),
- session.getKeysPerCall());
+ while (received < expectedPerValue)
+ {
+ IndexClause clause = new IndexClause(Arrays.asList(expression),
+ ByteBuffer.wrap(startOffset.getBytes()),
+ session.getKeysPerCall());
- List<KeySlice> results = null;
- long start = System.currentTimeMillis();
+ List<KeySlice> results = null;
+ long start = System.currentTimeMillis();
- boolean success = false;
- String exceptionMessage = null;
+ boolean success = false;
+ String exceptionMessage = null;
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
- try
- {
- results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
+ try
+ {
+ results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
}
-
- if (!success)
+ catch (Exception e)
{
- System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
- index,
- session.getRetryTimes(),
- startOffset,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ startOffset,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
- received += results.size();
+ received += results.size();
- // convert max key found back to an integer, and increment it
- startOffset = String.format(format, (1 + getMaxKey(results)));
+ // convert max key found back to an integer, and increment it
+ startOffset = String.format(format, (1 + getMaxKey(results)));
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, results.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
- }
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(results.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
}
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Thu Feb 24 22:33:03 2011
@@ -17,26 +17,27 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class Inserter extends OperationThread
+public class Inserter extends Operation
{
+
public Inserter(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
List<String> values = generateValues();
List<Column> columns = new ArrayList<Column>();
@@ -48,8 +49,10 @@ public class Inserter extends OperationT
// columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
for (int i = 0; i < session.getColumnsPerKey(); i++)
{
- byte[] columnName = ("C" + Integer.toString(i)).getBytes();
- columns.add(new Column(ByteBuffer.wrap(columnName), ByteBuffer.wrap(new byte[] {}), System.currentTimeMillis()));
+ String columnName = ("C" + Integer.toString(i));
+ String columnValue = values.get(index % values.size());
+
+ columns.add(new Column(ByteBufferUtil.bytes(columnName), ByteBufferUtil.bytes(columnValue), System.currentTimeMillis()));
}
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
@@ -62,58 +65,47 @@ public class Inserter extends OperationT
}
}
- for (int i : range)
- {
- String rawKey = String.format(format, i);
- ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
- Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
- record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
- ? getSuperColumnsMutationMap(superColumns)
- : getColumnsMutationMap(columns));
+ String rawKey = String.format(format, index);
+ Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
- String value = values.get(i % values.size());
+ record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
+ ? getSuperColumnsMutationMap(superColumns)
+ : getColumnsMutationMap(columns));
- for (Column c : columns)
- c.value = ByteBuffer.wrap(value.getBytes());
+ long start = System.currentTimeMillis();
- long start = System.currentTimeMillis();
+ boolean success = false;
+ String exceptionMessage = null;
- boolean success = false;
- String exceptionMessage = null;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
- for (int t = 0; t < session.getRetryTimes(); t++)
+ try
{
- if (success)
- break;
-
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- success = true;
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
+ client.batch_mutate(record, session.getConsistencyLevel());
+ success = true;
}
-
- if (!success)
+ catch (Exception e)
{
- System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
- session.getRetryTimes(),
- rawKey,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- break;
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ rawKey,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Thu Feb 24 22:33:03 2011
@@ -17,90 +17,39 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class MultiGetter extends OperationThread
+public class MultiGetter extends Operation
{
public MultiGetter(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
ByteBuffer.wrap(new byte[] {}),
false, session.getColumnsPerKey()));
int offset = index * session.getKeysPerThread();
- Map<ByteBuffer,List<ColumnOrSuperColumn>> results = null;
- int count = (((index + 1) * session.getKeysPerThread()) - offset) / session.getKeysPerCall();
+ Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- for (int i = 0; i < count; i++)
- {
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
-
- long start = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, keys.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-
- offset += session.getKeysPerCall();
- }
- }
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
+ List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
- for (int i = 0; i < count; i++)
+ for (int j = 0; j < session.getSuperColumns(); j++)
{
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
long start = System.currentTimeMillis();
@@ -120,29 +69,68 @@ public class MultiGetter extends Operati
catch (Exception e)
{
exceptionMessage = getExceptionMessage(e);
- success = false;
}
}
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, keys.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(keys.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
offset += session.getKeysPerCall();
}
}
+ else
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+
+ long start = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
+ }
+ }
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(keys.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
+
+ offset += session.getKeysPerCall();
+ }
}
private List<ByteBuffer> generateKeys(int start, int limit)
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Thu Feb 24 22:33:03 2011
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class RangeSlicer extends OperationThread
+public class RangeSlicer extends Operation
{
public RangeSlicer(int index)
@@ -33,87 +35,29 @@ public class RangeSlicer extends Operati
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
String format = "%0" + session.getTotalKeysLength() + "d";
// initial values
- int current = range.begins();
- int limit = range.limit();
- int count = session.getColumnsPerKey();
- int last = current + session.getKeysPerCall();
+ int count = session.getColumnsPerKey();
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[] {}),
ByteBuffer.wrap(new byte[] {}),
- false, count));
+ false,
+ count));
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- while (current < limit)
- {
- byte[] start = String.format(format, current).getBytes();
- byte[] end = String.format(format, last).getBytes();
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
-
- for (int i = 0; i < session.getSuperColumns(); i++)
- {
- String superColumnName = "S" + Integer.toString(i);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
-
- long startTime = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
- success = (slices.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
- index,
- session.getRetryTimes(),
- new String(start),
- new String(end),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+ byte[] start = String.format(format, index).getBytes();
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
- }
+ List<KeySlice> slices = new ArrayList<KeySlice>();
+ KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- current += slices.size() + 1;
- last = current + slices.size() + 1;
- session.keyCount.getAndAdd(index, slices.size());
- }
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
-
- while (current < limit)
+ for (int i = 0; i < session.getSuperColumns(); i++)
{
- byte[] start = String.format(format, current).getBytes();
- byte[] end = String.format(format, last).getBytes();
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
+ String superColumnName = "S" + Integer.toString(i);
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
long startTime = System.currentTimeMillis();
@@ -122,9 +66,6 @@ public class RangeSlicer extends Operati
for (int t = 0; t < session.getRetryTimes(); t++)
{
- if (success)
- break;
-
try
{
slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
@@ -139,24 +80,62 @@ public class RangeSlicer extends Operati
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
- index,
- session.getRetryTimes(),
- new String(start),
- new String(end),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
- current += slices.size() + 1;
- last = current + slices.size() + 1;
+ session.operations.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - startTime);
+ }
+
+ session.keys.getAndAdd(slices.size());
+ }
+ else
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ byte[] start = String.format(format, index).getBytes();
+
+ List<KeySlice> slices = new ArrayList<KeySlice>();
+ KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+
+ long startTime = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, slices.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
+ }
}
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(slices.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - startTime);
}
}
}
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Thu Feb 24 22:33:03 2011
@@ -17,25 +17,22 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
import java.io.IOException;
-import java.lang.AssertionError;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.List;
-public class Reader extends OperationThread
+public class Reader extends Operation
{
public Reader(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
SliceRange sliceRange = new SliceRange();
@@ -50,75 +47,23 @@ public class Reader extends OperationThr
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- runSuperColumnReader(predicate);
+ runSuperColumnReader(predicate, client);
}
else
{
- runColumnReader(predicate);
+ runColumnReader(predicate, client);
}
}
- private void runSuperColumnReader(SlicePredicate predicate)
+ private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
{
- for (int i = 0; i < session.getKeysPerThread(); i++)
- {
- byte[] rawKey = generateKey();
- ByteBuffer key = ByteBuffer.wrap(rawKey);
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- String superColumn = 'S' + Integer.toString(j);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
-
- long start = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
- success = (columns.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
- session.getRetryTimes(),
- new String(rawKey),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
- }
- }
- }
-
- private void runColumnReader(SlicePredicate predicate)
- {
- ColumnParent parent = new ColumnParent("Standard1");
+ byte[] rawKey = generateKey();
+ ByteBuffer key = ByteBuffer.wrap(rawKey);
- for (int i = 0; i < session.getKeysPerThread(); i++)
+ for (int j = 0; j < session.getSuperColumns(); j++)
{
- byte[] key = generateKey();
- ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+ String superColumn = 'S' + Integer.toString(j);
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
long start = System.currentTimeMillis();
@@ -133,7 +78,7 @@ public class Reader extends OperationThr
try
{
List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
success = (columns.size() != 0);
}
catch (Exception e)
@@ -145,18 +90,61 @@ public class Reader extends OperationThr
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+ error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(rawKey),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
+ }
+ }
+
+ private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ byte[] key = generateKey();
+ ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+
+ long start = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
- if (!session.ignoreErrors())
- return;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(key),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
+
}
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1074322
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1074322
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1074322
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1074322
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 24 22:33:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1073884
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1074322
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Feb 24 22:33:03 2011
@@ -57,7 +57,7 @@ public final class CFMetaData
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
- public final static int DEFAULT_MEMTABLE_LIFETIME_IN_MINS = 60;
+ public final static int DEFAULT_MEMTABLE_LIFETIME_IN_MINS = 60 * 24;
public final static int DEFAULT_MEMTABLE_THROUGHPUT_IN_MB = sizeMemtableThroughput();
public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS = sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Thu Feb 24 22:33:03 2011
@@ -125,8 +125,7 @@ public class BinaryMemtable implements I
private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
{
logger.info("Writing " + this);
- String path = cfs.getFlushPath();
- SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), cfs.metadata, cfs.partitioner);
+ SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size());
for (DecoratedKey key : sortedKeys)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Feb 24 22:33:03 2011
@@ -450,7 +450,7 @@ public class ColumnFamilyStore implement
long count = 0;
for (SSTableReader sstable : ssTables)
{
- sum += sstable.getEstimatedRowSize().median();
+ sum += sstable.getEstimatedRowSize().mean();
count++;
}
return count > 0 ? sum / count : 0;
@@ -462,7 +462,7 @@ public class ColumnFamilyStore implement
int count = 0;
for (SSTableReader sstable : ssTables)
{
- sum += sstable.getEstimatedColumnCount().median();
+ sum += sstable.getEstimatedColumnCount().mean();
count++;
}
return count > 0 ? (int) (sum / count) : 0;
@@ -1073,12 +1073,12 @@ public class ColumnFamilyStore implement
public long[] getRecentSSTablesPerReadHistogram()
{
- return recentSSTablesPerRead.get(true);
+ return recentSSTablesPerRead.getBuckets(true);
}
public long[] getSSTablesPerReadHistogram()
{
- return sstablesPerRead.get(false);
+ return sstablesPerRead.getBuckets(false);
}
public long getReadCount()
@@ -2052,7 +2052,7 @@ public class ColumnFamilyStore implement
for (SSTableReader sstable : ssTables)
{
- long[] rowSize = sstable.getEstimatedRowSize().get(false);
+ long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
for (int i = 0; i < histogram.length; i++)
histogram[i] += rowSize[i];
@@ -2067,7 +2067,7 @@ public class ColumnFamilyStore implement
for (SSTableReader sstable : ssTables)
{
- long[] columnSize = sstable.getEstimatedColumnCount().get(false);
+ long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
for (int i = 0; i < histogram.length; i++)
histogram[i] += columnSize[i];
@@ -2160,4 +2160,14 @@ public class ColumnFamilyStore implement
return intern(name);
}
+
+ public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
+ {
+ return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
+ }
+
+ public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
+ {
+ return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu Feb 24 22:33:03 2011
@@ -445,8 +445,7 @@ public class CompactionManager implement
return 0;
}
- String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
- writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
@@ -507,15 +506,13 @@ public class CompactionManager implement
private void doScrub(ColumnFamilyStore cfs) throws IOException
{
assert !cfs.isIndex();
- Table table = cfs.table;
- Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
for (final SSTableReader sstable : cfs.getSSTables())
{
logger.info("Scrubbing " + sstable);
// Calculate the expected compacted filesize
- String compactionFileLocation = table.getDataFileLocation(sstable.length());
+ String compactionFileLocation = cfs.table.getDataFileLocation(sstable.length());
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -707,8 +704,7 @@ public class CompactionManager implement
if (writer == null)
{
FileUtils.createDirectory(compactionFileLocation);
- String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
- writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
}
return writer;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Feb 24 22:33:03 2011
@@ -155,7 +155,7 @@ public class Memtable implements Compara
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), cfs.metadata, cfs.partitioner);
+ SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size());
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
writer.append(entry.getKey(), entry.getValue());
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Thu Feb 24 22:33:03 2011
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Comparator;
import org.apache.cassandra.db.IColumn;
+import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
/**
* Specifies a Comparator for a specific type of ByteBuffer.
@@ -37,6 +38,28 @@ import org.apache.cassandra.db.IColumn;
*/
public abstract class AbstractType implements Comparator<ByteBuffer>
{
+ public final Comparator<IndexInfo> indexComparator;
+ public final Comparator<IndexInfo> indexReverseComparator;
+
+ protected AbstractType()
+ {
+ final AbstractType that = this;
+ indexComparator = new Comparator<IndexInfo>()
+ {
+ public int compare(IndexInfo o1, IndexInfo o2)
+ {
+ return that.compare(o1.lastName, o2.lastName);
+ }
+ };
+ indexReverseComparator = new Comparator<IndexInfo>()
+ {
+ public int compare(IndexInfo o1, IndexInfo o2)
+ {
+ return that.compare(o1.firstName, o2.firstName);
+ }
+ };
+ }
+
/** get a string representation of the bytes suitable for log messages */
public abstract String getString(ByteBuffer bytes);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Thu Feb 24 22:33:03 2011
@@ -151,21 +151,7 @@ public class IndexHelper
public static Comparator<IndexInfo> getComparator(final AbstractType nameComparator, boolean reversed)
{
- return reversed
- ? new Comparator<IndexInfo>()
- {
- public int compare(IndexInfo o1, IndexInfo o2)
- {
- return nameComparator.compare(o1.firstName, o2.firstName);
- }
- }
- : new Comparator<IndexInfo>()
- {
- public int compare(IndexInfo o1, IndexInfo o2)
- {
- return nameComparator.compare(o1.lastName, o2.lastName);
- }
- };
+ return reversed ? nameComparator.indexReverseComparator : nameComparator.indexComparator;
}
public static class IndexInfo
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Feb 24 22:33:03 2011
@@ -587,7 +587,7 @@ public class SSTableReader extends SSTab
{
return metadata.cfType == ColumnFamilyType.Standard
? Column.serializer()
- : SuperColumn.serializer(getColumnComparator());
+ : SuperColumn.serializer(metadata.subcolumnComparator);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Feb 24 22:33:03 2011
@@ -22,9 +22,12 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import com.google.common.collect.Sets;
+
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,8 +209,10 @@ public class SSTableWriter extends SSTab
Descriptor newdesc = tmpdesc.asTemporary(false);
try
{
- for (Component component : components)
+ // do -Data last because -Data present should mean the sstable was completely renamed before crash
+ for (Component component : Sets.difference(components, Collections.singleton(Component.DATA)))
FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+ FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Thu Feb 24 22:33:03 2011
@@ -58,11 +58,6 @@ public class BufferedRandomAccessFile ex
// `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
private long bufferOffset, bufferEnd, current = 0;
- // max buffer size is set according to (int size) parameter in the
- // constructor
- // or in directIO() method to the DEFAULT_DIRECT_BUFFER_SIZE
- private long maxBufferSize;
-
// constant, used for caching purpose, -1 if file is open in "rw" mode
// otherwise this will hold cached file length
private final long fileLength;
@@ -88,7 +83,7 @@ public class BufferedRandomAccessFile ex
*/
public BufferedRandomAccessFile(String name, String mode) throws IOException
{
- this(new File(name), mode, 0);
+ this(new File(name), mode, DEFAULT_BUFFER_SIZE);
}
public BufferedRandomAccessFile(String name, String mode, int bufferSize) throws IOException
@@ -103,7 +98,7 @@ public class BufferedRandomAccessFile ex
*/
public BufferedRandomAccessFile(File file, String mode) throws IOException
{
- this(file, mode, 0);
+ this(file, mode, DEFAULT_BUFFER_SIZE);
}
public BufferedRandomAccessFile(File file, String mode, int bufferSize) throws IOException
@@ -120,10 +115,10 @@ public class BufferedRandomAccessFile ex
channel = super.getChannel();
filePath = file.getAbsolutePath();
- maxBufferSize = Math.max(bufferSize, DEFAULT_BUFFER_SIZE);
-
// allocating required size of the buffer
- buffer = ByteBuffer.allocate((int) maxBufferSize);
+ if (bufferSize <= 0)
+ throw new IllegalArgumentException("bufferSize must be positive");
+ buffer = ByteBuffer.allocate(bufferSize);
// if in read-only mode, caching file size
fileLength = (mode.equals("r")) ? this.channel.size() : -1;
@@ -211,7 +206,7 @@ public class BufferedRandomAccessFile ex
channel.position(bufferOffset); // setting channel position
long bytesRead = channel.read(buffer); // reading from that position
- hitEOF = (bytesRead < maxBufferSize); // buffer is not fully loaded with
+ hitEOF = (bytesRead < buffer.capacity()); // buffer is not fully loaded with
// data
bufferEnd = bufferOffset + bytesRead;
@@ -284,13 +279,11 @@ public class BufferedRandomAccessFile ex
if (length > bufferEnd && hitEOF)
return -1;
- final int left = (int) maxBufferSize - buffer.position();
+ final int left = buffer.capacity() - buffer.position();
if (current < bufferOffset || left < length)
- {
reBuffer();
- }
- length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+ length = Math.min(length, buffer.capacity() - buffer.position());
buffer.get(buff, offset, length);
current += length;
@@ -341,15 +334,13 @@ public class BufferedRandomAccessFile ex
*/
private int writeAtMost(byte[] buff, int offset, int length) throws IOException
{
- final int left = (int) maxBufferSize - buffer.position();
+ final int left = buffer.capacity() - buffer.position();
if (current < bufferOffset || left < length)
- {
reBuffer();
- }
// logic is the following: we need to add bytes to the end of the buffer
// starting from current buffer position and return this length
- length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+ length = Math.min(length, buffer.capacity() - buffer.position());
buffer.put(buff, offset, length);
current += length;
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074329&r1=1074328&r2=1074329&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 22:33:03 2011
@@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran
/**
* Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
* This allows passing an executor so you have more control over the actual
* behaviour of the tasks being run.
- *
+ * <p/>
* Newer version of Thrift should make this obsolete.
*/
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+ private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
-// Executor service for handling client connections
-private ExecutorService executorService_;
+ // Executor service for handling client connections
+ private ExecutorService executorService_;
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
- TServerSocket tServerSocket,
- TTransportFactory inTransportFactory,
- TTransportFactory outTransportFactory,
- TProtocolFactory tProtocolFactory,
- TProtocolFactory tProtocolFactory2,
- Options options,
- ExecutorService executorService) {
-
- super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
- tProtocolFactory, tProtocolFactory2);
- options_ = options;
- executorService_ = executorService;
-}
-
-
-public void serve() {
- try {
- serverTransport_.listen();
- } catch (TTransportException ttx) {
- LOGGER.error("Error occurred during listening.", ttx);
- return;
- }
-
- stopped_ = false;
- while (!stopped_) {
- int failureCount = 0;
- try {
- TTransport client = serverTransport_.accept();
- WorkerProcess wp = new WorkerProcess(client);
- executorService_.execute(wp);
- } catch (TTransportException ttx) {
- if (!stopped_) {
- ++failureCount;
- LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
- }
- }
- }
-
- executorService_.shutdown();
-
- // Loop until awaitTermination finally does return without a interrupted
- // exception. If we don't do this, then we'll shut down prematurely. We want
- // to let the executorService clear it's task queue, closing client sockets
- // appropriately.
- long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
- long now = System.currentTimeMillis();
- while (timeoutMS >= 0) {
- try {
- executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
- break;
- } catch (InterruptedException ix) {
- long newnow = System.currentTimeMillis();
- timeoutMS -= (newnow - now);
- now = newnow;
- }
- }
-}
-
-public void stop() {
- stopped_ = true;
- serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
- /**
- * Client that this services.
- */
- private TTransport client_;
-
- /**
- * Default constructor.
- *
- * @param client Transport to process
- */
- private WorkerProcess(TTransport client) {
- client_ = client;
- }
-
- /**
- * Loops on processing a client forever
- */
- public void run() {
- TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
- TProtocol inputProtocol = null;
- TProtocol outputProtocol = null;
- try {
- processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- // we check stopped_ first to make sure we're not supposed to be shutting
- // down. this is necessary for graceful shutdown.
- while (!stopped_ && processor.process(inputProtocol, outputProtocol))
- {
- inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- }
- } catch (TTransportException ttx) {
- // Assume the client died and continue silently
- } catch (TException tx) {
- LOGGER.error("Thrift error occurred during processing of message.", tx);
- } catch (Exception x) {
- LOGGER.error("Error occurred during processing of message.", x);
- }
-
- if (inputTransport != null) {
- inputTransport.close();
- }
-
- if (outputTransport != null) {
- outputTransport.close();
- }
- }
-}
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ // Server options
+ private Options options_;
+
+ // Customizable server options
+ public static class Options
+ {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ }
+
+
+ public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+ TServerSocket tServerSocket,
+ TTransportFactory inTransportFactory,
+ TTransportFactory outTransportFactory,
+ TProtocolFactory tProtocolFactory,
+ TProtocolFactory tProtocolFactory2,
+ Options options,
+ ExecutorService executorService)
+ {
+
+ super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+ tProtocolFactory, tProtocolFactory2);
+ options_ = options;
+ executorService_ = executorService;
+ }
+
+
+ public void serve()
+ {
+ try
+ {
+ serverTransport_.listen();
+ }
+ catch (TTransportException ttx)
+ {
+ LOGGER.error("Error occurred during listening.", ttx);
+ return;
+ }
+
+ stopped_ = false;
+ while (!stopped_)
+ {
+ int failureCount = 0;
+ try
+ {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ }
+ catch (TTransportException ttx)
+ {
+ if (!stopped_)
+ {
+ ++failureCount;
+ LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+ }
+ }
+ }
+
+ executorService_.shutdown();
+
+ // Loop until awaitTermination finally returns without an interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We want
+ // to let the executorService clear its task queue, closing client sockets
+ // appropriately.
+ long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0)
+ {
+ try
+ {
+ executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (InterruptedException ix)
+ {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+ }
+
+ public void stop()
+ {
+ stopped_ = true;
+ serverTransport_.interrupt();
+ }
+
+ private class WorkerProcess implements Runnable
+ {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Creates a worker bound to a single client transport.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client)
+ {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client until the server is stopped, the processor returns false, or an error occurs
+ */
+ public void run()
+ {
+ TProcessor processor = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
+ try
+ {
+ processor = processorFactory_.getProcessor(client_);
+ inputTransport = inputTransportFactory_.getTransport(client_);
+ outputTransport = outputTransportFactory_.getTransport(client_);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ // we check stopped_ first to make sure we're not supposed to be shutting
+ // down. this is necessary for graceful shutdown.
+ while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+ {
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ }
+ }
+ catch (TTransportException ttx)
+ {
+ // Assume the client died and continue silently
+ }
+ catch (TException tx)
+ {
+ LOGGER.error("Thrift error occurred during processing of message.", tx);
+ }
+ catch (Exception x)
+ {
+ LOGGER.error("Error occurred during processing of message.", x);
+ }
+
+ if (inputTransport != null)
+ {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null)
+ {
+ outputTransport.close();
+ }
+ }
+ }
}