You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Jonathan Ellis (JIRA)" <ji...@apache.org> on 2010/06/11 19:06:14 UTC

[jira] Assigned: (CASSANDRA-1183) Column insert via batch_mutate under some conditions sometimes fails to add the column

     [ https://issues.apache.org/jira/browse/CASSANDRA-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis reassigned CASSANDRA-1183:
-----------------------------------------

    Assignee: Brandon Williams

> Column insert via batch_mutate under some conditions sometimes fails to add the column
> --------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-1183
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1183
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.6.1, 0.6.2
>         Environment: OS (via uname -a):  Linux 2.6.27.5-117.fc10.x86_64 #1 SMP Tue Nov 18 11:58:53 EST 2008 x86_64 x86_64 x86_64 GNU/Linux
> Java build 1.6.0_20-b02
> Cassandra 0.6.2 & 0.6.1
> hector-0.6.0-14.jar
>            Reporter: Scott McCarty
>            Assignee: Brandon Williams
>
> Under heavy load batch_mutate sometimes fails to add a column but doesn't report an error to the client.  Details:
> * client is running a tight loop that does:  write to randomly named column; do a get_slice that should return that newly created column; delete the column if it was returned in get_slice
> * the mutation map for batch_insert has exactly one mutation in it:  insert the column
> * if the column insert is done with the insert() API it doesn't ever fail (for as long as I've run it)
> * the column really isn't inserted as doing a 'get' on the entire rowkey in the command line interface doesn't show it, so it's not a timing issue
> * read consistency level is ONE, write consistency level is ALL
> * it fails on a single node cluster and on a 4 node cluster (replication factor = 3)
> I am including a Java cmdline program (below) that demonstrates the problem; it uses the 'hector' java client (I hope that's okay--I verified that no errors or warning were being masked by hector).  I didn't see a place to upload a file in bug reports so this program is in line.
> To show the failure using the program, just run it with the command line:
>   bash>   java BatchMutateBug localhost:9160 true keyspace columnfamily rowkey
> substituting in the appropriate Cassandra server connection info, a valid keyspace name, and a valid column family name.  The second parameter ("true" in the example) tells the program to use batch_mutate().  If you pass in "false" it will use insert() and the program won't fail.
> I've tried the above on two different setups and each one fails within 10-15 seconds generally.  It seems to fail quickest if you use a non-existing rowkey.
> Here's the program:
> import java.util.*;
> import org.apache.cassandra.thrift.*;
> import me.prettyprint.cassandra.service.*;
> public class BatchMutateBug
> {
>     private static String[]  cHosts;
>     private static String    cKeyspace;
>     public static void main(
>         String[] args
>         )
>     {
>         BatchMutateBug c = new BatchMutateBug();
>         if (args.length != 5)
>             System.out.println("Usage:  BatchMutateBug host:port useBatchFlag keyspace columnfamily rowkey");
>         else
>             c.contend(args[0], args[1], args[2], args[3], args[4]);
>     }
>     private void contend(
>         String        hostPort,
>         final String  batchFlag,
>         String        keyspace,
>         final String  cfname,
>         final String  rowkey
>         )
>     {
>         final  boolean  useBatch = Boolean.valueOf(batchFlag);
>         Thread          wr1;
>         cHosts = new String[] {hostPort};
>         cKeyspace = keyspace;
>         if (useBatch)
>             System.out.println("Using batchMutate:  you should see an ERROR eventually");
>         else
>             System.out.println("NOT using batchMutate:  you should NOT see an ERROR");
>         wr1 = new Thread()
>             {
>                 public void run()
>                 {
>                     try
>                     {
>                         writeRead(useBatch, cfname, rowkey);
>                     }
>                     catch (Exception x)
>                     {
>                         x.printStackTrace();
>                     }
>                 }
>             };
>         wr1.start();
>     }
>     /**
>      *
>      */
>     private void writeRead(
>         boolean useBatch,
>         String  cfname,
>         String  rowkey
>         )
>         throws Exception
>     {
>         Random       rand   = new Random();
>         ColumnParent par    = new ColumnParent(cfname);
>         ColumnPath   cp     = new ColumnPath(cfname);
>         Keyspace     ks;
>         String       colname;
>         String       rpart;
>         while (true)
>         {
>             boolean  pass = false;
>             ks      = getWriteKeyspace();
>             rpart   = Integer.toString(rand.nextInt());
>             colname = "%reader:" + rpart;
>             try
>             {
>                 if (useBatch)
>                 {
>                     Map<String, Map<String, List<Mutation>>> muts = new HashMap<String, Map<String, List<Mutation>>>();
>                     Map<String, List<Mutation>>              cfm  = new HashMap<String, List<Mutation>>();
>                     List<Mutation>                           mutl = new ArrayList<Mutation>();
>                     Mutation                                 mut  = new Mutation();
>                     mutl.add(mut);
>                     cfm.put(cfname, mutl);
>                     muts.put(rowkey, cfm);
>                     mut.column_or_supercolumn = new ColumnOrSuperColumn();
>                     mut.column_or_supercolumn.column = new Column();
>                     mut.column_or_supercolumn.column.name = colname.getBytes();
>                     mut.column_or_supercolumn.column.value = colname.getBytes();
>                     mut.column_or_supercolumn.column.timestamp = System.currentTimeMillis() * 1000L;
>                     ks.batchMutate(muts);
>                 }
>                 else
>                 {
>                     cp.column = colname.getBytes();
>                     ks.insert(rowkey, cp, colname.getBytes());
>                 }
>             }
>             finally
>             {
>                 releaseKeyspace(ks);
>             }
>             ks = getReadKeyspace();
>             try
>             {
>                 List<Column>   cols = readSlice(rowkey, cfname, "%", "&");
>                 if (cols.size() == 0)
>                 {
>                     System.out.println("ERROR:  column '" + colname + "' was inserted but getSlice returned 0 columns");
>                 }
>                 else
>                 {
>                     boolean found = false;
>                     for (Column c : cols)
>                     {
>                         String  fetched = new String(c.name);
>                         if (colname.equals(fetched))
>                         {
>                             found = true;
>                             pass = true;
>                             break;
>                         }
>                     }
>                     if (!found)
>                         System.out.println("ERROR:  column '" + colname + "' was inserted but getSlice did NOT return it");
>                 }
>             }
>             finally
>             {
>                 releaseKeyspace(ks);
>             }
>             ks = getWriteKeyspace();
>             if (pass)
>             {
>                 try
>                 {
>                     ks.remove(rowkey, cp);
>                 }
>                 finally
>                 {
>                     releaseKeyspace(ks);
>                 }
>             }
>         }
>     }
>     private List<Column> readSlice(
>         String    rowkey,
>         String    cfname,
>         String    start,
>         String    finish
>         )
>         throws Exception
>     {
>         Keyspace      ks   = getReadKeyspace();
>         List<Column>  cols = null;
>         ColumnParent  par  = new ColumnParent(cfname);
>         try
>         {
>             SlicePredicate sp;
>             sp = new SlicePredicate();
>             sp.slice_range = new SliceRange();
>             sp.slice_range.count = 100;
>             sp.slice_range.start = start.getBytes();
>             sp.slice_range.finish = finish.getBytes();
>             cols = ks.getSlice(rowkey, par, sp);
>         }
>         catch (NotFoundException nfe)
>         {
>             System.out.println("ERROR:  got NotFoundException");
>         }
>         finally
>         {
>             releaseKeyspace(ks);
>         }
>         return cols;
>     }
>     private Keyspace getReadKeyspace()
>     {
>         try
>         {
>             CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
>             CassandraClient     c   = ccp.borrowClient(cHosts);
>             me.prettyprint.cassandra.service.Keyspace ks;
>             ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ONE);
>             return ks;
>         }
>         catch (Exception x)
>         {
>             x.printStackTrace();
>         }
>         return null;
>     }
>     private Keyspace getWriteKeyspace()
>     {
>         try
>         {
>             CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
>             CassandraClient     c   = ccp.borrowClient(cHosts);
>             me.prettyprint.cassandra.service.Keyspace ks;
>             ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ALL);
>             return ks;
>         }
>         catch (Exception x)
>         {
>             x.printStackTrace();
>         }
>         return null;
>     }
>     protected static void releaseKeyspace(
>         Keyspace  ks
>         )
>     {
>         if (ks != null)
>         {
>             try
>             {
>                 CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
>                 ccp.releaseClient(ks.getClient());
>             }
>             catch (Exception x)
>             {
>                 x.printStackTrace();
>             }
>         }
>     }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.