You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:24 UTC
[22/50] [abbrv] hbase git commit: HBASE-17174 Refactor the
AsyncProcess, BufferedMutatorImpl, and HTable
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
new file mode 100644
index 0000000..b46e572
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSimpleRequestController.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ClientTests.class, SmallTests.class})
+public class TestSimpleRequestController {
+
+ // Fixtures shared by every test in this class.
+ private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
+ // Bytes.toBytes is used instead of String.getBytes() so the row keys do not depend on the
+ // platform default charset.
+ private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
+ private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
+ private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
+ private static final ServerName SN = ServerName.valueOf("s1:1,1");
+ private static final ServerName SN2 = ServerName.valueOf("s2:2,2");
+ // NOTE(review): SN3 is not referenced by any test visible in this file.
+ private static final ServerName SN3 = ServerName.valueOf("s3:3,3");
+ // Three regions of DUMMY_TABLE.
+ private static final HRegionInfo HRI1
+     = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
+ private static final HRegionInfo HRI2
+     = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
+ private static final HRegionInfo HRI3
+     = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
+ // LOC1 and LOC2 are both placed on server SN; LOC3 is placed on SN2.
+ private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
+ private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
+ private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
+
+ /** Runs the shared illegal-argument check for the per-request heap size key with value -1. */
+ @Test
+ public void testIllegalRequestSize() {
+   final String key = SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
+   final long illegalValue = -1;
+   testIllegalArgument(key, illegalValue);
+ }
+
+ /** Runs the shared illegal-argument check for the per-server task limit key with value -1. */
+ @Test
+ public void testIllegalRsTasks() {
+   final String key = HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS;
+   final long illegalValue = -1;
+   testIllegalArgument(key, illegalValue);
+ }
+
+ /** Runs the shared illegal-argument check for the per-region task limit key with value -1. */
+ @Test
+ public void testIllegalRegionTasks() {
+   final String key = HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS;
+   final long illegalValue = -1;
+   testIllegalArgument(key, illegalValue);
+ }
+
+ /** Runs the shared illegal-argument check for the submitted heap size key with value -1. */
+ @Test
+ public void testIllegalSubmittedSize() {
+   final String key = SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE;
+   final long illegalValue = -1;
+   testIllegalArgument(key, illegalValue);
+ }
+
+ /**
+  * Asserts that a {@link SimpleRequestController} cannot be constructed when the given
+  * configuration key holds the given value.
+  *
+  * @param key the configuration key to override
+  * @param value the illegal value to store under {@code key}
+  */
+ private void testIllegalArgument(String key, long value) {
+   Configuration conf = HBaseConfiguration.create();
+   // Override exactly the key under test so that every caller exercises its own setting.
+   conf.setLong(key, value);
+   try {
+     new SimpleRequestController(conf);
+     fail("The " + key + " must be bigger than zero");
+   } catch (IllegalArgumentException expected) {
+     // expected: the constructor has to reject the illegal setting
+   }
+ }
+
+ private static Put createPut(long maxHeapSizePerRequest) {
+ return new Put(Bytes.toBytes("row")) {
+ @Override
+ public long heapSize() {
+ return maxHeapSizePerRequest;
+ }
+ };
+ }
+
+ /**
+  * Puts a task-count checker and a request-size checker behind a single
+  * {@link RequestController.Checker} and verifies which rows the combination admits.
+  */
+ @Test
+ public void testTaskCheckerHost() throws IOException {
+   final long heapLimit = 2 * 1024 * 1024;
+   final AtomicLong inProgress = new AtomicLong(0);
+   final Map<ServerName, AtomicInteger> perServer = new HashMap<>();
+   final Map<byte[], AtomicInteger> perRegion = new HashMap<>();
+   // count limits: 100 tasks in total, 2 per server, 1 per region
+   SimpleRequestController.TaskCountChecker countChecker =
+       new SimpleRequestController.TaskCountChecker(100, 2, 1, inProgress, perServer, perRegion);
+   // size limit: heapLimit bytes per request
+   SimpleRequestController.RequestSizeChecker sizeChecker =
+       new SimpleRequestController.RequestSizeChecker(heapLimit);
+   RequestController.Checker checker =
+       SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
+
+   // the first row for LOC1 is taken
+   assertEquals(ReturnCode.INCLUDE, checker.canTakeRow(LOC1, createPut(heapLimit)));
+
+   // a second row for LOC1 is rejected for size
+   assertNotEquals(ReturnCode.INCLUDE, checker.canTakeRow(LOC1, createPut(heapLimit)));
+
+   // a row for LOC2 is rejected for size as well
+   assertNotEquals(ReturnCode.INCLUDE, checker.canTakeRow(LOC2, createPut(heapLimit)));
+
+   // occupy every task slot of LOC3
+   perRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
+   perServer.put(LOC3.getServerName(), new AtomicInteger(100));
+
+   // rejected for count
+   assertNotEquals(ReturnCode.INCLUDE, checker.canTakeRow(LOC3, createPut(1L)));
+
+   // free the task slots of LOC3 again
+   perRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
+   perServer.put(LOC3.getServerName(), new AtomicInteger(0));
+
+   assertEquals(ReturnCode.INCLUDE, checker.canTakeRow(LOC3, createPut(1L)));
+ }
+
+ /**
+  * Exercises {@code RequestSizeChecker}: queries alone leave it unchanged, an accepted
+  * operation on LOC1 blocks further operations for LOC1 and LOC2 (both on the same server)
+  * but not for LOC3, and {@code reset()} makes LOC1 and LOC2 acceptable again.
+  */
+ @Test
+ public void testRequestSizeCheckerr() throws IOException {
+   final long heapLimit = 2 * 1024 * 1024;
+   final int rounds = 10;
+   SimpleRequestController.RequestSizeChecker checker =
+       new SimpleRequestController.RequestSizeChecker(heapLimit);
+
+   // asking without notifying must not change the inner state
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, heapLimit));
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC2, heapLimit));
+   }
+
+   // accept the data located on the LOC1 region
+   ReturnCode accepted = checker.canTakeOperation(LOC1, heapLimit);
+   assertEquals(ReturnCode.INCLUDE, accepted);
+   checker.notifyFinal(accepted, LOC1, heapLimit);
+
+   // the sn server has reached its limit
+   for (int round = 0; round < rounds; round++) {
+     assertNotEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, heapLimit));
+     assertNotEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC2, heapLimit));
+   }
+
+   // requests to the sn2 server are still accepted
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC3, heapLimit));
+   }
+
+   // after a reset both regions on sn are accepted again
+   checker.reset();
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, heapLimit));
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC2, heapLimit));
+   }
+ }
+
+ /**
+  * Exercises {@code SubmittedSizeChecker}: operations are included until accepted data has
+  * been reported through {@code notifyFinal}, after which every location gets
+  * {@code ReturnCode.END} until {@code reset()} is called.
+  */
+ @Test
+ public void testSubmittedSizeChecker() {
+   final long submitLimit = 2 * 1024 * 1024;
+   final int rowSize = 100000;
+   final int rounds = 10;
+   SimpleRequestController.SubmittedSizeChecker checker =
+       new SimpleRequestController.SubmittedSizeChecker(submitLimit);
+
+   // nothing has been reported yet, so everything is included
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, rowSize));
+   }
+
+   // report accepted rows of the full limit size
+   for (int round = 0; round < rounds; round++) {
+     checker.notifyFinal(ReturnCode.INCLUDE, LOC1, submitLimit);
+   }
+
+   // from now on the checker ends the submission, whatever the location
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.END, checker.canTakeOperation(LOC1, rowSize));
+   }
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.END, checker.canTakeOperation(LOC2, rowSize));
+   }
+
+   // a reset lets operations in again
+   checker.reset();
+   for (int round = 0; round < rounds; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, rowSize));
+   }
+ }
+
+ /**
+  * Exercises {@code TaskCountChecker} against externally manipulated per-server and
+  * per-region task counters: a region that was already accepted keeps being accepted even
+  * when its slots are full, a region without free slots is rejected, and {@code reset()}
+  * forgets previously accepted regions.
+  */
+ @Test
+ public void testTaskCountChecker() throws InterruptedIOException {
+   final long rowSize = 12345;
+   final int maxTotalConcurrentTasks = 100;
+   final int maxConcurrentTasksPerServer = 2;
+   final int maxConcurrentTasksPerRegion = 1;
+   final int repeats = maxConcurrentTasksPerRegion * 5;
+   final AtomicLong inProgress = new AtomicLong(0);
+   final Map<ServerName, AtomicInteger> perServer = new HashMap<>();
+   final Map<byte[], AtomicInteger> perRegion = new HashMap<>();
+   SimpleRequestController.TaskCountChecker checker =
+       new SimpleRequestController.TaskCountChecker(
+           maxTotalConcurrentTasks,
+           maxConcurrentTasksPerServer,
+           maxConcurrentTasksPerRegion,
+           inProgress, perServer, perRegion);
+
+   // asking without notifying must not change the inner state
+   for (int round = 0; round < 10; round++) {
+     assertEquals(ReturnCode.INCLUDE, checker.canTakeOperation(LOC1, rowSize));
+   }
+
+   // add the LOC1 region
+   ReturnCode loc1First = checker.canTakeOperation(LOC1, rowSize);
+   assertEquals(ReturnCode.INCLUDE, loc1First);
+   checker.notifyFinal(loc1First, LOC1, rowSize);
+
+   // occupy every task slot of LOC1
+   perRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
+   perServer.put(LOC1.getServerName(), new AtomicInteger(100));
+
+   // the region was accepted before, so it must still be accepted now
+   for (int round = 0; round < repeats; round++) {
+     ReturnCode loc1Again = checker.canTakeOperation(LOC1, rowSize);
+     assertEquals(ReturnCode.INCLUDE, loc1Again);
+     checker.notifyFinal(loc1Again, LOC1, rowSize);
+   }
+
+   // occupy every task slot of LOC3
+   perRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
+   perServer.put(LOC3.getServerName(), new AtomicInteger(100));
+
+   // no free task slots, so LOC3 is not included
+   for (int round = 0; round < repeats; round++) {
+     ReturnCode loc3Full = checker.canTakeOperation(LOC3, rowSize);
+     assertNotEquals(ReturnCode.INCLUDE, loc3Full);
+     checker.notifyFinal(loc3Full, LOC3, rowSize);
+   }
+
+   // release the tasks of LOC3
+   perRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
+   perServer.put(LOC3.getServerName(), new AtomicInteger(0));
+
+   // add the LOC3 region
+   ReturnCode loc3First = checker.canTakeOperation(LOC3, rowSize);
+   assertEquals(ReturnCode.INCLUDE, loc3First);
+   checker.notifyFinal(loc3First, LOC3, rowSize);
+
+   // the region was accepted before, so it must still be accepted now
+   for (int round = 0; round < repeats; round++) {
+     ReturnCode loc3Again = checker.canTakeOperation(LOC3, rowSize);
+     assertEquals(ReturnCode.INCLUDE, loc3Again);
+     checker.notifyFinal(loc3Again, LOC3, rowSize);
+   }
+
+   checker.reset();
+   // LOC1 was accepted earlier, but the checker has been reset and the task slots
+   // of LOC1 are still full, so it must be rejected now.
+   for (int round = 0; round < repeats; round++) {
+     ReturnCode loc1AfterReset = checker.canTakeOperation(LOC1, rowSize);
+     assertNotEquals(ReturnCode.INCLUDE, loc1AfterReset);
+     checker.notifyFinal(loc1AfterReset, LOC1, rowSize);
+   }
+ }
+
+ /**
+  * Verifies that a thread calling {@code waitForMaximumCurrentTasks} with a maximum of zero
+  * stays alive while the controller's in-progress task counter is above zero and finishes
+  * once the counter has dropped to zero.
+  *
+  * <p>Any exception raised on the worker thread is recorded and rethrown on the test thread
+  * after {@code join()}, so a worker failure fails the test instead of only being printed.
+  */
+ @Test
+ public void testWaitForMaximumCurrentTasks() throws Exception {
+   final AtomicInteger max = new AtomicInteger(0);
+   final CyclicBarrier barrier = new CyclicBarrier(2);
+   SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
+   final AtomicLong tasks = controller.tasksInProgress;
+   // Single slot that hands a worker failure back to the test thread. It is written only by
+   // the worker and read only after Thread.join(), which makes the write visible.
+   final Throwable[] workerFailure = new Throwable[1];
+   Runnable runnable = () -> {
+     try {
+       barrier.await();
+       controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to whoever inspects this thread afterwards
+       Thread.currentThread().interrupt();
+       workerFailure[0] = e;
+     } catch (InterruptedIOException | BrokenBarrierException e) {
+       workerFailure[0] = e;
+     }
+   };
+   // First test that our runnable thread only exits when tasks is zero.
+   Thread t = new Thread(runnable);
+   t.start();
+   barrier.await();
+   t.join();
+   assertWorkerSucceeded(workerFailure[0]);
+   // Now assert we stay running if max == zero and tasks is > 0.
+   barrier.reset();
+   tasks.set(1000000);
+   t = new Thread(runnable);
+   t.start();
+   barrier.await();
+   while (tasks.get() > 0) {
+     assertTrue(t.isAlive());
+     tasks.set(tasks.get() - 1);
+   }
+   t.join();
+   assertWorkerSucceeded(workerFailure[0]);
+ }
+
+ /** Rethrows, on the calling thread, a failure that was recorded by the worker thread. */
+ private static void assertWorkerSucceeded(Throwable failure) {
+   if (failure != null) {
+     throw new AssertionError("worker thread failed: " + failure, failure);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index ee89609..e5ab3e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -126,11 +126,8 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
- new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
- RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
- HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
+ new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
+ RpcControllerFactory.instantiate(conf)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 53488ec..2c5e89d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.AsyncProcessTask;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
@@ -137,14 +138,20 @@ public class TestClientPushback {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
-
- ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback<Result>() {
- @Override
- public void update(byte[] region, byte[] row, Result result) {
+ BufferedMutatorImpl mutator = ((HTable) table).mutator;
+ Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();
- }
- }, true);
+ };
+ AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
+ .setPool(mutator.getPool())
+ .setTableName(tableName)
+ .setRowAccess(ops)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+ .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+ .setRpcTimeout(60 * 1000)
+ .build();
+ mutator.getAsyncProcess().submit(task);
// Currently the ExponentialClientBackoffPolicy under these test conditions
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 6d1e1f0..0f7f3d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -563,9 +563,17 @@ public class TestReplicasClient {
gets.add(g);
Object[] results = new Object[2];
- AsyncRequestFuture reqs = ap.submitAll(
- HTable.getDefaultExecutor(HTU.getConfiguration()),
- table.getName(), gets, null, results);
+ int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
+ int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
+ .setTableName(table.getName())
+ .setRowAccess(gets)
+ .setResults(results)
+ .setOperationTimeout(operationTimeout)
+ .setRpcTimeout(readTimeout)
+ .build();
+ AsyncRequestFuture reqs = ap.submit(task);
reqs.waitUntilDone();
// verify we got the right results back
for (Object r : results) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index be41e54..295f47a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -501,7 +501,6 @@ public class TestPerColumnFamilyFlush {
Thread.sleep(100);
}
}
- table.close();
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 68fffb1..380c252 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -171,22 +171,35 @@ public class TestTablePermissions {
}
}
+ /**
+ * Delegates to AccessControlLists.addUserPermission and then closes the supplied table.
+ * The close happens in a finally block, so the table is closed even when the delegated
+ * call throws (AccessControlLists.addUserPermission may throw an exception before closing
+ * the table).
+ *
+ * @param conf configuration handed to AccessControlLists.addUserPermission
+ * @param userPerm the permission handed to AccessControlLists.addUserPermission
+ * @param t the table handed to AccessControlLists.addUserPermission; always closed here
+ * @throws IOException propagated to the caller unchanged
+ */
+ private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException {
+ try {
+ AccessControlLists.addUserPermission(conf, userPerm, t);
+ } finally {
+ // close the table whether or not the call above succeeded
+ t.close();
+ }
+ }
+
@Test
public void testBasicWrite() throws Exception {
Configuration conf = UTIL.getConfiguration();
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
// add some permissions
- AccessControlLists.addUserPermission(conf,
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null,
- UserPermission.Action.READ, UserPermission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ UserPermission.Action.READ, UserPermission.Action.WRITE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null,
- UserPermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ UserPermission.Action.READ),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("humphrey"),
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
- UserPermission.Action.READ), table);
+ UserPermission.Action.READ),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
// retrieve the same
ListMultimap<String,TablePermission> perms =
@@ -274,23 +287,22 @@ public class TestTablePermissions {
@Test
public void testPersistence() throws Exception {
Configuration conf = UTIL.getConfiguration();
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
- AccessControlLists.addUserPermission(conf,
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null,
- (byte[])null, TablePermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null,
(byte[])null, TablePermission.Action.READ,
- TablePermission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("clark"),
TEST_TABLE, TEST_FAMILY,
- TablePermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("dwight"),
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
- TablePermission.Action.WRITE), table);
+ TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
// verify permissions survive changes in table metadata
ListMultimap<String,TablePermission> preperms =
@@ -404,17 +416,17 @@ public class TestTablePermissions {
Configuration conf = UTIL.getConfiguration();
// add some permissions
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
- AccessControlLists.addUserPermission(conf,
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user1"),
- Permission.Action.READ, Permission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user2"),
- Permission.Action.CREATE), table);
- AccessControlLists.addUserPermission(conf,
+ Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user3"),
- Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table);
+ Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
ListMultimap<String,TablePermission> perms = AccessControlLists.getTablePermissions(conf, null);
List<TablePermission> user1Perms = perms.get("user1");
@@ -448,11 +460,11 @@ public class TestTablePermissions {
// currently running user is the system user and should have global admin perms
User currentUser = User.getCurrent();
assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
for (int i=1; i<=50; i++) {
- AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
- Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table);
+ addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
+ Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
// make sure the system user still shows as authorized
assertTrue("Failed current user auth check on iter "+i,
authManager.authorize(currentUser, Permission.Action.ADMIN));