You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC

svn commit: r1446147 [28/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Thu Feb 14 12:58:12 2013
@@ -27,7 +27,10 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -37,6 +40,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -55,6 +61,7 @@ public class TestHCM {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final byte[] TABLE_NAME = Bytes.toBytes("test");
   private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
+  private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
 
@@ -67,25 +74,6 @@ public class TestHCM {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  /**
-   * @throws InterruptedException 
-   * @throws IllegalAccessException 
-   * @throws NoSuchFieldException 
-   * @throws ZooKeeperConnectionException 
-   * @throws IllegalArgumentException 
-   * @throws SecurityException 
-   * @see https://issues.apache.org/jira/browse/HBASE-2925
-   */
-  // Disabling.  Of course this test will OOME using new Configuration each time
-  // St.Ack 20110428
-  // @Test
-  public void testManyNewConnectionsDoesnotOOME()
-  throws SecurityException, IllegalArgumentException,
-  ZooKeeperConnectionException, NoSuchFieldException, IllegalAccessException,
-  InterruptedException {
-    createNewConfigurations();
-  }
-
   private static Random _randy = new Random();
 
   public static void createNewConfigurations() throws SecurityException,
@@ -118,6 +106,28 @@ public class TestHCM {
   private static int getHConnectionManagerCacheSize(){
     return HConnectionTestingUtility.getConnectionCount();
   }
+  
+  @Test
+  public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
+    // Save off current HConnections
+    Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances = 
+        new HashMap<HConnectionKey, HConnectionImplementation>();
+    oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES);
+    
+    HConnectionManager.HBASE_INSTANCES.clear();
+
+    try {
+      HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+      connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
+          "test abortingHConnectionRemovesItselfFromHCM"));
+      Assert.assertNotSame(connection,
+        HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
+    } finally {
+      // Put original HConnections back
+      HConnectionManager.HBASE_INSTANCES.clear();
+      HConnectionManager.HBASE_INSTANCES.putAll(oldHBaseInstances);
+    }
+  }
 
   /**
    * Test that when we delete a location using the first row of a region
@@ -140,10 +150,12 @@ public class TestHCM {
       Bytes.toString(TABLE_NAME).getBytes() , Bytes.toString(ROW).getBytes()));
 
     final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
-    conn.updateCachedLocation(conn.getCachedLocation(TABLE_NAME, ROW), "127.0.0.1", nextPort);
+    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
+    conn.updateCachedLocation(loc.getRegionInfo(), loc, "127.0.0.1", nextPort,
+      HConstants.LATEST_TIMESTAMP);
     Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
 
-    conn.deleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
+    conn.forceDeleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
     HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
     assertNull("What is this location?? " + rl, rl);
 
@@ -156,18 +168,21 @@ public class TestHCM {
     table.put(put2);
     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
 
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
     // We can wait for all regions to be online, that makes log reading easier when debugging
-    while (TEST_UTIL.getMiniHBaseCluster().getMaster().
-      getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Thread.sleep(1);
     }
 
     // Now moving the region to the second server
-    TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
     HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW);
     byte[] regionName = toMove.getRegionInfo().getRegionName();
+    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
 
     // Choose the other server.
-    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith( regionName  );
+    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
     int destServerId = (curServerId == 0 ? 1 : 0);
 
     HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
@@ -181,6 +196,8 @@ public class TestHCM {
     Assert.assertFalse( toMove.getPort() == destServerName.getPort());
     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
     Assert.assertNull(destServer.getOnlineRegion(regionName));
+    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
+        getAssignmentManager().getRegionStates().isRegionsInTransition());
 
     // Moving. It's possible that we don't have all the regions online at this point, so
     //  the test must depends only on the region we're looking at.
@@ -190,18 +207,26 @@ public class TestHCM {
       destServerName.getServerName().getBytes()
     );
 
-    while ( destServer.getOnlineRegion(regionName) == null ){
+    while (destServer.getOnlineRegion(regionName) == null ||
+        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
       // wait for the move to be finished
+      Thread.sleep(1);
     }
 
+    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
 
     // Check our new state.
     Assert.assertNull(curServer.getOnlineRegion(regionName));
     Assert.assertNotNull(destServer.getOnlineRegion(regionName));
-    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
+    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+
 
     // Cache was NOT updated and points to the wrong server
-    Assert.assertFalse( conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
+    Assert.assertFalse(
+        conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
 
     // Hijack the number of retry to fail immediately instead of retrying: there will be no new
     //  connection to the master
@@ -233,6 +258,8 @@ public class TestHCM {
       "Previous server was "+curServer.getServerName().getHostAndPort(),
       destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
 
+    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
 
     // We move it back to do another test with a scan
     LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
@@ -241,8 +268,12 @@ public class TestHCM {
       curServer.getServerName().getServerName().getBytes()
     );
 
-    while ( curServer.getOnlineRegion(regionName) == null ){
+    while (curServer.getOnlineRegion(regionName) == null ||
+        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+        master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
       // wait for the move to be finished
+      Thread.sleep(1);
     }
 
     // Check our new state.
@@ -260,11 +291,12 @@ public class TestHCM {
     sc.setStopRow(ROW);
 
     try {
-    ResultScanner rs = table.getScanner(sc);
-    while (rs.next() != null){}
+      ResultScanner rs = table.getScanner(sc);
+      while (rs.next() != null) {
+      }
       Assert.assertFalse("Unreachable point", true);
-    }catch (Throwable e){
-      LOG.info("Put done, expected exception caught: "+e.getClass());
+    } catch (Throwable e) {
+      LOG.info("Scan done, expected exception caught: " + e.getClass());
     }
 
     // Cache is updated with the right value.
@@ -302,7 +334,55 @@ public class TestHCM {
   }
 
   /**
-   * Make sure that {@link HConfiguration} instances that are essentially the
+   * Test that stale cache updates don't override newer cached values.
+   */
+  @Test(timeout = 60000)
+  public void testCacheSeqNums() throws Exception{
+    HTable table = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAM);
+    TEST_UTIL.createMultiRegions(table, FAM_NAM);
+    Put put = new Put(ROW);
+    put.add(FAM_NAM, ROW, ROW);
+    table.put(put);
+    HConnectionManager.HConnectionImplementation conn =
+      (HConnectionManager.HConnectionImplementation)table.getConnection();
+
+    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
+    assertNotNull(location);
+
+    HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(),
+        location.getHostname(), location.getPort() - 1);
+
+    // Same server as already in cache reporting - overwrites any value despite seqNum.
+    int nextPort = location.getPort() + 1;
+    conn.updateCachedLocation(location.getRegionInfo(), location,
+        "127.0.0.1", nextPort, location.getSeqNum() - 1);
+    location = conn.getCachedLocation(TABLE_NAME2, ROW);
+    Assert.assertEquals(nextPort, location.getPort());
+
+    // No source specified - same.
+    nextPort = location.getPort() + 1;
+    conn.updateCachedLocation(location.getRegionInfo(), location,
+        "127.0.0.1", nextPort, location.getSeqNum() - 1);
+    location = conn.getCachedLocation(TABLE_NAME2, ROW);
+    Assert.assertEquals(nextPort, location.getPort());
+
+    // Higher seqNum - overwrites lower seqNum.
+    nextPort = location.getPort() + 1;
+    conn.updateCachedLocation(location.getRegionInfo(), anySource,
+        "127.0.0.1", nextPort, location.getSeqNum() + 1);
+    location = conn.getCachedLocation(TABLE_NAME2, ROW);
+    Assert.assertEquals(nextPort, location.getPort());
+
+    // Lower seqNum - does not overwrite higher seqNum.
+    nextPort = location.getPort() + 1;
+    conn.updateCachedLocation(location.getRegionInfo(), anySource,
+        "127.0.0.1", nextPort, location.getSeqNum() - 1);
+    location = conn.getCachedLocation(TABLE_NAME2, ROW);
+    Assert.assertEquals(nextPort - 1, location.getPort());
+  }
+
+  /**
+   * Make sure that {@link Configuration} instances that are essentially the
    * same map to the same {@link HConnection} instance.
    */
   @Test
@@ -332,7 +412,7 @@ public class TestHCM {
 
   /**
    * Makes sure that there is no leaking of
-   * {@link HConnectionManager.TableServers} in the {@link HConnectionManager}
+   * {@link HConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
    * class.
    */
   @Test
@@ -376,7 +456,7 @@ public class TestHCM {
     } finally {
       for (HConnection c: connections) {
         // Clean up connections made so we don't interfere w/ subsequent tests.
-        HConnectionManager.deleteConnection(c.getConfiguration(), true);
+        HConnectionManager.deleteConnection(c.getConfiguration());
       }
     }
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Thu Feb 14 12:58:12 2013
@@ -18,6 +18,11 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -27,9 +32,13 @@ import java.util.concurrent.ThreadPoolEx
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.ipc.HBaseClient;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -41,8 +50,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.*;
-
 @Category(MediumTests.class)
 public class TestMultiParallel {
   private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
@@ -65,6 +72,7 @@ public class TestMultiParallel {
     UTIL.startMiniCluster(slaves);
     HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
     UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
+    UTIL.waitTableAvailable(Bytes.toBytes(TEST_TABLE), 15 * 1000);
     t.close();
   }
 
@@ -72,11 +80,20 @@ public class TestMultiParallel {
     UTIL.shutdownMiniCluster();
   }
 
-  @Before public void before() throws IOException {
+  @Before public void before() throws Exception {
     LOG.info("before");
     if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
       // Distribute regions
       UTIL.getMiniHBaseCluster().getMaster().balance();
+      // Wait until completing balance
+      final RegionStates regionStates = UTIL.getMiniHBaseCluster().getMaster()
+          .getAssignmentManager().getRegionStates();
+      UTIL.waitFor(15 * 1000, new Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return !regionStates.isRegionsInTransition();
+        }
+      });
     }
     LOG.info("before done");
   }
@@ -226,6 +243,11 @@ public class TestMultiParallel {
     doTestFlushCommits(true);
   }
 
+  /**
+   * Set table auto flush to false and test flushing commits
+   * @param doAbort true if abort one regionserver in the testing
+   * @throws Exception
+   */
   private void doTestFlushCommits(boolean doAbort) throws Exception {
     // Load the data
     LOG.info("get new table");
@@ -240,9 +262,21 @@ public class TestMultiParallel {
     }
     LOG.info("puts");
     table.flushCommits();
+    int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+        .size();
+    assert liveRScount > 0;
+    JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
+        .getLiveRegionServerThreads().get(0);
     if (doAbort) {
-      LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0));
-
+      liveRS.getRegionServer().abort("Aborting for tests",
+          new Exception("doTestFlushCommits"));
+      // If we wait for no regions being online after we abort the server, we
+      // could ensure the master has re-assigned the regions on killed server
+      // after writing successfully. It means the server we aborted is dead
+      // and detected by master
+      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
+        Thread.sleep(10);
+      }
       // try putting more keys after the abort. same key/qual... just validating
       // no exceptions thrown
       puts = constructPutRequests();
@@ -264,10 +298,11 @@ public class TestMultiParallel {
       LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
     }
     LOG.info("Count=" + count);
-    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, (doAbort? 1 : 2), count);
+    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
+        (doAbort ? (liveRScount - 1) : liveRScount), count);
     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
       int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
-      Assert.assertTrue("Count of regions=" + regions, regions > 10);
+      // Assert.assertTrue("Count of regions=" + regions, regions > 10);
     }
     table.close();
     LOG.info("done");
@@ -285,7 +320,13 @@ public class TestMultiParallel {
     validateSizeAndEmpty(results, KEYS.length);
 
     if (true) {
-      UTIL.getMiniHBaseCluster().abortRegionServer(0);
+      int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+          .size();
+      assert liveRScount > 0;
+      JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
+          .getLiveRegionServerThreads().get(0);
+      liveRS.getRegionServer().abort("Aborting for tests",
+          new Exception("testBatchWithPut"));
 
       puts = constructPutRequests();
       results = table.batch(puts);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java Thu Feb 14 12:58:12 2013
@@ -52,7 +52,7 @@ public class TestScannerTimeout {
   // Be careful w/ what you set this timer to... it can get in the way of
   // the mini cluster coming up -- the verification in particular.
   private final static int THREAD_WAKE_FREQUENCY = 1000;
-  private final static int SCANNER_TIMEOUT = 10000;
+  private final static int SCANNER_TIMEOUT = 15000;
   private final static int SCANNER_CACHING = 5;
 
    /**

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java Thu Feb 14 12:58:12 2013
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Thu Feb 14 12:58:12 2013
@@ -22,29 +22,67 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 
 /**
  * The aggregation implementation at a region.
  */
-public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
-implements ColumnAggregationProtocol {
+public class ColumnAggregationEndpoint extends ColumnAggregationService
+implements Coprocessor, CoprocessorService {
+  static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
+  private RegionCoprocessorEnvironment env = null;
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+      return;
+    }
+    throw new CoprocessorException("Must be loaded on a table region!");
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // Nothing to do.
+  }
 
   @Override
-  public long sum(byte[] family, byte[] qualifier)
-  throws IOException {
+  public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
     // aggregate at each region
     Scan scan = new Scan();
-    scan.addColumn(family, qualifier);
+    // Family is required in pb. Qualifier is not.
+    byte [] family = request.getFamily().toByteArray();
+    byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
+    if (request.hasQualifier()) {
+      scan.addColumn(family, qualifier);
+    } else {
+      scan.addFamily(family);
+    }
     int sumResult = 0;
-
-    InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment())
-        .getRegion().getScanner(scan);
+    InternalScanner scanner = null;
     try {
+      scanner = this.env.getRegion().getScanner(scan);
       List<KeyValue> curVals = new ArrayList<KeyValue>();
       boolean hasMore = false;
       do {
@@ -56,9 +94,22 @@ implements ColumnAggregationProtocol {
           }
         }
       } while (hasMore);
+    } catch (IOException e) {
+      ResponseConverter.setControllerException(controller, e);
+      // Set result to -1 to indicate error.
+      sumResult = -1;
+      LOG.info("Setting sum result to -1 to indicate error", e);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException e) {
+          ResponseConverter.setControllerException(controller, e);
+          sumResult = -1;
+          LOG.info("Setting sum result to -1 to indicate error", e);
+        }
+      }
     }
-    return sumResult;
+    done.run(SumResponse.newBuilder().setSum(sumResult).build());
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Thu Feb 14 12:58:12 2013
@@ -106,8 +106,6 @@ public class SimpleRegionObserver extend
     Leases leases = re.getRegionServerServices().getLeases();
     leases.createLease("x", 2000, null);
     leases.cancelLease("x");
-    Integer lid = re.getRegion().getLock(null, Bytes.toBytes("some row"), true);
-    re.getRegion().releaseRowLock(lid);
   }
 
   @Override
@@ -188,7 +186,7 @@ public class SimpleRegionObserver extend
 
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-      HStore store, InternalScanner scanner) {
+      HStore store, InternalScanner scanner, ScanType scanType) {
     hadPreCompact = true;
     return scanner;
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Thu Feb 14 12:58:12 2013
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -131,7 +133,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci = 
+        new LongColumnInterpreter();
     long median = aClient.median(TEST_TABLE, ci,
         scan);
     assertEquals(8L, median);
@@ -153,7 +156,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[2]);
     scan.setStopRow(ROWS[14]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
     assertEquals(12, rowCount);
   }
@@ -168,7 +172,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
         scan);
     assertEquals(ROWSIZE, rowCount);
@@ -187,7 +192,8 @@ public class TestAggregateProtocol {
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[2]);
 
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -211,7 +217,8 @@ public class TestAggregateProtocol {
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[5]);
 
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -230,7 +237,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -245,7 +253,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
         scan);
     assertEquals(20, rowCount);
@@ -256,7 +265,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
@@ -277,7 +287,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(19, maximum);
   }
@@ -292,7 +303,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(14, max);
   }
@@ -302,7 +314,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(190, maximum);
   }
@@ -314,7 +327,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(60, max);
   }
@@ -322,7 +336,8 @@ public class TestAggregateProtocol {
   @Test
   public void testMaxWithValidRangeWithNullCF() {
     AggregationClient aClient = new AggregationClient(conf);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Scan scan = new Scan();
     Long max = null;
     try {
@@ -337,7 +352,8 @@ public class TestAggregateProtocol {
   @Test
   public void testMaxWithInvalidRange() {
     AggregationClient aClient = new AggregationClient(conf);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Scan scan = new Scan();
     scan.setStartRow(ROWS[4]);
     scan.setStopRow(ROWS[2]);
@@ -360,7 +376,8 @@ public class TestAggregateProtocol {
     scan.setStopRow(ROWS[4]);
     try {
       AggregationClient aClient = new AggregationClient(conf);
-      final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+      final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+          new LongColumnInterpreter();
       max = aClient.max(TEST_TABLE, ci, scan);
     } catch (Exception e) {
       max = 0;
@@ -376,7 +393,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(null, max);
   }
@@ -395,7 +413,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(HConstants.EMPTY_START_ROW);
     scan.setStopRow(HConstants.EMPTY_END_ROW);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = aClient.min(TEST_TABLE, ci,
         scan);
     assertEquals(0l, min.longValue());
@@ -411,7 +430,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(5, min);
   }
@@ -423,7 +443,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(HConstants.EMPTY_START_ROW);
     scan.setStopRow(HConstants.EMPTY_END_ROW);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci,
         scan);
     assertEquals(0, min);
@@ -436,7 +457,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(6, min);
   }
@@ -447,7 +469,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
@@ -465,7 +488,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[4]);
     scan.setStopRow(ROWS[2]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
     } catch (Throwable e) {
@@ -480,7 +504,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[6]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
@@ -496,7 +521,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(null, min);
@@ -513,7 +539,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci,
         scan);
     assertEquals(190, sum);
@@ -529,7 +556,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(95, sum);
   }
@@ -539,7 +567,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci,
         scan);
     assertEquals(190 + 1900, sum);
@@ -552,7 +581,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(6 + 60, sum);
   }
@@ -563,7 +593,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     try {
       sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -580,7 +611,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[2]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     try {
       sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -596,7 +628,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(null, sum);
@@ -613,7 +646,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci,
         scan);
     assertEquals(9.5, avg, 0);
@@ -629,7 +663,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(9.5, avg, 0);
   }
@@ -639,7 +674,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci,
         scan);
     assertEquals(104.5, avg, 0);
@@ -652,7 +688,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(6 + 60, avg, 0);
   }
@@ -661,7 +698,8 @@ public class TestAggregateProtocol {
   public void testAvgWithValidRangeWithNullCF() {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     try {
       avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -678,7 +716,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[1]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     try {
       avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -694,7 +733,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(Double.NaN, avg, 0);
@@ -711,7 +751,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci,
         scan);
     assertEquals(5.766, std, 0.05d);
@@ -727,7 +768,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(2.87, std, 0.05d);
   }
@@ -737,7 +779,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci,
         scan);
     assertEquals(63.42, std, 0.05d);
@@ -750,7 +793,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(0, std, 0);
   }
@@ -761,7 +805,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[17]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     try {
       std = aClient.std(TEST_TABLE, ci, scan);
@@ -778,7 +823,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[1]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     try {
       std = aClient.std(TEST_TABLE, ci, scan);
@@ -794,7 +840,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(Double.NaN, std, 0);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java Thu Feb 14 12:58:12 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.TestServerCustomProtocol;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,7 +65,8 @@ public class TestClassLoading {
   static final String cpNameInvalid = "TestCPInvalid";
 
   private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
-  private static Class<?> regionCoprocessor2 = GenericEndpoint.class;
+  // TODO: Fix the import of this handler.  It is coming in from a package that is far away.
+  private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
   private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
   private static Class<?> masterCoprocessor = BaseMasterObserver.class;
 
@@ -167,11 +169,8 @@ public class TestClassLoading {
     // the classpath is {hbaseSrc}/target/classes.
     String currentDir = new File(".").getAbsolutePath();
     String classpath =
-        currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR + "classes" +
-        System.getProperty("path.separator") +
-        // Note that the below trick only works if mvn is running the test;
-        // doesn't work in eclipse for example.
-        System.getProperty("surefire.test.class.path");
+        currentDir + File.separator + "target"+ File.separator + "classes" +
+        System.getProperty("path.separator") + System.getProperty("java.class.path");
     options.add(classpath);
     LOG.debug("Setting classpath to: "+classpath);
 
@@ -302,6 +301,10 @@ public class TestClassLoading {
     }
   }
 
+  private String getLocalPath(File file) {
+    return new Path(file.toURI()).toString();
+  }
+
   @Test
   // HBASE-3516: Test CP Class loading from local file system
   public void testClassLoadingFromLocalFS() throws Exception {
@@ -310,7 +313,7 @@ public class TestClassLoading {
     // create a table that references the jar
     HTableDescriptor htd = new HTableDescriptor(cpName3);
     htd.addFamily(new HColumnDescriptor("test"));
-    htd.setValue("COPROCESSOR$1", jarFile.toString() + "|" + cpName3 + "|" +
+    htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName3 + "|" +
       Coprocessor.PRIORITY_USER);
     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
     admin.createTable(htd);
@@ -336,7 +339,7 @@ public class TestClassLoading {
     // create a table that references the jar
     HTableDescriptor htd = new HTableDescriptor(cpName4);
     htd.addFamily(new HColumnDescriptor("test"));
-    htd.setValue("COPROCESSOR$1", jarFile.toString() + "|" + cpName4 + "|" +
+    htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
       Coprocessor.PRIORITY_USER);
     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
     admin.createTable(htd);
@@ -374,9 +377,9 @@ public class TestClassLoading {
     String cpKey2 = " Coprocessor$2 ";
     String cpKey3 = " coprocessor$03 ";
 
-    String cpValue1 = jarFile1.toString() + "|" + cpName1 + "|" +
+    String cpValue1 = getLocalPath(jarFile1) + "|" + cpName1 + "|" +
         Coprocessor.PRIORITY_USER;
-    String cpValue2 = jarFile2.toString() + " | " + cpName2 + " | ";
+    String cpValue2 = getLocalPath(jarFile2) + " | " + cpName2 + " | ";
     // load from default class loader
     String cpValue3 =
         " | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";
@@ -391,13 +394,13 @@ public class TestClassLoading {
     htd.setValue(cpKey3, cpValue3);
 
     // add 2 coprocessor by using new htd.addCoprocessor() api
-    htd.addCoprocessor(cpName5, new Path(jarFile5.getPath()),
+    htd.addCoprocessor(cpName5, new Path(getLocalPath(jarFile5)),
         Coprocessor.PRIORITY_USER, null);
     Map<String, String> kvs = new HashMap<String, String>();
     kvs.put("k1", "v1");
     kvs.put("k2", "v2");
     kvs.put("k3", "v3");
-    htd.addCoprocessor(cpName6, new Path(jarFile6.getPath()),
+    htd.addCoprocessor(cpName6, new Path(getLocalPath(jarFile6)),
         Coprocessor.PRIORITY_USER, kvs);
 
     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Thu Feb 14 12:58:12 2013
@@ -18,34 +18,45 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-
-import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * TestEndpoint: test cases to verify coprocessor Endpoint
@@ -58,9 +69,6 @@ public class TestCoprocessorEndpoint {
   private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
   private static byte[] ROW = Bytes.toBytes("testRow");
-  
-  private static final String protocolName =  "org.apache.hadoop.hbase.CustomProtocol";
-  private static final String methodName = "myFunc";
 
   private static final int ROWSIZE = 20;
   private static final int rowSeperator1 = 5;
@@ -75,7 +83,6 @@ public class TestCoprocessorEndpoint {
     Configuration conf = util.getConfiguration();
     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
-        org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
         ProtobufCoprocessorService.class.getName());
     conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
         ProtobufCoprocessorService.class.getName());
@@ -101,51 +108,34 @@ public class TestCoprocessorEndpoint {
     util.shutdownMiniCluster();
   }
 
-  @Test
-  public void testGeneric() throws Throwable {
-    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
-    GenericProtocol protocol = table.coprocessorProxy(GenericProtocol.class,
-                                                      Bytes.toBytes("testRow"));
-    String workResult1 = protocol.doWork("foo");
-    assertEquals("foo", workResult1);
-    byte[] workResult2 = protocol.doWork(new byte[]{1});
-    assertArrayEquals(new byte[]{1}, workResult2);
-    byte workResult3 = protocol.doWork((byte)1);
-    assertEquals((byte)1, workResult3);
-    char workResult4 = protocol.doWork('c');
-    assertEquals('c', workResult4);
-    boolean workResult5 = protocol.doWork(true);
-    assertEquals(true, workResult5);
-    short workResult6 = protocol.doWork((short)1);
-    assertEquals((short)1, workResult6);
-    int workResult7 = protocol.doWork(5);
-    assertEquals(5, workResult7);
-    long workResult8 = protocol.doWork(5l);
-    assertEquals(5l, workResult8);
-    double workResult9 = protocol.doWork(6d);
-    assertEquals(6d, workResult9, 0.01);
-    float workResult10 = protocol.doWork(6f);
-    assertEquals(6f, workResult10, 0.01);
-    Text workResult11 = protocol.doWork(new Text("foo"));
-    assertEquals(new Text("foo"), workResult11);
-    table.close();
+  private Map<byte [], Long> sum(final HTable table, final byte [] family,
+      final byte [] qualifier, final byte [] start, final byte [] end)
+  throws ServiceException, Throwable {
+    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+        start, end,
+      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+        @Override
+        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+        throws IOException {
+          BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+              new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+          ColumnAggregationProtos.SumRequest.Builder builder =
+            ColumnAggregationProtos.SumRequest.newBuilder();
+          builder.setFamily(ByteString.copyFrom(family));
+          if (qualifier != null && qualifier.length > 0) {
+            builder.setQualifier(ByteString.copyFrom(qualifier));
+          }
+          instance.sum(null, builder.build(), rpcCallback);
+          return rpcCallback.get().getSum();
+        }
+      });
   }
 
   @Test
   public void testAggregation() throws Throwable {
     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
-    Map<byte[], Long> results;
-
-    // scan: for all regions
-    results = table
-        .coprocessorExec(ColumnAggregationProtocol.class,
-                         ROWS[0], ROWS[ROWS.length-1],
-                         new Batch.Call<ColumnAggregationProtocol, Long>() {
-                           public Long call(ColumnAggregationProtocol instance)
-                               throws IOException {
-                             return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-                           }
-                         });
+    Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+      ROWS[0], ROWS[ROWS.length-1]);
     int sumResult = 0;
     int expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
@@ -160,15 +150,8 @@ public class TestCoprocessorEndpoint {
     results.clear();
 
     // scan: for region 2 and region 3
-    results = table
-        .coprocessorExec(ColumnAggregationProtocol.class,
-                         ROWS[rowSeperator1], ROWS[ROWS.length-1],
-                         new Batch.Call<ColumnAggregationProtocol, Long>() {
-                           public Long call(ColumnAggregationProtocol instance)
-                               throws IOException {
-                             return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-                           }
-                         });
+    results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+      ROWS[rowSeperator1], ROWS[ROWS.length-1]);
     sumResult = 0;
     expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
@@ -257,6 +240,44 @@ public class TestCoprocessorEndpoint {
   }
 
   @Test
+  public void testCoprocessorServiceNullResponse() throws Throwable {
+    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+    NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
+
+    final TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    try {
+      // scan: for all regions
+      final RpcController controller = new ServerRpcController();
+      // test that null results are supported
+      Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[0], ROWS[ROWS.length - 1],
+          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
+            public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+                throws IOException {
+              BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
+              instance.echo(controller, request, callback);
+              TestProtos.EchoResponseProto response = callback.get();
+              LOG.debug("Batch.Call got result " + response);
+              return null;
+            }
+          }
+      );
+      for (Map.Entry<byte[], String> e : results.entrySet()) {
+        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+      }
+      assertEquals(3, results.size());
+      for (HRegionInfo info : regions.navigableKeySet()) {
+        LOG.info("Region info is "+info.getRegionNameAsString());
+        assertTrue(results.containsKey(info.getRegionName()));
+        assertNull(results.get(info.getRegionName()));
+      }
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
   public void testMasterCoprocessorService() throws Throwable {
     HBaseAdmin admin = util.getHBaseAdmin();
     final TestProtos.EchoRequestProto request =
@@ -267,6 +288,41 @@ public class TestCoprocessorEndpoint {
     admin.close();
   }
 
+  @Test
+  public void testCoprocessorError() throws Exception {
+    Configuration configuration = new Configuration(util.getConfiguration());
+    // Limit the client to one retry so the expected failure is reported instead of retried forever
+    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    HTable table = new HTable(configuration, TEST_TABLE);
+
+    try {
+      CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
+
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
+
+      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+      fail("Should have thrown an exception");
+    } catch (ServiceException expected) { // expected: error() must fail, so this is the passing path
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
+  public void testMasterCoprocessorError() throws Throwable {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+    try {
+      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+      fail("Should have thrown an exception");
+    } catch (ServiceException expected) { // expected: error() must fail, so this is the passing path
+    } finally {
+      admin.close();
+    }
+  }
+
   private static byte[][] makeN(byte[] base, int n) {
     byte[][] ret = new byte[n][];
     for (int i = 0; i < n; i++) {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Thu Feb 14 12:58:12 2013
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.collections.map.AbstractReferenceMap;
-import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,16 +44,15 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.junit.experimental.categories.Category;
@@ -190,7 +187,7 @@ public class TestCoprocessorInterface ex
     }
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-        HStore store, InternalScanner scanner) {
+        HStore store, InternalScanner scanner, ScanType scanType) {
       preCompactCalled = true;
       return scanner;
     }
@@ -323,7 +320,7 @@ public class TestCoprocessorInterface ex
     for (int i = 0; i < regions.length; i++) {
       try {
         Get g = new Get(regions[i].getStartKey());
-        regions[i].get(g, null);
+        regions[i].get(g);
         fail();
       } catch (DoNotRetryIOException xc) {
       }
@@ -462,8 +459,6 @@ public class TestCoprocessorInterface ex
     TEST_UTIL.getConfiguration().setInt(
         "hbase.master.lease.thread.wakefrequency", 5 * 1000);
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD,
-      10 * 1000);
     // Increase the amount of time between client retries
     TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
     // This size should make it so we always split using the addContent

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java Thu Feb 14 12:58:12 2013
@@ -19,11 +19,22 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
-import java.io.InterruptedIOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -35,8 +46,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.*;
-
 /**
  * Tests unhandled exceptions thrown by coprocessors running on master.
  * Expected result is that the master will abort with an informative

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java Thu Feb 14 12:58:12 2013
@@ -18,31 +18,35 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
+import static junit.framework.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-
 @Category(MediumTests.class)
 public class TestRegionObserverBypass {
   private static HBaseTestingUtility util;
@@ -60,7 +64,6 @@ public class TestRegionObserverBypass {
         TestCoprocessor.class.getName());
     util = new HBaseTestingUtility(conf);
     util.startMiniCluster();
-    util.createTable(tableName, new byte[][] {dummy, test});
   }
 
   @AfterClass
@@ -68,6 +71,18 @@ public class TestRegionObserverBypass {
     util.shutdownMiniCluster();
   }
 
+  @Before
+  public void setUp() throws Exception {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    if (admin.tableExists(tableName)) { // remove a table left over from an earlier test
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName); // disable first, then delete below
+      }
+      admin.deleteTable(tableName);
+    }
+    util.createTable(tableName, new byte[][] {dummy, test}); // every test starts from a newly created table
+  }
+
   /**
    * do a single put that is bypassed by a RegionObserver
    * @throws Exception
@@ -89,6 +104,10 @@ public class TestRegionObserverBypass {
    */
   @Test
   public void testMulti() throws Exception {
+    //ensure that server time increments every time we do an operation, otherwise
+    //previous deletes will eclipse successive puts having the same timestamp
+    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
+
     HTable t = new HTable(util.getConfiguration(), tableName);
     List<Put> puts = new ArrayList<Put>();
     Put p = new Put(row1);
@@ -170,6 +189,8 @@ public class TestRegionObserverBypass {
     checkRowAndDelete(t,row2,1);
     checkRowAndDelete(t,row3,0);
     t.close();
+
+    EnvironmentEdgeManager.reset();
   }
 
   private void checkRowAndDelete(HTable t, byte[] row, int count) throws IOException {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Thu Feb 14 12:58:12 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -315,7 +316,7 @@ public class TestRegionObserverInterface
 
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-        HStore store, final InternalScanner scanner) {
+        HStore store, final InternalScanner scanner, final ScanType scanType) {
       return new InternalScanner() {
         @Override
         public boolean next(List<KeyValue> results) throws IOException {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Thu Feb 14 12:58:12 2013
@@ -125,9 +125,7 @@ public class TestRegionObserverStacking 
 
     Put put = new Put(ROW);
     put.add(A, A, A);
-    int lockid = region.obtainRowLock(ROW);
-    region.put(put, lockid);
-    region.releaseRowLock(lockid);
+    region.put(put);
 
     Coprocessor c = h.findCoprocessor(ObserverA.class.getName());
     long idA = ((ObserverA)c).id;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java Thu Feb 14 12:58:12 2013
@@ -179,13 +179,13 @@ public class TestFilter {
       Delete d = new Delete(ROW);
       d.deleteColumns(FAMILIES[0], QUALIFIERS_ONE[1]);
       d.deleteColumns(FAMILIES[1], QUALIFIERS_ONE[1]);
-      this.region.delete(d, null, false);
+      this.region.delete(d, false);
     }
     for(byte [] ROW : ROWS_TWO) {
       Delete d = new Delete(ROW);
       d.deleteColumns(FAMILIES[0], QUALIFIERS_TWO[1]);
       d.deleteColumns(FAMILIES[1], QUALIFIERS_TWO[1]);
-      this.region.delete(d, null, false);
+      this.region.delete(d, false);
     }
     colsPerRow -= 2;
 
@@ -194,13 +194,13 @@ public class TestFilter {
       Delete d = new Delete(ROWS_ONE[1]);
       d.deleteColumns(FAMILIES[0], QUALIFIER);
       d.deleteColumns(FAMILIES[1], QUALIFIER);
-      this.region.delete(d, null, false);
+      this.region.delete(d, false);
     }
     for(byte [] QUALIFIER : QUALIFIERS_TWO) {
       Delete d = new Delete(ROWS_TWO[1]);
       d.deleteColumns(FAMILIES[0], QUALIFIER);
       d.deleteColumns(FAMILIES[1], QUALIFIER);
-      this.region.delete(d, null, false);
+      this.region.delete(d, false);
     }
     numRows -= 2;
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java Thu Feb 14 12:58:12 2013
@@ -257,7 +257,7 @@ public class TestFilterSerialization {
     // non-null family/column SingleColumnValueFilter
     singleColumnValueExcludeFilter =
       new SingleColumnValueExcludeFilter(Bytes.toBytes("fam"), Bytes.toBytes("qual"),
-      CompareFilter.CompareOp.LESS_OR_EQUAL, new NullComparator(), false, true, false, false);
+      CompareFilter.CompareOp.LESS_OR_EQUAL, new NullComparator(), false, false);
     assertTrue(singleColumnValueExcludeFilter.areSerializedFieldsEqual(
       ProtobufUtil.toFilter(ProtobufUtil.toFilter(singleColumnValueExcludeFilter))));
   }
@@ -274,7 +274,7 @@ public class TestFilterSerialization {
     // non-null family/column SingleColumnValueFilter
     singleColumnValueFilter =
       new SingleColumnValueFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
-      CompareFilter.CompareOp.NOT_EQUAL, new NullComparator(), true, false, true, true);
+      CompareFilter.CompareOp.NOT_EQUAL, new NullComparator(), true, true);
     assertTrue(singleColumnValueFilter.areSerializedFieldsEqual(
       ProtobufUtil.toFilter(ProtobufUtil.toFilter(singleColumnValueFilter))));
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java Thu Feb 14 12:58:12 2013
@@ -27,6 +27,9 @@ import org.junit.experimental.categories
 
 import static org.junit.Assert.*;
 
+import java.util.List;
+import java.util.ArrayList;
+
 /**
  * Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
  * extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -52,16 +55,18 @@ public class TestSingleColumnValueExclud
         CompareOp.EQUAL, VAL_1);
 
     // A 'match' situation
-    KeyValue kv;
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    // INCLUDE expected because test column has not yet passed
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
-    // Test column will pass (will match), will SKIP because test columns are excluded
-    assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
-    // Test column has already passed and matched, all subsequent columns are INCLUDE
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+    filter.filterRow(kvs); // the test expects the COLUMN_QUALIFIER entry to be dropped, leaving the other two
+
+    assertEquals("resultSize", 2, kvs.size());
+    assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+    assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
     assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
 
     // A 'mismatch' situation

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Thu Feb 14 12:58:12 2013
@@ -129,13 +129,12 @@ public class TestEncodedSeekers {
   private void doPuts(HRegion region) throws IOException{
     LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
      for (int i = 0; i < NUM_ROWS; ++i) {
-      byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+      byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
         Put put = new Put(key);
-        String colAsStr = String.valueOf(j);
-        byte[] col = Bytes.toBytes(colAsStr);
-        byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
-        put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+        byte[] col = Bytes.toBytes(String.valueOf(j));
+        byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+        put.add(CF_BYTES, col, value);
         if(VERBOSE){
           KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
           System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut);
@@ -151,7 +150,7 @@ public class TestEncodedSeekers {
   
   private void doGets(HRegion region) throws IOException{
     for (int i = 0; i < NUM_ROWS; ++i) {
-      final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+      final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
         final String qualStr = String.valueOf(j);
         if (VERBOSE) {
@@ -161,10 +160,10 @@ public class TestEncodedSeekers {
         final byte[] qualBytes = Bytes.toBytes(qualStr);
         Get get = new Get(rowKey);
         get.addColumn(CF_BYTES, qualBytes);
-        Result result = region.get(get, null);
+        Result result = region.get(get);
         assertEquals(1, result.size());
-        assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
-            result.getValue(CF_BYTES, qualBytes)));
+        byte[] value = result.getValue(CF_BYTES, qualBytes);
+        assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
       }
     }
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Thu Feb 14 12:58:12 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
 public class CacheTestUtils {
@@ -149,7 +150,11 @@ public class CacheTestUtils {
       try {
         if (toBeTested.getBlock(block.blockName, true, false) != null) {
           toBeTested.cacheBlock(block.blockName, block.block);
-          fail("Cache should not allow re-caching a block");
+          if (!(toBeTested instanceof BucketCache)) {
+            // BucketCache won't throw exception when caching already cached
+            // block
+            fail("Cache should not allow re-caching a block");
+          }
         }
       } catch (RuntimeException re) {
         // expected
@@ -242,6 +247,30 @@ public class CacheTestUtils {
 
   private static class ByteArrayCacheable implements Cacheable {
 
+    static final CacheableDeserializer<Cacheable> blockDeserializer = 
+      new CacheableDeserializer<Cacheable>() {
+      
+      @Override
+      public Cacheable deserialize(ByteBuffer b) throws IOException {
+        int len = b.getInt();
+        Thread.yield();
+        byte buf[] = new byte[len];
+        b.get(buf);
+        return new ByteArrayCacheable(buf);
+      }
+
+      @Override
+      public int getDeserialiserIdentifier() {
+        return deserializerIdentifier;
+      }
+
+      @Override
+      public Cacheable deserialize(ByteBuffer b, boolean reuse)
+          throws IOException {
+        return deserialize(b);
+      }
+    };
+
     final byte[] buf;
 
     public ByteArrayCacheable(byte[] buf) {
@@ -268,20 +297,22 @@ public class CacheTestUtils {
 
     @Override
     public CacheableDeserializer<Cacheable> getDeserializer() {
-      return new CacheableDeserializer<Cacheable>() {
+      return blockDeserializer;
+    }
 
-        @Override
-        public Cacheable deserialize(ByteBuffer b) throws IOException {
-          int len = b.getInt();
-          Thread.yield();
-          byte buf[] = new byte[len];
-          b.get(buf);
-          return new ByteArrayCacheable(buf);
-        }
-      };
+    private static final int deserializerIdentifier;
+    static {
+      deserializerIdentifier = CacheableDeserializerIdManager
+          .registerDeserializer(blockDeserializer);
+    }
+
+    @Override
+    public BlockType getBlockType() {
+      return BlockType.DATA;
     }
   }
 
+
   private static HFileBlockPair[] generateHFileBlocks(int blockSize,
       int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];