You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/01 03:52:03 UTC

svn commit: r1075627 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Author: jbellis
Date: Tue Mar  1 02:52:03 2011
New Revision: 1075627

URL: http://svn.apache.org/viewvc?rev=1075627&view=rev
Log:
fix Hadoop ColumnFamilyOutputFormat droppingof mutations
patch by Eldon Stegall and Jeremy Hanna; reviewed by jbellis for CASSANDRA-2255

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1075627&r1=1075626&r2=1075627&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Mar  1 02:52:03 2011
@@ -325,7 +325,7 @@ implements org.apache.hadoop.mapred.Reco
                 }
 
                 Map<ByteBuffer, Map<String, List<Mutation>>> batch = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-                while (batch.size() < batchThreshold)
+                while (mutation != null)
                 {
                     Map<String, List<Mutation>> subBatch = batch.get(mutation.left);
                     if (subBatch == null)
@@ -333,10 +333,12 @@ implements org.apache.hadoop.mapred.Reco
                         subBatch = Collections.singletonMap(columnFamily, (List<Mutation>) new ArrayList<Mutation>());
                         batch.put(mutation.left, subBatch);
                     }
-                    
+
                     subBatch.get(columnFamily).add(mutation.right);
-                    if ((mutation = queue.poll()) == null)
+                    if (batch.size() >= batchThreshold)
                         break;
+
+                    mutation = queue.poll();
                 }
 
                 Iterator<InetAddress> iter = endpoints.iterator();