You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 20:52:51 UTC
svn commit: r1060938 - in /cassandra/branches/cassandra-0.7: CHANGES.txt
test/distributed/org/apache/cassandra/MutationTest.java
Author: jbellis
Date: Wed Jan 19 19:52:51 2011
New Revision: 1060938
URL: http://svn.apache.org/viewvc?rev=1060938&view=rev
Log:
fix distributed-test MutationTest
patch by stuhood; reviewed by Pavel Yaskevich for CASSANDRA-1964
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1060938&r1=1060937&r2=1060938&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 19 19:52:51 2011
@@ -12,7 +12,7 @@
* implement describeOwnership for BOP, COPP (CASSANDRA-1928)
* make read repair behave as expected for ConsistencyLevel > ONE
(CASSANDRA-982)
- * distributed test harness (CASSANDRA-1859)
+ * distributed test harness (CASSANDRA-1859, 1964)
* reduce flush lock contention (CASSANDRA-1930)
* optimize supercolumn deserialization (CASSANDRA-1891)
* fix CFMetaData.apply to only compare objects of the same class
Modified: cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java?rev=1060938&r1=1060937&r2=1060938&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java Wed Jan 19 19:52:51 2011
@@ -26,16 +26,18 @@ import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.thrift.TException;
+import org.apache.thrift.TException;
import org.apache.cassandra.client.*;
import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.CassandraServiceController.Failure;
@@ -48,6 +50,8 @@ import static junit.framework.Assert.ass
public class MutationTest extends TestBase
{
+ private static final Logger logger = LoggerFactory.getLogger(MutationTest.class);
+
@Test
public void testInsert() throws Exception
{
@@ -62,9 +66,9 @@ public class MutationTest extends TestBa
insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
-
- assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
- assertColumnEqual("c2", "v2", 0, getColumn(client, key, "Standard1", "c2", ConsistencyLevel.ONE));
+ // block until the column is available
+ new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
+ new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);
List<ColumnOrSuperColumn> coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
@@ -84,24 +88,22 @@ public class MutationTest extends TestBa
ByteBuffer key = newKey();
insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL);
+ // should be instantly available
assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
- Thread.sleep(10000); // let gossip catch up
-
try {
client = controller.createClient(coordinator);
client.set_keyspace(keyspace);
- assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+ new Get(client, "Standard1", key).name("c1").value("v1")
+ .perform(ConsistencyLevel.ONE);
- insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL);
- assert false;
- } catch (UnavailableException e) {
- // [this is good]
+ new Insert(client, "Standard1", key).name("c3").value("v3")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.ALL);
} finally {
failure.resolve();
Thread.sleep(10000);
@@ -125,26 +127,21 @@ public class MutationTest extends TestBa
InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); //kill all but one nodes
- Thread.sleep(10000);
client = controller.createClient(coordinator);
client.set_keyspace(keyspace);
try {
- insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM);
- assert false;
- } catch (UnavailableException e) {
- // [this is good]
+ new Insert(client, "Standard1", key).name("c1").value("v1")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.QUORUM);
} finally {
failure.resolve();
- Thread.sleep(10000);
}
// with all nodes up
- insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM);
+ new Insert(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
failure = controller.failHosts(endpoints.get(0));
- Thread.sleep(10000);
try {
- getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM);
+ new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
} finally {
failure.resolve();
Thread.sleep(10000);
@@ -180,12 +177,9 @@ public class MutationTest extends TestBa
// read with all (success)
Failure failure = controller.failHosts(endpoints);
- Thread.sleep(10000);
try {
- insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
- assert false;
- } catch (UnavailableException e) {
- // this is good
+ new Insert(client, "Standard1", key).name("c2").value("v2")
+ .expecting(UnavailableException.class).perform(ConsistencyLevel.ONE);
} finally {
failure.resolve();
}
@@ -210,6 +204,122 @@ public class MutationTest extends TestBa
return client.get(key, cpath, cl).column;
}
+ protected class Get extends RetryingAction
+ {
+ public Get(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ super(client, cf, key);
+ }
+
+ public void tryPerformAction(ConsistencyLevel cl) throws Exception
+ {
+ assertColumnEqual(name, value, timestamp, getColumn(client, key, cf, name, cl));
+ }
+ }
+
+ protected class Insert extends RetryingAction
+ {
+ public Insert(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ super(client, cf, key);
+ }
+
+ public void tryPerformAction(ConsistencyLevel cl) throws Exception
+ {
+ insert(client, key, cf, name, value, timestamp, cl);
+ }
+ }
+
+ /** Performs an action repeatedly until timeout, success or failure. */
+ protected abstract class RetryingAction
+ {
+ protected Cassandra.Client client;
+ protected String cf;
+ protected ByteBuffer key;
+ protected String name;
+ protected String value;
+ protected long timestamp;
+
+ private Set<Class<Exception>> expected = new HashSet<Class<Exception>>();
+ private long timeout = StorageService.RING_DELAY;
+
+ public RetryingAction(Cassandra.Client client, String cf, ByteBuffer key)
+ {
+ this.client = client;
+ this.cf = cf;
+ this.key = key;
+ this.timestamp = 0;
+ }
+
+ public RetryingAction name(String name)
+ {
+ this.name = name; return this;
+ }
+
+ /** The value to expect for the return column, or null to expect the column to be missing. */
+ public RetryingAction value(String value)
+ {
+ this.value = value; return this;
+ }
+
+ /** The total time to allow before failing. */
+ public RetryingAction timeout(long timeout)
+ {
+ this.timeout = timeout; return this;
+ }
+
+ /** The expected timestamp of the returned column. */
+ public RetryingAction timestamp(long timestamp)
+ {
+ this.timestamp = timestamp; return this;
+ }
+
+ /** The exception classes that indicate success. */
+ public RetryingAction expecting(Class... tempExceptions)
+ {
+ this.expected.clear();
+ for (Class exclass : tempExceptions)
+ expected.add((Class<Exception>)exclass);
+ return this;
+ }
+
+ public void perform(ConsistencyLevel cl) throws AssertionError
+ {
+ long deadline = System.currentTimeMillis() + timeout;
+ int attempts = 0;
+ String template = "%s for " + this + " after %d attempt(s) with %d ms to spare.";
+ Exception e = null;
+ while(deadline > System.currentTimeMillis())
+ {
+ try
+ {
+ attempts++;
+ tryPerformAction(cl);
+ logger.info(String.format(template, "Succeeded", attempts, deadline - System.currentTimeMillis()));
+ return;
+ }
+ catch (Exception ex)
+ {
+ e = ex;
+ if (!expected.contains(ex.getClass()))
+ continue;
+ logger.info(String.format(template, "Caught expected exception: " + e, attempts, deadline - System.currentTimeMillis()));
+ return;
+ }
+ }
+ String err = String.format(template, "Caught unexpected: " + e, attempts, deadline - System.currentTimeMillis());
+ logger.error(err);
+ throw new AssertionError(err);
+ }
+
+ public String toString()
+ {
+ return this.getClass() + "(" + key + "," + name + ")";
+ }
+
+ protected abstract void tryPerformAction(ConsistencyLevel cl) throws Exception;
+ }
+
protected List<ColumnOrSuperColumn> get_slice(Cassandra.Client client, ByteBuffer key, String cf, ConsistencyLevel cl)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{