You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC
svn commit: r1446147 [28/35] - in /hbase/branches/hbase-7290v2: ./ bin/
conf/ dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/...
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Thu Feb 14 12:58:12 2013
@@ -27,7 +27,10 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -37,6 +40,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -55,6 +61,7 @@ public class TestHCM {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] TABLE_NAME = Bytes.toBytes("test");
private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
+ private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
@@ -67,25 +74,6 @@ public class TestHCM {
TEST_UTIL.shutdownMiniCluster();
}
- /**
- * @throws InterruptedException
- * @throws IllegalAccessException
- * @throws NoSuchFieldException
- * @throws ZooKeeperConnectionException
- * @throws IllegalArgumentException
- * @throws SecurityException
- * @see https://issues.apache.org/jira/browse/HBASE-2925
- */
- // Disabling. Of course this test will OOME using new Configuration each time
- // St.Ack 20110428
- // @Test
- public void testManyNewConnectionsDoesnotOOME()
- throws SecurityException, IllegalArgumentException,
- ZooKeeperConnectionException, NoSuchFieldException, IllegalAccessException,
- InterruptedException {
- createNewConfigurations();
- }
-
private static Random _randy = new Random();
public static void createNewConfigurations() throws SecurityException,
@@ -118,6 +106,28 @@ public class TestHCM {
private static int getHConnectionManagerCacheSize(){
return HConnectionTestingUtility.getConnectionCount();
}
+
+ @Test
+ public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
+ // Save off current HConnections
+ Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
+ new HashMap<HConnectionKey, HConnectionImplementation>();
+ oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES);
+
+ HConnectionManager.HBASE_INSTANCES.clear();
+
+ try {
+ HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+ connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
+ "test abortingHConnectionRemovesItselfFromHCM"));
+ Assert.assertNotSame(connection,
+ HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
+ } finally {
+ // Put original HConnections back
+ HConnectionManager.HBASE_INSTANCES.clear();
+ HConnectionManager.HBASE_INSTANCES.putAll(oldHBaseInstances);
+ }
+ }
/**
* Test that when we delete a location using the first row of a region
@@ -140,10 +150,12 @@ public class TestHCM {
Bytes.toString(TABLE_NAME).getBytes() , Bytes.toString(ROW).getBytes()));
final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
- conn.updateCachedLocation(conn.getCachedLocation(TABLE_NAME, ROW), "127.0.0.1", nextPort);
+ HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
+ conn.updateCachedLocation(loc.getRegionInfo(), loc, "127.0.0.1", nextPort,
+ HConstants.LATEST_TIMESTAMP);
Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
- conn.deleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
+ conn.forceDeleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
assertNull("What is this location?? " + rl, rl);
@@ -156,18 +168,21 @@ public class TestHCM {
table.put(put2);
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
+ TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
+ HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
// We can wait for all regions to be online, that makes log reading easier when debugging
- while (TEST_UTIL.getMiniHBaseCluster().getMaster().
- getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ Thread.sleep(1);
}
// Now moving the region to the second server
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW);
byte[] regionName = toMove.getRegionInfo().getRegionName();
+ byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
// Choose the other server.
- int curServerId = TEST_UTIL.getHBaseCluster().getServerWith( regionName );
+ int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
int destServerId = (curServerId == 0 ? 1 : 0);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
@@ -181,6 +196,8 @@ public class TestHCM {
Assert.assertFalse( toMove.getPort() == destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
+ Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getAssignmentManager().getRegionStates().isRegionsInTransition());
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depend only on the region we're looking at.
@@ -190,18 +207,26 @@ public class TestHCM {
destServerName.getServerName().getBytes()
);
- while ( destServer.getOnlineRegion(regionName) == null ){
+ while (destServer.getOnlineRegion(regionName) == null ||
+ destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+ curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+ master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
+ Thread.sleep(1);
}
+ LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
- LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
+ Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+ Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+
// Cache was NOT updated and points to the wrong server
- Assert.assertFalse( conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
+ Assert.assertFalse(
+ conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
// Hijack the number of retries to fail immediately instead of retrying: there will be no new
// connection to the master
@@ -233,6 +258,8 @@ public class TestHCM {
"Previous server was "+curServer.getServerName().getHostAndPort(),
destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
+ Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
+ Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
// We move it back to do another test with a scan
LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
@@ -241,8 +268,12 @@ public class TestHCM {
curServer.getServerName().getServerName().getBytes()
);
- while ( curServer.getOnlineRegion(regionName) == null ){
+ while (curServer.getOnlineRegion(regionName) == null ||
+ destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+ curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
+ master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
+ Thread.sleep(1);
}
// Check our new state.
@@ -260,11 +291,12 @@ public class TestHCM {
sc.setStopRow(ROW);
try {
- ResultScanner rs = table.getScanner(sc);
- while (rs.next() != null){}
+ ResultScanner rs = table.getScanner(sc);
+ while (rs.next() != null) {
+ }
Assert.assertFalse("Unreachable point", true);
- }catch (Throwable e){
- LOG.info("Put done, expected exception caught: "+e.getClass());
+ } catch (Throwable e) {
+ LOG.info("Scan done, expected exception caught: " + e.getClass());
}
// Cache is updated with the right value.
@@ -302,7 +334,55 @@ public class TestHCM {
}
/**
- * Make sure that {@link HConfiguration} instances that are essentially the
+ * Test that stale cache updates don't override newer cached values.
+ */
+ @Test(timeout = 60000)
+ public void testCacheSeqNums() throws Exception{
+ HTable table = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAM);
+ TEST_UTIL.createMultiRegions(table, FAM_NAM);
+ Put put = new Put(ROW);
+ put.add(FAM_NAM, ROW, ROW);
+ table.put(put);
+ HConnectionManager.HConnectionImplementation conn =
+ (HConnectionManager.HConnectionImplementation)table.getConnection();
+
+ HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
+ assertNotNull(location);
+
+ HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(),
+ location.getHostname(), location.getPort() - 1);
+
+ // Same server as already in cache reporting - overwrites any value despite seqNum.
+ int nextPort = location.getPort() + 1;
+ conn.updateCachedLocation(location.getRegionInfo(), location,
+ "127.0.0.1", nextPort, location.getSeqNum() - 1);
+ location = conn.getCachedLocation(TABLE_NAME2, ROW);
+ Assert.assertEquals(nextPort, location.getPort());
+
+ // No source specified - same.
+ nextPort = location.getPort() + 1;
+ conn.updateCachedLocation(location.getRegionInfo(), location,
+ "127.0.0.1", nextPort, location.getSeqNum() - 1);
+ location = conn.getCachedLocation(TABLE_NAME2, ROW);
+ Assert.assertEquals(nextPort, location.getPort());
+
+ // Higher seqNum - overwrites lower seqNum.
+ nextPort = location.getPort() + 1;
+ conn.updateCachedLocation(location.getRegionInfo(), anySource,
+ "127.0.0.1", nextPort, location.getSeqNum() + 1);
+ location = conn.getCachedLocation(TABLE_NAME2, ROW);
+ Assert.assertEquals(nextPort, location.getPort());
+
+ // Lower seqNum - does not overwrite higher seqNum.
+ nextPort = location.getPort() + 1;
+ conn.updateCachedLocation(location.getRegionInfo(), anySource,
+ "127.0.0.1", nextPort, location.getSeqNum() - 1);
+ location = conn.getCachedLocation(TABLE_NAME2, ROW);
+ Assert.assertEquals(nextPort - 1, location.getPort());
+ }
+
+ /**
+ * Make sure that {@link Configuration} instances that are essentially the
* same map to the same {@link HConnection} instance.
*/
@Test
@@ -332,7 +412,7 @@ public class TestHCM {
/**
* Makes sure that there is no leaking of
- * {@link HConnectionManager.TableServers} in the {@link HConnectionManager}
+ * {@link HConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
* class.
*/
@Test
@@ -376,7 +456,7 @@ public class TestHCM {
} finally {
for (HConnection c: connections) {
// Clean up connections made so we don't interfere w/ subsequent tests.
- HConnectionManager.deleteConnection(c.getConfiguration(), true);
+ HConnectionManager.deleteConnection(c.getConfiguration());
}
}
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Thu Feb 14 12:58:12 2013
@@ -18,6 +18,11 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -27,9 +32,13 @@ import java.util.concurrent.ThreadPoolEx
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.ipc.HBaseClient;
import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -41,8 +50,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.junit.Assert.*;
-
@Category(MediumTests.class)
public class TestMultiParallel {
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
@@ -65,6 +72,7 @@ public class TestMultiParallel {
UTIL.startMiniCluster(slaves);
HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
+ UTIL.waitTableAvailable(Bytes.toBytes(TEST_TABLE), 15 * 1000);
t.close();
}
@@ -72,11 +80,20 @@ public class TestMultiParallel {
UTIL.shutdownMiniCluster();
}
- @Before public void before() throws IOException {
+ @Before public void before() throws Exception {
LOG.info("before");
if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
// Distribute regions
UTIL.getMiniHBaseCluster().getMaster().balance();
+ // Wait until completing balance
+ final RegionStates regionStates = UTIL.getMiniHBaseCluster().getMaster()
+ .getAssignmentManager().getRegionStates();
+ UTIL.waitFor(15 * 1000, new Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return !regionStates.isRegionsInTransition();
+ }
+ });
}
LOG.info("before done");
}
@@ -226,6 +243,11 @@ public class TestMultiParallel {
doTestFlushCommits(true);
}
+ /**
+ * Set table auto flush to false and test flushing commits
+ * @param doAbort true to abort one regionserver during the test
+ * @throws Exception
+ */
private void doTestFlushCommits(boolean doAbort) throws Exception {
// Load the data
LOG.info("get new table");
@@ -240,9 +262,21 @@ public class TestMultiParallel {
}
LOG.info("puts");
table.flushCommits();
+ int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+ .size();
+ assert liveRScount > 0;
+ JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
+ .getLiveRegionServerThreads().get(0);
if (doAbort) {
- LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0));
-
+ liveRS.getRegionServer().abort("Aborting for tests",
+ new Exception("doTestFlushCommits"));
+ // If we wait for no regions being online after we abort the server, we
+ // could ensure the master has re-assigned the regions on killed server
+ // after writing successfully. It means the server we aborted is dead
+ // and detected by master
+ while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
+ Thread.sleep(10);
+ }
// try putting more keys after the abort. same key/qual... just validating
// no exceptions thrown
puts = constructPutRequests();
@@ -264,10 +298,11 @@ public class TestMultiParallel {
LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
}
LOG.info("Count=" + count);
- Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, (doAbort? 1 : 2), count);
+ Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
+ (doAbort ? (liveRScount - 1) : liveRScount), count);
for (JVMClusterUtil.RegionServerThread t: liveRSs) {
int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
- Assert.assertTrue("Count of regions=" + regions, regions > 10);
+ // Assert.assertTrue("Count of regions=" + regions, regions > 10);
}
table.close();
LOG.info("done");
@@ -285,7 +320,13 @@ public class TestMultiParallel {
validateSizeAndEmpty(results, KEYS.length);
if (true) {
- UTIL.getMiniHBaseCluster().abortRegionServer(0);
+ int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+ .size();
+ assert liveRScount > 0;
+ JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
+ .getLiveRegionServerThreads().get(0);
+ liveRS.getRegionServer().abort("Aborting for tests",
+ new Exception("testBatchWithPut"));
puts = constructPutRequests();
results = table.batch(puts);
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java Thu Feb 14 12:58:12 2013
@@ -52,7 +52,7 @@ public class TestScannerTimeout {
// Be careful w/ what you set this timer to... it can get in the way of
// the mini cluster coming up -- the verification in particular.
private final static int THREAD_WAKE_FREQUENCY = 1000;
- private final static int SCANNER_TIMEOUT = 10000;
+ private final static int SCANNER_TIMEOUT = 15000;
private final static int SCANNER_CACHING = 5;
/**
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotMetadata.java Thu Feb 14 12:58:12 2013
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Thu Feb 14 12:58:12 2013
@@ -22,29 +22,67 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
/**
* The aggregation implementation at a region.
*/
-public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
-implements ColumnAggregationProtocol {
+public class ColumnAggregationEndpoint extends ColumnAggregationService
+implements Coprocessor, CoprocessorService {
+ static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
+ private RegionCoprocessorEnvironment env = null;
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment)env;
+ return;
+ }
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // Nothing to do.
+ }
@Override
- public long sum(byte[] family, byte[] qualifier)
- throws IOException {
+ public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
// aggregate at each region
Scan scan = new Scan();
- scan.addColumn(family, qualifier);
+ // Family is required in pb. Qualifier is not.
+ byte [] family = request.getFamily().toByteArray();
+ byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
+ if (request.hasQualifier()) {
+ scan.addColumn(family, qualifier);
+ } else {
+ scan.addFamily(family);
+ }
int sumResult = 0;
-
- InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment())
- .getRegion().getScanner(scan);
+ InternalScanner scanner = null;
try {
+ scanner = this.env.getRegion().getScanner(scan);
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean hasMore = false;
do {
@@ -56,9 +94,22 @@ implements ColumnAggregationProtocol {
}
}
} while (hasMore);
+ } catch (IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ // Set result to -1 to indicate error.
+ sumResult = -1;
+ LOG.info("Setting sum result to -1 to indicate error", e);
} finally {
- scanner.close();
+ if (scanner != null) {
+ try {
+ scanner.close();
+ } catch (IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ sumResult = -1;
+ LOG.info("Setting sum result to -1 to indicate error", e);
+ }
+ }
}
- return sumResult;
+ done.run(SumResponse.newBuilder().setSum(sumResult).build());
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Thu Feb 14 12:58:12 2013
@@ -106,8 +106,6 @@ public class SimpleRegionObserver extend
Leases leases = re.getRegionServerServices().getLeases();
leases.createLease("x", 2000, null);
leases.cancelLease("x");
- Integer lid = re.getRegion().getLock(null, Bytes.toBytes("some row"), true);
- re.getRegion().releaseRowLock(lid);
}
@Override
@@ -188,7 +186,7 @@ public class SimpleRegionObserver extend
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
- HStore store, InternalScanner scanner) {
+ HStore store, InternalScanner scanner, ScanType scanType) {
hadPreCompact = true;
return scanner;
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Thu Feb 14 12:58:12 2013
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -131,7 +133,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long median = aClient.median(TEST_TABLE, ci,
scan);
assertEquals(8L, median);
@@ -153,7 +156,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[2]);
scan.setStopRow(ROWS[14]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
assertEquals(12, rowCount);
}
@@ -168,7 +172,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci,
scan);
assertEquals(ROWSIZE, rowCount);
@@ -187,7 +192,8 @@ public class TestAggregateProtocol {
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[2]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -211,7 +217,8 @@ public class TestAggregateProtocol {
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[5]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -230,7 +237,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -245,7 +253,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci,
scan);
assertEquals(20, rowCount);
@@ -256,7 +265,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
long rowCount = aClient.rowCount(TEST_TABLE, ci,
@@ -277,7 +287,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long maximum = aClient.max(TEST_TABLE, ci, scan);
assertEquals(19, maximum);
}
@@ -292,7 +303,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(14, max);
}
@@ -302,7 +314,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long maximum = aClient.max(TEST_TABLE, ci, scan);
assertEquals(190, maximum);
}
@@ -314,7 +327,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(60, max);
}
@@ -322,7 +336,8 @@ public class TestAggregateProtocol {
@Test
public void testMaxWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Scan scan = new Scan();
Long max = null;
try {
@@ -337,7 +352,8 @@ public class TestAggregateProtocol {
@Test
public void testMaxWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Scan scan = new Scan();
scan.setStartRow(ROWS[4]);
scan.setStopRow(ROWS[2]);
@@ -360,7 +376,8 @@ public class TestAggregateProtocol {
scan.setStopRow(ROWS[4]);
try {
AggregationClient aClient = new AggregationClient(conf);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
max = aClient.max(TEST_TABLE, ci, scan);
} catch (Exception e) {
max = 0;
@@ -376,7 +393,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(null, max);
}
@@ -395,7 +413,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long min = aClient.min(TEST_TABLE, ci,
scan);
assertEquals(0l, min.longValue());
@@ -411,7 +430,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(5, min);
}
@@ -423,7 +443,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci,
scan);
assertEquals(0, min);
@@ -436,7 +457,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(6, min);
}
@@ -447,7 +469,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long min = null;
try {
min = aClient.min(TEST_TABLE, ci, scan);
@@ -465,7 +488,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[4]);
scan.setStopRow(ROWS[2]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
try {
min = aClient.min(TEST_TABLE, ci, scan);
} catch (Throwable e) {
@@ -480,7 +504,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[6]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long min = null;
try {
min = aClient.min(TEST_TABLE, ci, scan);
@@ -496,7 +521,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long min = null;
min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(null, min);
@@ -513,7 +539,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci,
scan);
assertEquals(190, sum);
@@ -529,7 +556,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(95, sum);
}
@@ -539,7 +567,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci,
scan);
assertEquals(190 + 1900, sum);
@@ -552,7 +581,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(6 + 60, sum);
}
@@ -563,7 +593,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long sum = null;
try {
sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -580,7 +611,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[2]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long sum = null;
try {
sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -596,7 +628,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setFilter(f);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Long sum = null;
sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(null, sum);
@@ -613,7 +646,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci,
scan);
assertEquals(9.5, avg, 0);
@@ -629,7 +663,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(9.5, avg, 0);
}
@@ -639,7 +674,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci,
scan);
assertEquals(104.5, avg, 0);
@@ -652,7 +688,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(6 + 60, avg, 0);
}
@@ -661,7 +698,8 @@ public class TestAggregateProtocol {
public void testAvgWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double avg = null;
try {
avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -678,7 +716,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[1]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double avg = null;
try {
avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -694,7 +733,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double avg = null;
avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(Double.NaN, avg, 0);
@@ -711,7 +751,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci,
scan);
assertEquals(5.766, std, 0.05d);
@@ -727,7 +768,8 @@ public class TestAggregateProtocol {
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(2.87, std, 0.05d);
}
@@ -737,7 +779,8 @@ public class TestAggregateProtocol {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci,
scan);
assertEquals(63.42, std, 0.05d);
@@ -750,7 +793,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(0, std, 0);
}
@@ -761,7 +805,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[17]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double std = null;
try {
std = aClient.std(TEST_TABLE, ci, scan);
@@ -778,7 +823,8 @@ public class TestAggregateProtocol {
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[1]);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double std = null;
try {
std = aClient.std(TEST_TABLE, ci, scan);
@@ -794,7 +840,8 @@ public class TestAggregateProtocol {
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setFilter(f);
- final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+ final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+ new LongColumnInterpreter();
Double std = null;
std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(Double.NaN, std, 0);
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java Thu Feb 14 12:58:12 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.TestServerCustomProtocol;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -64,7 +65,8 @@ public class TestClassLoading {
static final String cpNameInvalid = "TestCPInvalid";
private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
- private static Class<?> regionCoprocessor2 = GenericEndpoint.class;
+ // TODO: Fix the import of this handler. It is coming in from a package that is far away.
+ private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
private static Class<?> masterCoprocessor = BaseMasterObserver.class;
@@ -167,11 +169,8 @@ public class TestClassLoading {
// the classpath is {hbaseSrc}/target/classes.
String currentDir = new File(".").getAbsolutePath();
String classpath =
- currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR + "classes" +
- System.getProperty("path.separator") +
- // Note that the below trick only works if mvn is running the test;
- // doesn't work in eclipse for example.
- System.getProperty("surefire.test.class.path");
+ currentDir + File.separator + "target"+ File.separator + "classes" +
+ System.getProperty("path.separator") + System.getProperty("java.class.path");
options.add(classpath);
LOG.debug("Setting classpath to: "+classpath);
@@ -302,6 +301,10 @@ public class TestClassLoading {
}
}
+ private String getLocalPath(File file) {
+ return new Path(file.toURI()).toString();
+ }
+
@Test
// HBASE-3516: Test CP Class loading from local file system
public void testClassLoadingFromLocalFS() throws Exception {
@@ -310,7 +313,7 @@ public class TestClassLoading {
// create a table that references the jar
HTableDescriptor htd = new HTableDescriptor(cpName3);
htd.addFamily(new HColumnDescriptor("test"));
- htd.setValue("COPROCESSOR$1", jarFile.toString() + "|" + cpName3 + "|" +
+ htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName3 + "|" +
Coprocessor.PRIORITY_USER);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.createTable(htd);
@@ -336,7 +339,7 @@ public class TestClassLoading {
// create a table that references the jar
HTableDescriptor htd = new HTableDescriptor(cpName4);
htd.addFamily(new HColumnDescriptor("test"));
- htd.setValue("COPROCESSOR$1", jarFile.toString() + "|" + cpName4 + "|" +
+ htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
Coprocessor.PRIORITY_USER);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.createTable(htd);
@@ -374,9 +377,9 @@ public class TestClassLoading {
String cpKey2 = " Coprocessor$2 ";
String cpKey3 = " coprocessor$03 ";
- String cpValue1 = jarFile1.toString() + "|" + cpName1 + "|" +
+ String cpValue1 = getLocalPath(jarFile1) + "|" + cpName1 + "|" +
Coprocessor.PRIORITY_USER;
- String cpValue2 = jarFile2.toString() + " | " + cpName2 + " | ";
+ String cpValue2 = getLocalPath(jarFile2) + " | " + cpName2 + " | ";
// load from default class loader
String cpValue3 =
" | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";
@@ -391,13 +394,13 @@ public class TestClassLoading {
htd.setValue(cpKey3, cpValue3);
// add 2 coprocessor by using new htd.addCoprocessor() api
- htd.addCoprocessor(cpName5, new Path(jarFile5.getPath()),
+ htd.addCoprocessor(cpName5, new Path(getLocalPath(jarFile5)),
Coprocessor.PRIORITY_USER, null);
Map<String, String> kvs = new HashMap<String, String>();
kvs.put("k1", "v1");
kvs.put("k2", "v2");
kvs.put("k3", "v3");
- htd.addCoprocessor(cpName6, new Path(jarFile6.getPath()),
+ htd.addCoprocessor(cpName6, new Path(getLocalPath(jarFile6)),
Coprocessor.PRIORITY_USER, kvs);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Thu Feb 14 12:58:12 2013
@@ -18,34 +18,45 @@
*/
package org.apache.hadoop.hbase.coprocessor;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
-
-import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
@@ -58,9 +69,6 @@ public class TestCoprocessorEndpoint {
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static byte[] ROW = Bytes.toBytes("testRow");
-
- private static final String protocolName = "org.apache.hadoop.hbase.CustomProtocol";
- private static final String methodName = "myFunc";
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
@@ -75,7 +83,6 @@ public class TestCoprocessorEndpoint {
Configuration conf = util.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
- org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName());
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
@@ -101,51 +108,34 @@ public class TestCoprocessorEndpoint {
util.shutdownMiniCluster();
}
- @Test
- public void testGeneric() throws Throwable {
- HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
- GenericProtocol protocol = table.coprocessorProxy(GenericProtocol.class,
- Bytes.toBytes("testRow"));
- String workResult1 = protocol.doWork("foo");
- assertEquals("foo", workResult1);
- byte[] workResult2 = protocol.doWork(new byte[]{1});
- assertArrayEquals(new byte[]{1}, workResult2);
- byte workResult3 = protocol.doWork((byte)1);
- assertEquals((byte)1, workResult3);
- char workResult4 = protocol.doWork('c');
- assertEquals('c', workResult4);
- boolean workResult5 = protocol.doWork(true);
- assertEquals(true, workResult5);
- short workResult6 = protocol.doWork((short)1);
- assertEquals((short)1, workResult6);
- int workResult7 = protocol.doWork(5);
- assertEquals(5, workResult7);
- long workResult8 = protocol.doWork(5l);
- assertEquals(5l, workResult8);
- double workResult9 = protocol.doWork(6d);
- assertEquals(6d, workResult9, 0.01);
- float workResult10 = protocol.doWork(6f);
- assertEquals(6f, workResult10, 0.01);
- Text workResult11 = protocol.doWork(new Text("foo"));
- assertEquals(new Text("foo"), workResult11);
- table.close();
+ private Map<byte [], Long> sum(final HTable table, final byte [] family,
+ final byte [] qualifier, final byte [] start, final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+ start, end,
+ new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+ @Override
+ public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+ throws IOException {
+ BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+ new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+ ColumnAggregationProtos.SumRequest.Builder builder =
+ ColumnAggregationProtos.SumRequest.newBuilder();
+ builder.setFamily(ByteString.copyFrom(family));
+ if (qualifier != null && qualifier.length > 0) {
+ builder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+ instance.sum(null, builder.build(), rpcCallback);
+ return rpcCallback.get().getSum();
+ }
+ });
}
@Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
- Map<byte[], Long> results;
-
- // scan: for all regions
- results = table
- .coprocessorExec(ColumnAggregationProtocol.class,
- ROWS[0], ROWS[ROWS.length-1],
- new Batch.Call<ColumnAggregationProtocol, Long>() {
- public Long call(ColumnAggregationProtocol instance)
- throws IOException {
- return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
- }
- });
+ Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+ ROWS[0], ROWS[ROWS.length-1]);
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
@@ -160,15 +150,8 @@ public class TestCoprocessorEndpoint {
results.clear();
// scan: for region 2 and region 3
- results = table
- .coprocessorExec(ColumnAggregationProtocol.class,
- ROWS[rowSeperator1], ROWS[ROWS.length-1],
- new Batch.Call<ColumnAggregationProtocol, Long>() {
- public Long call(ColumnAggregationProtocol instance)
- throws IOException {
- return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
- }
- });
+ results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+ ROWS[rowSeperator1], ROWS[ROWS.length-1]);
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
@@ -257,6 +240,44 @@ public class TestCoprocessorEndpoint {
}
@Test
+ public void testCoprocessorServiceNullResponse() throws Throwable {
+ HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+ NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
+
+ final TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ try {
+ // scan: for all regions
+ final RpcController controller = new ServerRpcController();
+ // test that null results are supported
+ Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+ ROWS[0], ROWS[ROWS.length - 1],
+ new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
+ public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+ throws IOException {
+ BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
+ instance.echo(controller, request, callback);
+ TestProtos.EchoResponseProto response = callback.get();
+ LOG.debug("Batch.Call got result " + response);
+ return null;
+ }
+ }
+ );
+ for (Map.Entry<byte[], String> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ }
+ assertEquals(3, results.size());
+ for (HRegionInfo info : regions.navigableKeySet()) {
+ LOG.info("Region info is "+info.getRegionNameAsString());
+ assertTrue(results.containsKey(info.getRegionName()));
+ assertNull(results.get(info.getRegionName()));
+ }
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
public void testMasterCoprocessorService() throws Throwable {
HBaseAdmin admin = util.getHBaseAdmin();
final TestProtos.EchoRequestProto request =
@@ -267,6 +288,41 @@ public class TestCoprocessorEndpoint {
admin.close();
}
+ @Test
+ public void testCoprocessorError() throws Exception {
+ Configuration configuration = new Configuration(util.getConfiguration());
+ // Make it not retry forever
+ configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ HTable table = new HTable(configuration, TEST_TABLE);
+
+ try {
+ CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
+
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
+
+ service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+ fail("Should have thrown an exception");
+ } catch (ServiceException e) {
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
+ public void testMasterCoprocessorError() throws Throwable {
+ HBaseAdmin admin = util.getHBaseAdmin();
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+ try {
+ service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+ fail("Should have thrown an exception");
+ } catch (ServiceException e) {
+ } finally {
+ admin.close();
+ }
+ }
+
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Thu Feb 14 12:58:12 2013
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.collections.map.AbstractReferenceMap;
-import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -46,16 +44,15 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
-import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.junit.experimental.categories.Category;
@@ -190,7 +187,7 @@ public class TestCoprocessorInterface ex
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
- HStore store, InternalScanner scanner) {
+ HStore store, InternalScanner scanner, ScanType scanType) {
preCompactCalled = true;
return scanner;
}
@@ -323,7 +320,7 @@ public class TestCoprocessorInterface ex
for (int i = 0; i < regions.length; i++) {
try {
Get g = new Get(regions[i].getStartKey());
- regions[i].get(g, null);
+ regions[i].get(g);
fail();
} catch (DoNotRetryIOException xc) {
}
@@ -462,8 +459,6 @@ public class TestCoprocessorInterface ex
TEST_UTIL.getConfiguration().setInt(
"hbase.master.lease.thread.wakefrequency", 5 * 1000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD,
- 10 * 1000);
// Increase the amount of time between client retries
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java Thu Feb 14 12:58:12 2013
@@ -19,11 +19,22 @@
package org.apache.hadoop.hbase.coprocessor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
-import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -35,8 +46,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.junit.Assert.*;
-
/**
* Tests unhandled exceptions thrown by coprocessors running on master.
* Expected result is that the master will abort with an informative
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverBypass.java Thu Feb 14 12:58:12 2013
@@ -18,31 +18,35 @@
*/
package org.apache.hadoop.hbase.coprocessor;
+import static junit.framework.Assert.assertEquals;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.junit.Assert.assertEquals;
-
@Category(MediumTests.class)
public class TestRegionObserverBypass {
private static HBaseTestingUtility util;
@@ -60,7 +64,6 @@ public class TestRegionObserverBypass {
TestCoprocessor.class.getName());
util = new HBaseTestingUtility(conf);
util.startMiniCluster();
- util.createTable(tableName, new byte[][] {dummy, test});
}
@AfterClass
@@ -68,6 +71,18 @@ public class TestRegionObserverBypass {
util.shutdownMiniCluster();
}
+ @Before
+ public void setUp() throws Exception {
+ HBaseAdmin admin = util.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ util.createTable(tableName, new byte[][] {dummy, test});
+ }
+
/**
* do a single put that is bypassed by a RegionObserver
* @throws Exception
@@ -89,6 +104,10 @@ public class TestRegionObserverBypass {
*/
@Test
public void testMulti() throws Exception {
+ //ensure that server time increments every time we do an operation, otherwise
+ //previous deletes will eclipse successive puts having the same timestamp
+ EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
+
HTable t = new HTable(util.getConfiguration(), tableName);
List<Put> puts = new ArrayList<Put>();
Put p = new Put(row1);
@@ -170,6 +189,8 @@ public class TestRegionObserverBypass {
checkRowAndDelete(t,row2,1);
checkRowAndDelete(t,row3,0);
t.close();
+
+ EnvironmentEdgeManager.reset();
}
private void checkRowAndDelete(HTable t, byte[] row, int count) throws IOException {
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Thu Feb 14 12:58:12 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -315,7 +316,7 @@ public class TestRegionObserverInterface
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
- HStore store, final InternalScanner scanner) {
+ HStore store, final InternalScanner scanner, final ScanType scanType) {
return new InternalScanner() {
@Override
public boolean next(List<KeyValue> results) throws IOException {
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Thu Feb 14 12:58:12 2013
@@ -125,9 +125,7 @@ public class TestRegionObserverStacking
Put put = new Put(ROW);
put.add(A, A, A);
- int lockid = region.obtainRowLock(ROW);
- region.put(put, lockid);
- region.releaseRowLock(lockid);
+ region.put(put);
Coprocessor c = h.findCoprocessor(ObserverA.class.getName());
long idA = ((ObserverA)c).id;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java Thu Feb 14 12:58:12 2013
@@ -179,13 +179,13 @@ public class TestFilter {
Delete d = new Delete(ROW);
d.deleteColumns(FAMILIES[0], QUALIFIERS_ONE[1]);
d.deleteColumns(FAMILIES[1], QUALIFIERS_ONE[1]);
- this.region.delete(d, null, false);
+ this.region.delete(d, false);
}
for(byte [] ROW : ROWS_TWO) {
Delete d = new Delete(ROW);
d.deleteColumns(FAMILIES[0], QUALIFIERS_TWO[1]);
d.deleteColumns(FAMILIES[1], QUALIFIERS_TWO[1]);
- this.region.delete(d, null, false);
+ this.region.delete(d, false);
}
colsPerRow -= 2;
@@ -194,13 +194,13 @@ public class TestFilter {
Delete d = new Delete(ROWS_ONE[1]);
d.deleteColumns(FAMILIES[0], QUALIFIER);
d.deleteColumns(FAMILIES[1], QUALIFIER);
- this.region.delete(d, null, false);
+ this.region.delete(d, false);
}
for(byte [] QUALIFIER : QUALIFIERS_TWO) {
Delete d = new Delete(ROWS_TWO[1]);
d.deleteColumns(FAMILIES[0], QUALIFIER);
d.deleteColumns(FAMILIES[1], QUALIFIER);
- this.region.delete(d, null, false);
+ this.region.delete(d, false);
}
numRows -= 2;
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterSerialization.java Thu Feb 14 12:58:12 2013
@@ -257,7 +257,7 @@ public class TestFilterSerialization {
// non-null family/column SingleColumnValueFilter
singleColumnValueExcludeFilter =
new SingleColumnValueExcludeFilter(Bytes.toBytes("fam"), Bytes.toBytes("qual"),
- CompareFilter.CompareOp.LESS_OR_EQUAL, new NullComparator(), false, true, false, false);
+ CompareFilter.CompareOp.LESS_OR_EQUAL, new NullComparator(), false, false);
assertTrue(singleColumnValueExcludeFilter.areSerializedFieldsEqual(
ProtobufUtil.toFilter(ProtobufUtil.toFilter(singleColumnValueExcludeFilter))));
}
@@ -274,7 +274,7 @@ public class TestFilterSerialization {
// non-null family/column SingleColumnValueFilter
singleColumnValueFilter =
new SingleColumnValueFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
- CompareFilter.CompareOp.NOT_EQUAL, new NullComparator(), true, false, true, true);
+ CompareFilter.CompareOp.NOT_EQUAL, new NullComparator(), true, true);
assertTrue(singleColumnValueFilter.areSerializedFieldsEqual(
ProtobufUtil.toFilter(ProtobufUtil.toFilter(singleColumnValueFilter))));
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java Thu Feb 14 12:58:12 2013
@@ -27,6 +27,9 @@ import org.junit.experimental.categories
import static org.junit.Assert.*;
+import java.util.List;
+import java.util.ArrayList;
+
/**
* Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
* extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -52,16 +55,18 @@ public class TestSingleColumnValueExclud
CompareOp.EQUAL, VAL_1);
// A 'match' situation
- KeyValue kv;
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- // INCLUDE expected because test column has not yet passed
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
- // Test column will pass (will match), will SKIP because test columns are excluded
- assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
- // Test column has already passed and matched, all subsequent columns are INCLUDE
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+ filter.filterRow(kvs);
+
+ assertEquals("resultSize", kvs.size(), 2);
+ assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+ assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
// A 'mismatch' situation
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Thu Feb 14 12:58:12 2013
@@ -129,13 +129,12 @@ public class TestEncodedSeekers {
private void doPuts(HRegion region) throws IOException{
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
for (int i = 0; i < NUM_ROWS; ++i) {
- byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
+ byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
Put put = new Put(key);
- String colAsStr = String.valueOf(j);
- byte[] col = Bytes.toBytes(colAsStr);
- byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
- put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
+ byte[] col = Bytes.toBytes(String.valueOf(j));
+ byte[] value = dataGenerator.generateRandomSizeValue(key, col);
+ put.add(CF_BYTES, col, value);
if(VERBOSE){
KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut);
@@ -151,7 +150,7 @@ public class TestEncodedSeekers {
private void doGets(HRegion region) throws IOException{
for (int i = 0; i < NUM_ROWS; ++i) {
- final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
+ final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
final String qualStr = String.valueOf(j);
if (VERBOSE) {
@@ -161,10 +160,10 @@ public class TestEncodedSeekers {
final byte[] qualBytes = Bytes.toBytes(qualStr);
Get get = new Get(rowKey);
get.addColumn(CF_BYTES, qualBytes);
- Result result = region.get(get, null);
+ Result result = region.get(get);
assertEquals(1, result.size());
- assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
- result.getValue(CF_BYTES, qualBytes)));
+ byte[] value = result.getValue(CF_BYTES, qualBytes);
+ assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
}
}
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Thu Feb 14 12:58:12 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.ChecksumType;
public class CacheTestUtils {
@@ -149,7 +150,11 @@ public class CacheTestUtils {
try {
if (toBeTested.getBlock(block.blockName, true, false) != null) {
toBeTested.cacheBlock(block.blockName, block.block);
- fail("Cache should not allow re-caching a block");
+ if (!(toBeTested instanceof BucketCache)) {
+ // BucketCache won't throw exception when caching already cached
+ // block
+ fail("Cache should not allow re-caching a block");
+ }
}
} catch (RuntimeException re) {
// expected
@@ -242,6 +247,30 @@ public class CacheTestUtils {
private static class ByteArrayCacheable implements Cacheable {
+ static final CacheableDeserializer<Cacheable> blockDeserializer =
+ new CacheableDeserializer<Cacheable>() {
+
+ @Override
+ public Cacheable deserialize(ByteBuffer b) throws IOException {
+ int len = b.getInt();
+ Thread.yield();
+ byte buf[] = new byte[len];
+ b.get(buf);
+ return new ByteArrayCacheable(buf);
+ }
+
+ @Override
+ public int getDeserialiserIdentifier() {
+ return deserializerIdentifier;
+ }
+
+ @Override
+ public Cacheable deserialize(ByteBuffer b, boolean reuse)
+ throws IOException {
+ return deserialize(b);
+ }
+ };
+
final byte[] buf;
public ByteArrayCacheable(byte[] buf) {
@@ -268,20 +297,22 @@ public class CacheTestUtils {
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
- return new CacheableDeserializer<Cacheable>() {
+ return blockDeserializer;
+ }
- @Override
- public Cacheable deserialize(ByteBuffer b) throws IOException {
- int len = b.getInt();
- Thread.yield();
- byte buf[] = new byte[len];
- b.get(buf);
- return new ByteArrayCacheable(buf);
- }
- };
+ private static final int deserializerIdentifier;
+ static {
+ deserializerIdentifier = CacheableDeserializerIdManager
+ .registerDeserializer(blockDeserializer);
+ }
+
+ @Override
+ public BlockType getBlockType() {
+ return BlockType.DATA;
}
}
+
private static HFileBlockPair[] generateHFileBlocks(int blockSize,
int numBlocks) {
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];