You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 20:54:54 UTC
svn commit: r1060939 - in /cassandra/trunk: ./ conf/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/streaming/
test/distributed/org/apache/cassandra/
Author: jbellis
Date: Wed Jan 19 19:54:53 2011
New Revision: 1060939
URL: http://svn.apache.org/viewvc?rev=1060939&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra-env.sh
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1060910
+/cassandra/branches/cassandra-0.7:1026516-1060938
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 19 19:54:53 2011
@@ -20,7 +20,7 @@
* implement describeOwnership for BOP, COPP (CASSANDRA-1928)
* make read repair behave as expected for ConsistencyLevel > ONE
(CASSANDRA-982)
- * distributed test harness (CASSANDRA-1859)
+ * distributed test harness (CASSANDRA-1859, 1964)
* reduce flush lock contention (CASSANDRA-1930)
* optimize supercolumn deserialization (CASSANDRA-1891)
* fix CFMetaData.apply to only compare objects of the same class
@@ -33,6 +33,8 @@
* fixes for contrib/javautils (CASSANDRA-1979)
* check more frequently for memtable expiration (CASSANDRA-2000)
* fix writing SSTable column count statistics (CASSANDRA-1976)
+ * fix streaming of multiple CFs during bootstrap (CASSANDRA-1992)
+ * explicitly set JVM GC new generation size with -Xmn (CASSANDRA-1968)
0.7.0-final
Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Wed Jan 19 19:54:53 2011
@@ -19,25 +19,46 @@ calculate_heap_size()
case "`uname`" in
Linux)
system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
- MAX_HEAP_SIZE=$((system_memory_in_mb / 2))M
- return 0
+ system_cpu_cores=`cat /proc/cpuinfo | egrep '^processor(\s|\t)+:.*' | wc -l`
+ break
;;
FreeBSD)
system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
- MAX_HEAP_SIZE=$((system_memory_in_bytes / 1024 / 1024 / 2))M
- return 0
+ system_memory_in_mb=$((system_memory_in_bytes / 1024 / 1024))
+ system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
+ break
;;
*)
MAX_HEAP_SIZE=1024M
+ HEAP_NEWSIZE=256M
return 1
;;
esac
+ max_heap_size_in_mb=$((system_memory_in_mb / 2))
+ MAX_HEAP_SIZE=${max_heap_size_in_mb}M
+
+ # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
+ max_sensible_yg_per_core_in_mb="100"
+ max_sensible_yg_in_mb=$((max_sensible_yg_per_core_in_mb * system_cpu_cores))
+
+ desired_yg_in_mb=$((max_heap_size_in_mb / 4))
+
+ if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
+ then
+ HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
+ else
+ HEAP_NEWSIZE="${desired_yg_in_mb}M"
+ fi
+
+ return 0
}
# The amount of memory to allocate to the JVM at startup, you almost
# certainly want to adjust this for your environment. If left commented
# out, the heap size will be automatically determined by calculate_heap_size
# MAX_HEAP_SIZE="4G"
+# set this to explicitly control the size of the young generation
+# HEAP_NEWSIZE="1G"
if [ "x$MAX_HEAP_SIZE" = "x" ]; then
calculate_heap_size
@@ -68,6 +89,7 @@ JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPo
# out.
JVM_OPTS="$JVM_OPTS -Xms$MAX_HEAP_SIZE"
JVM_OPTS="$JVM_OPTS -Xmx$MAX_HEAP_SIZE"
+JVM_OPTS="$JVM_OPTS -Xmn$HEAP_NEWSIZE"
JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"
if [ "`uname`" = "Linux" ] ; then
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1060938
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1060938
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1060938
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1060938
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 19 19:54:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1060910
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1060938
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jan 19 19:54:53 2011
@@ -923,6 +923,7 @@ public class ColumnFamilyStore implement
*/
public void addSSTable(SSTableReader sstable)
{
+ assert sstable.getColumnFamilyName().equals(columnFamily);
ssTables.add(Arrays.asList(sstable));
CompactionManager.instance.submitMinorIfNeeded(this);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jan 19 19:54:53 2011
@@ -48,7 +48,6 @@ public class StreamInSession
private final Runnable callback;
private String table;
private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
- private ColumnFamilyStore cfs;
private PendingFile current;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -92,13 +91,11 @@ public class StreamInSession
public void addFiles(Collection<PendingFile> files)
{
- for(PendingFile file : files)
+ for (PendingFile file : files)
{
if(logger.isDebugEnabled())
logger.debug("Adding file {} to Stream Request queue", file.getFilename());
this.files.add(file);
- if (cfs == null)
- cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
}
}
@@ -130,16 +127,20 @@ public class StreamInSession
if (files.isEmpty())
{
// wait for bloom filters and row indexes to finish building
- List<SSTableReader> sstables = new ArrayList<SSTableReader>(buildFutures.size());
+ HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
for (Future<SSTableReader> future : buildFutures)
{
try
{
SSTableReader sstable = future.get();
+ assert sstable.getTableName().equals(table);
if (sstable == null)
continue;
+ ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
cfs.addSSTable(sstable);
- sstables.add(sstable);
+ if (!cfstores.containsKey(cfs))
+ cfstores.put(cfs, new ArrayList<SSTableReader>());
+ cfstores.get(cfs).add(sstable);
}
catch (InterruptedException e)
{
@@ -152,8 +153,11 @@ public class StreamInSession
}
// build secondary indexes
- if (cfs != null && !cfs.getIndexedColumns().isEmpty())
- cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+ for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+ {
+ if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+ entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+ }
// send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
Modified: cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java?rev=1060939&r1=1060938&r2=1060939&view=diff
==============================================================================
--- cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java (original)
+++ cassandra/trunk/test/distributed/org/apache/cassandra/MutationTest.java Wed Jan 19 19:54:53 2011
@@ -26,16 +26,18 @@ import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.thrift.TException;
+import org.apache.thrift.TException;
import org.apache.cassandra.client.*;
import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.CassandraServiceController.Failure;
@@ -48,6 +50,8 @@ import static junit.framework.Assert.ass
public class MutationTest extends TestBase
{
+ private static final Logger logger = LoggerFactory.getLogger(MutationTest.class);
+
@Test
public void testInsert() throws Exception
{
@@ -62,9 +66,9 @@ public class MutationTest extends TestBa
insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
-
- assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
- assertColumnEqual("c2", "v2", 0, getColumn(client, key, "Standard1", "c2", ConsistencyLevel.ONE));
+ // block until the column is available
+ new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
+ new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);
List<ColumnOrSuperColumn> coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
@@ -84,24 +88,22 @@ public class MutationTest extends TestBa
ByteBuffer key = newKey();
insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL);
+ // should be instantly available
assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
- Thread.sleep(10000); // let gossip catch up
-
try {
client = controller.createClient(coordinator);
client.set_keyspace(keyspace);
- assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+ new Get(client, "Standard1", key).name("c1").value("v1")
+ .perform(ConsistencyLevel.ONE);
- insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL);
- assert false;
- } catch (UnavailableException e) {
- // [this is good]
+ new Insert(client, "Standard1", key).name("c3").value("v3")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.ALL);
} finally {
failure.resolve();
Thread.sleep(10000);
@@ -125,26 +127,21 @@ public class MutationTest extends TestBa
InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); //kill all but one nodes
- Thread.sleep(10000);
client = controller.createClient(coordinator);
client.set_keyspace(keyspace);
try {
- insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM);
- assert false;
- } catch (UnavailableException e) {
- // [this is good]
+ new Insert(client, "Standard1", key).name("c1").value("v1")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.QUORUM);
} finally {
failure.resolve();
- Thread.sleep(10000);
}
// with all nodes up
- insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM);
+ new Insert(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
failure = controller.failHosts(endpoints.get(0));
- Thread.sleep(10000);
try {
- getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM);
+ new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
} finally {
failure.resolve();
Thread.sleep(10000);
@@ -180,12 +177,9 @@ public class MutationTest extends TestBa
// read with all (success)
Failure failure = controller.failHosts(endpoints);
- Thread.sleep(10000);
try {
- insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
- assert false;
- } catch (UnavailableException e) {
- // this is good
+ new Insert(client, "Standard1", key).name("c2").value("v2")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.ONE);
} finally {
failure.resolve();
}
@@ -210,6 +204,122 @@ public class MutationTest extends TestBa
return client.get(key, cpath, cl).column;
}
+ protected class Get extends RetryingAction
+ {
+ public Get(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ super(client, cf, key);
+ }
+
+ public void tryPerformAction(ConsistencyLevel cl) throws Exception
+ {
+ assertColumnEqual(name, value, timestamp, getColumn(client, key, cf, name, cl));
+ }
+ }
+
+ protected class Insert extends RetryingAction
+ {
+ public Insert(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ super(client, cf, key);
+ }
+
+ public void tryPerformAction(ConsistencyLevel cl) throws Exception
+ {
+ insert(client, key, cf, name, value, timestamp, cl);
+ }
+ }
+
+ /** Performs an action repeatedly until timeout, success or failure. */
+ protected abstract class RetryingAction
+ {
+ protected Cassandra.Client client;
+ protected String cf;
+ protected ByteBuffer key;
+ protected String name;
+ protected String value;
+ protected long timestamp;
+
+ private Set<Class<Exception>> expected = new HashSet<Class<Exception>>();
+ private long timeout = StorageService.RING_DELAY;
+
+ public RetryingAction(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ this.client = client;
+ this.cf = cf;
+ this.key = key;
+ this.timestamp = 0;
+ }
+
+ public RetryingAction name(String name)
+ {
+ this.name = name; return this;
+ }
+
+ /** The value to expect for the return column, or null to expect the column to be missing. */
+ public RetryingAction value(String value)
+ {
+ this.value = value; return this;
+ }
+
+ /** The total time to allow before failing. */
+ public RetryingAction timeout(long timeout)
+ {
+ this.timeout = timeout; return this;
+ }
+
+ /** The expected timestamp of the returned column. */
+ public RetryingAction timestamp(long timestamp)
+ {
+ this.timestamp = timestamp; return this;
+ }
+
+ /** The exception classes that indicate success. */
+ public RetryingAction expecting(Class... tempExceptions)
+ {
+ this.expected.clear();
+ for (Class exclass : tempExceptions)
+ expected.add((Class<Exception>)exclass);
+ return this;
+ }
+
+ public void perform(ConsistencyLevel cl) throws AssertionError
+ {
+ long deadline = System.currentTimeMillis() + timeout;
+ int attempts = 0;
+ String template = "%s for " + this + " after %d attempt(s) with %d ms to spare.";
+ Exception e = null;
+ while(deadline > System.currentTimeMillis())
+ {
+ try
+ {
+ attempts++;
+ tryPerformAction(cl);
+ logger.info(String.format(template, "Succeeded", attempts, deadline - System.currentTimeMillis()));
+ return;
+ }
+ catch (Exception ex)
+ {
+ e = ex;
+ if (!expected.contains(ex.getClass()))
+ continue;
+ logger.info(String.format(template, "Caught expected exception: " + e, attempts, deadline - System.currentTimeMillis()));
+ return;
+ }
+ }
+ String err = String.format(template, "Caught unexpected: " + e, attempts, deadline - System.currentTimeMillis());
+ logger.error(err);
+ throw new AssertionError(err);
+ }
+
+ public String toString()
+ {
+ return this.getClass() + "(" + key + "," + name + ")";
+ }
+
+ protected abstract void tryPerformAction(ConsistencyLevel cl) throws Exception;
+ }
+
protected List<ColumnOrSuperColumn> get_slice(Cassandra.Client client, ByteBuffer key, String cf, ConsistencyLevel cl)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{