You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/05/06 06:24:28 UTC

svn commit: r1100043 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java

Author: stack
Date: Fri May  6 04:24:28 2011
New Revision: 1100043

URL: http://svn.apache.org/viewvc?rev=1100043&view=rev
Log:
HBASE-3862 Race conditions in aggregate calculation

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1100043&r1=1100042&r2=1100043&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri May  6 04:24:28 2011
@@ -99,6 +99,7 @@ Release 0.91.0 - Unreleased
    HBASE-3777  Redefine Identity Of HBase Configuration (Karthick Sankarachary)
    HBASE-3849  Fix master ui; hbase-1502 broke requests/second
    HBASE-3853  Fix TestInfoServers to pass after HBASE-3835 (todd)
+   HBASE-3862  Race conditions in aggregate calculation (John Heitmann)
 
   IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)  

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1100043&r1=1100042&r2=1100043&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Fri May  6 04:24:28 2011
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client.c
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,7 +93,7 @@ public class AggregationClient {
       }
 
       @Override
-      public void update(byte[] region, byte[] row, R result) {
+      public synchronized void update(byte[] region, byte[] row, R result) {
         max = ci.compare(max, result) < 0 ? result : max;
       }
     }
@@ -141,7 +142,7 @@ public class AggregationClient {
       }
 
       @Override
-      public void update(byte[] region, byte[] row, R result) {
+      public synchronized void update(byte[] region, byte[] row, R result) {
         min = (min == null || ci.compare(result, min) < 0) ? result : min;
       }
     }
@@ -176,15 +177,15 @@ public class AggregationClient {
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
     validateParameters(scan);
     class RowNumCallback implements Batch.Callback<Long> {
-      private long rowCountL = 0l;
+      private final AtomicLong rowCountL = new AtomicLong(0);
 
       public long getRowNumCount() {
-        return rowCountL;
+        return rowCountL.get();
       }
 
       @Override
       public void update(byte[] region, byte[] row, Long result) {
-        rowCountL += result.longValue();
+        rowCountL.addAndGet(result.longValue());
       }
     }
     RowNumCallback rowNum = new RowNumCallback();
@@ -219,7 +220,7 @@ public class AggregationClient {
       }
 
       @Override
-      public void update(byte[] region, byte[] row, S result) {
+      public synchronized void update(byte[] region, byte[] row, S result) {
         sumVal = ci.add(sumVal, result);
       }
     }
@@ -255,7 +256,7 @@ public class AggregationClient {
       }
 
       @Override
-      public void update(byte[] region, byte[] row, Pair<S, Long> result) {
+      public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
         sum = ci.add(sum, result.getFirst());
         rowCount += result.getSecond();
       }
@@ -317,7 +318,7 @@ public class AggregationClient {
       }
 
       @Override
-      public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
+      public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
         sumVal = ci.add(sumVal, result.getFirst().get(0));
         sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
         rowCountVal += result.getSecond();