You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 16:54:06 UTC

svn commit: r771761 [3/3] - in /incubator/cassandra/trunk: interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/test/ test/system/

Added: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/UnavailableException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/UnavailableException.java?rev=771761&view=auto
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/UnavailableException.java (added)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/UnavailableException.java Tue May  5 14:54:05 2009
@@ -0,0 +1,135 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import org.apache.log4j.Logger;
+
+import org.apache.thrift.*;
+import org.apache.thrift.meta_data.*;
+import org.apache.thrift.protocol.*;
+
+public class UnavailableException extends Exception implements TBase, java.io.Serializable, Cloneable {
+  private static final TStruct STRUCT_DESC = new TStruct("UnavailableException");
+
+  public static final Map<Integer, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+  }});
+
+  static {
+    FieldMetaData.addStructMetaDataMap(UnavailableException.class, metaDataMap);
+  }
+
+  public UnavailableException() {
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public UnavailableException(UnavailableException other) {
+  }
+
+  @Override
+  public UnavailableException clone() {
+    return new UnavailableException(this);
+  }
+
+  public void setFieldValue(int fieldID, Object value) {
+    switch (fieldID) {
+    default:
+      throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+    }
+  }
+
+  public Object getFieldValue(int fieldID) {
+    switch (fieldID) {
+    default:
+      throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+    }
+  }
+
+  // Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise
+  public boolean isSet(int fieldID) {
+    switch (fieldID) {
+    default:
+      throw new IllegalArgumentException("Field " + fieldID + " doesn't exist!");
+    }
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof UnavailableException)
+      return this.equals((UnavailableException)that);
+    return false;
+  }
+
+  public boolean equals(UnavailableException that) {
+    if (that == null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    validate();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("UnavailableException(");
+    boolean first = true;
+
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws TException {
+    // check for required fields
+    // check that fields of type enum have valid values
+  }
+
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=771761&r1=771760&r2=771761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue May  5 14:54:05 2009
@@ -21,11 +21,7 @@
 
 import org.antlr.runtime.tree.*;
 import org.apache.cassandra.cql.common.Utils;
-import org.apache.cassandra.service.Cassandra;
-import org.apache.cassandra.service.CqlResult_t;
-import org.apache.cassandra.service.column_t;
-import org.apache.cassandra.service.NotFoundException;
-import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.LogUtil;
 
 import java.util.*;
@@ -43,7 +39,7 @@
     }
 
     // Execute a CLI Statement 
-    public void executeCLIStmt(String stmt) throws TException, NotFoundException, InvalidRequestException
+    public void executeCLIStmt(String stmt) throws TException, NotFoundException, InvalidRequestException, UnavailableException
     {
         CommonTree ast = null;
 
@@ -165,7 +161,7 @@
     }
 
     // Execute SET statement
-    private void executeSet(CommonTree ast) throws TException
+    private void executeSet(CommonTree ast) throws TException, InvalidRequestException, UnavailableException
     {
         if (!CliMain.isConnected())
             return;
@@ -192,7 +188,7 @@
 
             // do the insert
             thriftClient_.insert(tableName, key, columnFamily + ":" + columnName,
-                                 value.getBytes(), System.currentTimeMillis());
+                                 value.getBytes(), System.currentTimeMillis(), true);
 
             css_.out.println("Value inserted.");
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=771761&r1=771760&r2=771761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Tue May  5 14:54:05 2009
@@ -21,12 +21,7 @@
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Arrays;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Logger;
@@ -282,66 +277,57 @@
         return columns.size();
 	}
 
-    public void insert(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp)
-	{
+    public void insert(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp, boolean block)
+    throws InvalidRequestException, UnavailableException
+    {
         logger.debug("insert");
         RowMutation rm = new RowMutation(tablename, key.trim());
         rm.add(columnFamily_column, cellData, timestamp);
-        try
+        Set<String> cfNames = rm.columnFamilyNames();
+        validateCommand(rm.key(), rm.table(), cfNames.toArray(new String[cfNames.size()]));
+
+        if (block)
         {
-            validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
+            StorageProxy.insertBlocking(rm);
         }
-        catch (InvalidRequestException e)
+        else
         {
-            throw new RuntimeException(e);
+            StorageProxy.insert(rm);
         }
-        StorageProxy.insert(rm);
-	}
-    
-    public boolean insert_blocking(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp) throws InvalidRequestException
-    {
-        logger.debug("insert_blocking");
-        RowMutation rm = new RowMutation(tablename, key.trim());
-        rm.add(columnFamily_column, cellData, timestamp);
-        validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
-        return StorageProxy.insertBlocking(rm);
     }
 
-    public boolean batch_insert_blocking(batch_mutation_t batchMutation) throws InvalidRequestException
-    {
-        logger.debug("batch_insert_blocking");
-        RowMutation rm = RowMutation.getRowMutation(batchMutation);
-        validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
-        return StorageProxy.insertBlocking(rm);
-    }
-
-	public void batch_insert(batch_mutation_t batchMutation)
+    public void batch_insert(batch_mutation_t batchMutation, boolean block) throws InvalidRequestException, UnavailableException
     {
         logger.debug("batch_insert");
         RowMutation rm = RowMutation.getRowMutation(batchMutation);
-        try
+        Set<String> cfNames = rm.columnFamilyNames();
+        validateCommand(rm.key(), rm.table(), cfNames.toArray(new String[cfNames.size()]));
+
+        if (block)
         {
-            validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
+            StorageProxy.insertBlocking(rm);
         }
-        catch (InvalidRequestException e)
+        else
         {
-            // it would be confusing to declare an exception in thrift that can't be returned to the client
-            throw new RuntimeException(e);
+            StorageProxy.insert(rm);
         }
-        StorageProxy.insert(rm);
-	}
+    }
 
-    public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws InvalidRequestException
+    public void remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block)
+    throws InvalidRequestException, UnavailableException
     {
         logger.debug("remove");
         RowMutation rm = new RowMutation(tablename, key.trim());
         rm.delete(columnFamily_column, timestamp);
-        validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
-        if (block) {
-            return StorageProxy.insertBlocking(rm);
-        } else {
+        Set<String> cfNames = rm.columnFamilyNames();
+        validateCommand(rm.key(), rm.table(), cfNames.toArray(new String[cfNames.size()]));
+        if (block)
+        {
+            StorageProxy.insertBlocking(rm);
+        }
+        else
+        {
             StorageProxy.insert(rm);
-            return true;
         }
 	}
 
@@ -412,29 +398,21 @@
 
         return new superColumn_t(column.name(), thriftifyColumns(column.getSubColumns()));
     }
-    
-    public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper) throws InvalidRequestException
-    {
-        logger.debug("batch_insert_SuperColumn_blocking");
-        RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
-        validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
-        return StorageProxy.insertBlocking(rm);
-    }
 
-    public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
+    public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper, boolean block) throws InvalidRequestException, UnavailableException
     {
         logger.debug("batch_insert_SuperColumn");
         RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
-        try
+        Set<String> cfNames = rm.columnFamilyNames();
+        validateCommand(rm.key(), rm.table(), cfNames.toArray(new String[cfNames.size()]));
+        if (block)
         {
-            validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
+            StorageProxy.insertBlocking(rm);
         }
-        catch (InvalidRequestException e)
+        else
         {
-            // it would be confusing to declare an exception in thrift that can't be returned to the client
-            throw new RuntimeException(e);
+            StorageProxy.insert(rm);
         }
-        StorageProxy.insert(rm);
     }
 
     public String getStringProperty(String propertyName)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=771761&r1=771760&r2=771761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue May  5 14:54:05 2009
@@ -140,13 +140,20 @@
         }
     }
 
-    public static boolean insertBlocking(RowMutation rm)
+    public static void insertBlocking(RowMutation rm) throws UnavailableException
     {
         long startTime = System.currentTimeMillis();
+        Message message = null;
+        try
+        {
+            message = rm.makeRowMutationMessage();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
         try
         {
-            Message message = rm.makeRowMutationMessage();
-
             IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
             QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
                     DatabaseDescriptor.getReplicationFactor(),
@@ -156,15 +163,13 @@
             // TODO: throw a thrift exception if we do not have N nodes
 
             MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
-            return quorumResponseHandler.get();
-
-            // TODO: if the result is false that means the writes to all the
-            // servers failed hence we need to throw an exception or return an
-            // error back to the client so that it can take appropriate action.
+            if (!quorumResponseHandler.get())
+                throw new UnavailableException();
         }
         catch (Exception e)
         {
-            throw new RuntimeException(e);
+            logger.error(e);
+            throw new UnavailableException();
         }
         finally
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java?rev=771761&r1=771760&r2=771761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java Tue May  5 14:54:05 2009
@@ -410,11 +410,11 @@
 				Thread.sleep(0, 1000000000/requestsPerSecond_);
 			else
 				Thread.sleep(1000/requestsPerSecond_);
-			peerstorageClient_.insert(table, key, columnFamily, bytes, ts);
+			peerstorageClient_.insert(table, key, columnFamily, bytes, ts, false);
 		} catch (Exception e) {
 			try {
 				peerstorageClient_ = connect();
-				peerstorageClient_.insert(table, key, columnFamily, bytes, ts);
+				peerstorageClient_.insert(table, key, columnFamily, bytes, ts, false);
 			} catch (Exception e1) {
 				e1.printStackTrace();
 			}
@@ -429,11 +429,11 @@
 				Thread.sleep(0, 1000000000/requestsPerSecond_);
 			else
 				Thread.sleep(1000/requestsPerSecond_);
-			peerstorageClient_.batch_insert(batchMutation);
+			peerstorageClient_.batch_insert(batchMutation, false);
 		} catch (Exception e) {
 			try {
 				peerstorageClient_ = connect();
-				peerstorageClient_.batch_insert(batchMutation);
+				peerstorageClient_.batch_insert(batchMutation, false);
 			} catch (Exception e1) {
 				e1.printStackTrace();
 			}
@@ -448,13 +448,13 @@
 			else
 				Thread.sleep(1000/requestsPerSecond_);
 			long t = System.currentTimeMillis();
-			peerstorageClient_.batch_insert_superColumn(batchMutation);
+			peerstorageClient_.batch_insert_superColumn(batchMutation, false);
 			logger_.debug("Time taken for thrift..."
 					+ (System.currentTimeMillis() - t));
 		} catch (Exception e) {
 			try {
 				peerstorageClient_ = connect();
-				peerstorageClient_.batch_insert_superColumn(batchMutation);
+				peerstorageClient_.batch_insert_superColumn(batchMutation, false);
 			} catch (Exception e1) {
 				e1.printStackTrace();
 			}
@@ -651,26 +651,6 @@
 		System.out.println(System.currentTimeMillis() - time);
 	}
 	
-	public void testCommitLog() throws Throwable
-	{
-        Random random = new Random(System.currentTimeMillis());
-    	byte[] bytes = new byte[4096];
-    	random.nextBytes(bytes);
-    	byte[] bytes1 = new byte[64];
-    	random.nextBytes(bytes1);
-    	peerstorageClient_ = connect();
-    	int t = 0 ;
-    	while( true )
-    	{
-	    	int key = random.nextInt();
-	    	int threadId = random.nextInt();
-	    	int word = random.nextInt();
-			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxMailList0:" + Integer.toString(threadId), bytes1, t++);
-			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxThreadList0:" + Integer.toString(word) + ":" + Integer.toString(threadId), bytes, t++);
-			peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxUserList0:"+ Integer.toString(word) + ":" + Integer.toString(threadId), bytes, t++);
-    	}
-	}
-
 	JSAPResult ParseArguments(String[] args)
 	{
         JSAPResult config = null;    
@@ -755,11 +735,6 @@
 //			stressMailboxWrites();
 			return;
 		}
-		if(config.getInt("commitLogTest") == 1)
-		{
-			testCommitLog();
-			return;
-		}
 		if(config.getInt("thrift") == 0)
 		{
 			if(config.getInt("supercolumns") == 0)

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=771761&r1=771760&r2=771761&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Tue May  5 14:54:05 2009
@@ -13,16 +13,14 @@
                                 columns=[column_t(columnName='c5', value='value5', timestamp=0),
                                          column_t(columnName='c6', value='value6', timestamp=0)])]
 
-def _insert_simple(method=client.insert_blocking):
-    v1 = method('Table1', 'key1', 'Standard1:c1', 'value1', 0)
-    v2 = method('Table1', 'key1', 'Standard1:c2', 'value2', 0)
-    assert v1 == v2
-    return v1
+def _insert_simple(block=True):
+    client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 0, block)
+    client.insert('Table1', 'key1', 'Standard1:c2', 'value2', 0, block)
 
-def _insert_batch(method):
+def _insert_batch(block):
     cfmap = {'Standard1': _SIMPLE_COLUMNS,
              'Standard2': _SIMPLE_COLUMNS}
-    return method(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap))
+    client.batch_insert(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap), block)
 
 def _verify_batch():
     _verify_simple()
@@ -36,11 +34,10 @@
     assert L == _SIMPLE_COLUMNS, L
 
 def _insert_super():
-    v1 = client.insert_blocking('Table1', 'key1', 'Super1:sc1:c4', 'value4', 0)
-    v2 = client.insert_blocking('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0)
-    v3 = client.insert_blocking('Table1', 'key1', 'Super1:sc2:c6', 'value6', 0)
-    assert v1 == v2 == v3
-    return v1
+    client.insert('Table1', 'key1', 'Super1:sc1:c4', 'value4', 0, False)
+    client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0, False)
+    client.insert('Table1', 'key1', 'Super1:sc2:c6', 'value6', 0, False)
+    time.sleep(0.1)
 
 def _verify_super(supercolumn='Super1'):
     assert client.get_column('Table1', 'key1', supercolumn + ':sc1:c4') == \
@@ -71,12 +68,12 @@
         assert client.get_column_count('Table1', 'key1', 'Standard2') == 0
 
     def test_insert(self):
-        _insert_simple(client.insert)
+        _insert_simple(False)
         time.sleep(0.1)
         _verify_simple()
 
     def test_insert_blocking(self):
-        assert _insert_simple(client.insert_blocking)
+        _insert_simple()
         _verify_simple()
 
     def test_super_insert(self):
@@ -84,18 +81,18 @@
         _verify_super()
 
     def test_batch_insert(self):
-        _insert_batch(client.batch_insert)
+        _insert_batch(False)
         time.sleep(0.1)
         _verify_batch()
 
     def test_batch_insert_blocking(self):
-        assert _insert_batch(client.batch_insert_blocking)
+        _insert_batch(True)
         _verify_batch()
 
     def test_batch_insert_super(self):
          cfmap = {'Super1': _SUPER_COLUMNS,
                   'Super2': _SUPER_COLUMNS}
-         client.batch_insert_superColumn(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap))
+         client.batch_insert_superColumn(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap), False)
          time.sleep(0.1)
          _verify_super('Super1')
          _verify_super('Super2')
@@ -103,12 +100,12 @@
     def test_batch_insert_super_blocking(self):
          cfmap = {'Super1': _SUPER_COLUMNS,
                   'Super2': _SUPER_COLUMNS}
-         client.batch_insert_superColumn_blocking(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap))
+         client.batch_insert_superColumn(batch_mutation_t(table='Table1', key='key1', cfmap=cfmap), True)
          _verify_super('Super1')
          _verify_super('Super2')
 
     def test_cf_remove_column(self):
-        assert _insert_simple()
+        _insert_simple()
         client.remove('Table1', 'key1', 'Standard1:c1', 1, True)
         time.sleep(0.1)
         _expect_missing(lambda: client.get_column('Table1', 'key1', 'Standard1:c1'))
@@ -118,22 +115,19 @@
             [column_t(columnName='c2', value='value2', timestamp=0)]
 
         # New insert, make sure it shows up post-remove:
-        client.insert('Table1', 'key1', 'Standard1:c3', 'value3', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Standard1:c3', 'value3', 0, True)
         assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == \
             [column_t(columnName='c2', value='value2', timestamp=0), 
              column_t(columnName='c3', value='value3', timestamp=0)]
 
         # Test resurrection.  First, re-insert the value w/ older timestamp, 
         # and make sure it stays removed:
-        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 0, True)
         assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == \
             [column_t(columnName='c2', value='value2', timestamp=0), 
              column_t(columnName='c3', value='value3', timestamp=0)]
         # Next, w/ a newer timestamp; it should come back:
-        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 2)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 2, True)
         assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == \
             [column_t(columnName='c1', value='value1', timestamp=2),
              column_t(columnName='c2', value='value2', timestamp=0), 
@@ -141,8 +135,8 @@
 
 
     def test_cf_remove(self):
-        assert _insert_simple()
-        assert _insert_super()
+        _insert_simple()
+        _insert_super()
 
         # Remove the key1:Standard1 cf:
         client.remove('Table1', 'key1', 'Standard1', 3, True)
@@ -152,19 +146,17 @@
 
         # Test resurrection.  First, re-insert a value w/ older timestamp, 
         # and make sure it stays removed:
-        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 0, True)
         assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == []
         # Next, w/ a newer timestamp; it should come back:
-        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 4)
-        # time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Standard1:c1', 'value1', 4, True)
         assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == \
             [column_t(columnName='c1', value='value1', timestamp=4)]
 
 
     def test_super_cf_remove_column(self):
-        assert _insert_simple()
-        assert _insert_super()
+        _insert_simple()
+        _insert_super()
 
         # Make sure remove clears out what it's supposed to, and _only_ that:
         client.remove('Table1', 'key1', 'Super1:sc2:c5', 5, True)
@@ -178,8 +170,7 @@
         _verify_simple()
 
         # New insert, make sure it shows up post-remove:
-        client.insert('Table1', 'key1', 'Super1:sc2:c7', 'value7', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Super1:sc2:c7', 'value7', 0, True)
         scs = [superColumn_t(name='sc1', 
                              columns=[column_t(columnName='c4', value='value4', timestamp=0)]),
                superColumn_t(name='sc2', 
@@ -190,14 +181,12 @@
 
         # Test resurrection.  First, re-insert the value w/ older timestamp, 
         # and make sure it stays removed:
-        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0, True)
         actual = client.get_slice_super('Table1', 'key1', 'Super1', -1, -1)
         assert actual == scs, actual
 
         # Next, w/ a newer timestamp; it should come back
-        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 6)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 6, True)
         actual = client.get_slice_super('Table1', 'key1', 'Super1', -1, -1)
         assert actual == \
             [superColumn_t(name='sc1', 
@@ -208,8 +197,8 @@
                                     column_t(columnName='c7', value='value7', timestamp=0)])], actual
 
     def test_super_cf_remove_supercolumn(self):
-        assert _insert_simple()
-        assert _insert_super()
+        _insert_simple()
+        _insert_super()
 
         # Make sure remove clears out what it's supposed to, and _only_ that:
         client.remove('Table1', 'key1', 'Super1:sc2', 5, True)
@@ -225,14 +214,12 @@
 
         # Test resurrection.  First, re-insert the value w/ older timestamp, 
         # and make sure it stays removed:
-        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 0, True)
         actual = client.get_slice_super('Table1', 'key1', 'Super1', -1, -1)
         assert actual == scs, actual
 
         # Next, w/ a newer timestamp; it should come back
-        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 6)
-        time.sleep(0.1)
+        client.insert('Table1', 'key1', 'Super1:sc2:c5', 'value5', 6, True)
         actual = client.get_slice_super('Table1', 'key1', 'Super1', -1, -1)
         assert actual == \
             [superColumn_t(name='sc1', 
@@ -245,7 +232,7 @@
         assert client.get_key_range('Table1', '', '', 1000) == []
 
     def test_range_with_remove(self):
-        assert _insert_simple()
+        _insert_simple()
         assert client.get_key_range('Table1', 'key1', '', 1000) == ['key1']
 
         client.remove('Table1', 'key1', 'Standard1:c1', 1, True)
@@ -254,14 +241,14 @@
 
     def test_range_collation(self):
         for key in ['-a', '-b', 'a', 'b'] + [str(i) for i in xrange(100)]:
-            assert client.insert_blocking('Table1', key, 'Standard1:' + key, 'v', 0)
+            client.insert('Table1', key, 'Standard1:' + key, 'v', 0, True)
         L = client.get_key_range('Table1', '', '', 1000)
         # note the collated ordering rather than ascii
         assert L == ['0', '1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '2', '20', '21', '22', '23', '24', '25', '26', '27','28', '29', '3', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '4', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '5', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '6', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '7', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '8', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '9', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', 'a', '-a', 'b', '-b'], L
 
     def test_range_partial(self):
         for key in ['-a', '-b', 'a', 'b'] + [str(i) for i in xrange(100)]:
-            assert client.insert_blocking('Table1', key, 'Standard1:' + key, 'v', 0)
+            client.insert('Table1', key, 'Standard1:' + key, 'v', 0, True)
 
         L = client.get_key_range('Table1', 'a', '', 1000)
         assert L == ['a', '-a', 'b', '-b'], L