You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 20:52:51 UTC

svn commit: r1060938 - in /cassandra/branches/cassandra-0.7: CHANGES.txt test/distributed/org/apache/cassandra/MutationTest.java

Author: jbellis
Date: Wed Jan 19 19:52:51 2011
New Revision: 1060938

URL: http://svn.apache.org/viewvc?rev=1060938&view=rev
Log:
fix distributed-test MutationTest
patch by stuhood; reviewed by Pavel Yaskevich for CASSANDRA-1964

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1060938&r1=1060937&r2=1060938&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 19 19:52:51 2011
@@ -12,7 +12,7 @@
  * implement describeOwnership for BOP, COPP (CASSANDRA-1928)
  * make read repair behave as expected for ConsistencyLevel > ONE
    (CASSANDRA-982)
- * distributed test harness (CASSANDRA-1859)
+ * distributed test harness (CASSANDRA-1859, 1964)
  * reduce flush lock contention (CASSANDRA-1930)
  * optimize supercolumn deserialization (CASSANDRA-1891)
  * fix CFMetaData.apply to only compare objects of the same class 

Modified: cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java?rev=1060938&r1=1060937&r2=1060938&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java Wed Jan 19 19:52:51 2011
@@ -26,16 +26,18 @@ import java.io.IOException;
 import java.io.Writer;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.utils.WrappedRunnable;
-import  org.apache.thrift.TException;
+import org.apache.thrift.TException;
 import org.apache.cassandra.client.*;
 import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.service.StorageService;
 
 import org.apache.cassandra.CassandraServiceController.Failure;
 
@@ -48,6 +50,8 @@ import static junit.framework.Assert.ass
 
 public class MutationTest extends TestBase
 {
+    private static final Logger logger = LoggerFactory.getLogger(MutationTest.class);
+
     @Test
     public void testInsert() throws Exception
     {
@@ -62,9 +66,9 @@ public class MutationTest extends TestBa
         insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
         insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
 
-
-        assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
-        assertColumnEqual("c2", "v2", 0, getColumn(client, key, "Standard1", "c2", ConsistencyLevel.ONE));
+        // block until the column is available
+        new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
+        new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);
 
         List<ColumnOrSuperColumn> coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
         assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
@@ -84,24 +88,22 @@ public class MutationTest extends TestBa
         ByteBuffer key = newKey();
 
         insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL);
+        // should be instantly available
         assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
 
         List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
         InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
         Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
 
-        Thread.sleep(10000); // let gossip catch up
-
         try {
             client = controller.createClient(coordinator);
             client.set_keyspace(keyspace);
 
-            assertColumnEqual("c1", "v1", 0, getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+            new Get(client, "Standard1", key).name("c1").value("v1")
+                .perform(ConsistencyLevel.ONE);
 
-            insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL);
-            assert false;
-        } catch (UnavailableException e) {
-            // [this is good]
+            new Insert(client, "Standard1", key).name("c3").value("v3")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.ALL);
         } finally {
             failure.resolve();
             Thread.sleep(10000);
@@ -125,26 +127,21 @@ public class MutationTest extends TestBa
         InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
         Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size())); //kill all but one nodes
 
-        Thread.sleep(10000);
         client = controller.createClient(coordinator);
         client.set_keyspace(keyspace);
         try {
-            insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM);
-            assert false;
-        } catch (UnavailableException e) {
-            // [this is good]
+            new Insert(client, "Standard1", key).name("c1").value("v1")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.QUORUM);
         } finally {
             failure.resolve();
-            Thread.sleep(10000);
         }
 
         // with all nodes up
-        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM);
+        new Insert(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
 
         failure = controller.failHosts(endpoints.get(0));
-        Thread.sleep(10000);
         try {
-            getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM);
+            new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.QUORUM);
         } finally {
             failure.resolve();
             Thread.sleep(10000);
@@ -180,12 +177,9 @@ public class MutationTest extends TestBa
             // read with all (success)
 
         Failure failure = controller.failHosts(endpoints);
-        Thread.sleep(10000);
         try {
-            insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
-            assert false;
-        } catch (UnavailableException e) {
-            // this is good
+            new Insert(client, "Standard1", key).name("c2").value("v2")
+                .expecting(UnavailableException.class).perform(ConsistencyLevel.ONE);
         } finally {
             failure.resolve();
         }
@@ -210,6 +204,122 @@ public class MutationTest extends TestBa
         return client.get(key, cpath, cl).column;
     }
 
+    protected class Get extends RetryingAction // retrying read: each attempt fetches one column and asserts on it
+    {
+        public Get(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            super(client, cf, key);
+        }
+
+        public void tryPerformAction(ConsistencyLevel cl) throws Exception // one attempt, driven by RetryingAction.perform
+        {
+            assertColumnEqual(name, value, timestamp, getColumn(client, key, cf, name, cl)); // getColumn does client.get(...).column at consistency level cl
+        }
+    }
+
+    protected class Insert extends RetryingAction // retrying write: each attempt calls the test's insert helper
+    {
+        public Insert(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            super(client, cf, key);
+        }
+
+        public void tryPerformAction(ConsistencyLevel cl) throws Exception // one attempt, driven by RetryingAction.perform
+        {
+            insert(client, key, cf, name, value, timestamp, cl); // uses the name/value/timestamp configured through the fluent setters
+        }
+    }
+
+    /** Performs an action repeatedly until it succeeds, throws an expected exception, or the timeout elapses (then fails with AssertionError). */
+    protected abstract class RetryingAction
+    {
+        protected Cassandra.Client client; // connection the action is issued through
+        protected String cf; // forwarded to getColumn/insert by the subclasses; presumably the column family name
+        protected ByteBuffer key; // row key forwarded to getColumn/insert
+        protected String name; // column name, set via name(String)
+        protected String value; // column value, set via value(String)
+        protected long timestamp; // initialised to 0 in the constructor
+
+        private Set<Class<Exception>> expected = new HashSet<Class<Exception>>(); // exception classes that end perform() successfully
+        private long timeout = StorageService.RING_DELAY; // milliseconds: added to System.currentTimeMillis() in perform()
+
+        public RetryingAction(Cassandra.Client client, String cf, ByteBuffer key)
+        {
+            this.client = client;
+            this.cf = cf;
+            this.key = key;
+            this.timestamp = 0;
+        }
+
+        public RetryingAction name(String name) // fluent setter; returns this for chaining
+        {
+            this.name = name; return this;
+        }
+
+        /** The value to expect for the return column, or null to expect the column to be missing. Insert passes the same value to the insert helper. */
+        public RetryingAction value(String value)
+        {
+            this.value = value; return this;
+        }
+        
+        /** The total time, in milliseconds, to allow before failing. Defaults to StorageService.RING_DELAY. */
+        public RetryingAction timeout(long timeout)
+        {
+            this.timeout = timeout; return this;
+        }
+
+        /** The expected timestamp of the returned column. Defaults to 0. */
+        public RetryingAction timestamp(long timestamp)
+        {
+            this.timestamp = timestamp; return this;
+        }
+
+        /** The exception classes that indicate success. Replaces any previously configured set; perform() matches by exact class. */
+        public RetryingAction expecting(Class... tempExceptions)
+        {
+            this.expected.clear();
+            for (Class exclass : tempExceptions)
+                expected.add((Class<Exception>)exclass); // unchecked cast from a raw Class; nothing verifies it is an Exception type
+            return this;
+        }
+
+        public void perform(ConsistencyLevel cl) throws AssertionError // runs tryPerformAction until success, an expected exception, or the deadline
+        {
+            long deadline = System.currentTimeMillis() + timeout;
+            int attempts = 0;
+            String template = "%s for " + this + " after %d attempt(s) with %d ms to spare."; // toString() is spliced into the format string itself
+            Exception e = null; // last exception seen; stays null if the loop body never runs (timeout <= 0)
+            while(deadline > System.currentTimeMillis()) // NOTE(review): no sleep between attempts, so this spins until the deadline
+            {
+                try
+                {
+                    attempts++;
+                    tryPerformAction(cl);
+                    logger.info(String.format(template, "Succeeded", attempts, deadline - System.currentTimeMillis()));
+                    return; // NOTE(review): a plain success returns here even when expecting(...) was configured
+                }
+                catch (Exception ex) // only Exception is caught: an Error such as AssertionError propagates without retry
+                {
+                    e = ex;
+                    if (!expected.contains(ex.getClass())) // exact class lookup: a subclass of an expected type does not match
+                        continue; // unexpected exception: retry until the deadline passes
+                    logger.info(String.format(template, "Caught expected exception: " + e, attempts, deadline - System.currentTimeMillis()));
+                    return;
+                }
+            }
+            String err = String.format(template, "Caught unexpected: " + e, attempts, deadline - System.currentTimeMillis()); // deadline has passed, so "ms to spare" is zero or negative here
+            logger.error(err);
+            throw new AssertionError(err); // message only; the last exception is not attached as a cause
+        }
+        
+        public String toString() // used in the log/error template built by perform()
+        {
+            return this.getClass() + "(" + key + "," + name + ")";
+        }
+
+        protected abstract void tryPerformAction(ConsistencyLevel cl) throws Exception; // one attempt; a thrown Exception is retried unless its class is expected
+    }
+
     protected List<ColumnOrSuperColumn> get_slice(Cassandra.Client client, ByteBuffer key, String cf, ConsistencyLevel cl)
       throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {