Posted to commits@hbase.apache.org by bu...@apache.org on 2019/02/07 03:10:39 UTC
[hbase] branch branch-1.2 updated: HBASE-21374 Backport HBASE-21342 to branch-1
This is an automated email from the ASF dual-hosted git repository.
busbey pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new f7b51cc HBASE-21374 Backport HBASE-21342 to branch-1
f7b51cc is described below
commit f7b51ccdf7414bc1deada02e3add556dc76e97be
Author: mazhenlin <ma...@alibaba-inc.com>
AuthorDate: Fri Oct 26 21:22:29 2018 +0800
HBASE-21374 Backport HBASE-21342 to branch-1
Signed-off-by: Andrew Purtell <ap...@apache.org>
(cherry picked from commit 25d0c3ec7f80f51e814ed53caa9509fe60a4a8cf)
Original commit message:
HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad
Signed-off-by: Mike Drob <md...@apache.org>
Signed-off-by: Ted Yu <ty...@apache.org>
---
.../hbase/mapreduce/LoadIncrementalHFiles.java | 3 +-
.../security/access/SecureBulkLoadEndpoint.java | 46 ++++-
.../access/TestSecureBulkLoadEndpoint.java | 220 ++++++++++++++++++++-
3 files changed, 265 insertions(+), 4 deletions(-)
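For context on the bug this patch addresses: Hadoop caches FileSystem instances per UserGroupInformation, and the static FileSystem.closeAllForUGI(ugi) call closes every cached instance for that UGI. The sketch below is illustrative only (it is not part of the patch, and the class and variable names are made up); it shows how the clean-up done by one finished bulkload could close a FileSystem that another in-flight bulkload, running as the same user, is still using:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ClosedFsRaceSketch {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("bulkload_user");

    // "Bulkload A": runs as ugi and keeps using the FileSystem cached for that UGI.
    Thread bulkloadA = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              FileSystem fs = FileSystem.get(conf); // cached per (scheme, authority, ugi)
              Thread.sleep(1000);                   // still busy with its own bulkload...
              // Against HDFS, this fails with "java.io.IOException: Filesystem closed"
              // once the other bulkload's clean-up has run.
              fs.exists(new Path("/"));
              return null;
            }
          });
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
    bulkloadA.start();

    Thread.sleep(200);
    // "Bulkload B": a second call for the same user finishes first and cleans up.
    // Pre-patch, this closes *all* cached FileSystems for ugi, including bulkload A's.
    FileSystem.closeAllForUGI(ugi);

    bulkloadA.join();
  }
}

The patch below guards exactly this closeAllForUGI call with a per-UGI reference count, so the clean-up is skipped while another bulkload for the same user is still in flight.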
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 9cb417e..953ae89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -264,7 +264,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty.
*/
- static class LoadQueueItem {
+ @InterfaceAudience.Private
+ public static class LoadQueueItem {
final byte[] family;
final Path hfilePath;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 48b4f7c..8af4ca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.security.access;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -134,6 +135,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
private RegionCoprocessorEnvironment env;
private UserProvider userProvider;
+ private static HashMap<UserGroupInformation, Integer> ugiReferenceCounter = new HashMap<>();
@Override
public void start(CoprocessorEnvironment env) {
@@ -238,7 +240,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
- if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+ if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
@@ -248,6 +250,43 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
done.run(null);
}
+ @VisibleForTesting
+ interface Consumer<T> {
+ void accept(T t);
+ }
+ private static Consumer<Region> fsCreatedListener;
+
+ @VisibleForTesting
+ static void setFsCreatedListener(Consumer<Region> listener) {
+ fsCreatedListener = listener;
+ }
+
+
+ private void incrementUgiReference(UserGroupInformation ugi) {
+ synchronized (ugiReferenceCounter) {
+ Integer counter = ugiReferenceCounter.get(ugi);
+ ugiReferenceCounter.put(ugi, counter == null ? 1 : ++counter);
+ }
+ }
+
+ private void decrementUgiReference(UserGroupInformation ugi) {
+ synchronized (ugiReferenceCounter) {
+ Integer counter = ugiReferenceCounter.get(ugi);
+ if(counter == null || counter <= 1) {
+ ugiReferenceCounter.remove(ugi);
+ } else {
+ ugiReferenceCounter.put(ugi,--counter);
+ }
+ }
+ }
+
+ private boolean isUserReferenced(UserGroupInformation ugi) {
+ synchronized (ugiReferenceCounter) {
+ Integer count = ugiReferenceCounter.get(ugi);
+ return count != null && count > 0;
+ }
+ }
+
@Override
public void secureBulkLoadHFiles(RpcController controller,
SecureBulkLoadHFilesRequest request,
@@ -311,6 +350,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
}
+ incrementUgiReference(ugi);
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
@@ -325,6 +365,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
}
+ if (fsCreatedListener != null) {
+ fsCreatedListener.accept(env.getRegion());
+ }
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return env.getRegion().bulkLoadHFiles(familyPaths, true,
@@ -335,6 +378,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
return false;
}
});
+ decrementUgiReference(ugi);
}
if (region.getCoprocessorHost() != null) {
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
index d10d966..d7a4a85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
@@ -18,25 +18,94 @@
package org.apache.hadoop.hbase.security.access;
+import static org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint.Consumer;
import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the SecureBulkLoadEndpoint code.
*/
-@Category(SmallTests.class)
+@Category(MediumTests.class)
public class TestSecureBulkLoadEndpoint {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSecureBulkLoadEndpoint.class);
+
+ private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
+ private static byte[] FAMILY = Bytes.toBytes("family");
+ private static byte[] COLUMN = Bytes.toBytes("column");
+ private static byte[] key1 = Bytes.toBytes("row1");
+ private static byte[] key2 = Bytes.toBytes("row2");
+ private static byte[] key3 = Bytes.toBytes("row3");
+ private static byte[] value1 = Bytes.toBytes("t1");
+ private static byte[] value3 = Bytes.toBytes("t3");
+ private static byte[] SPLIT_ROWKEY = key2;
+
+ private Thread ealierBulkload;
+ private Thread laterBulkload;
+
+ protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ private static Configuration conf = testUtil.getConfiguration();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,SecureBulkLoadEndpoint.class.getName());
+ testUtil.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ testUtil.shutdownMiniCluster();
+ testUtil.cleanupTestDir();
+ }
+
@Test
public void testFileSystemsWithoutPermissionSupport() {
final Configuration emptyConf = new Configuration(false);
@@ -61,4 +130,151 @@ public class TestSecureBulkLoadEndpoint {
defaultIgnoredSchemes = endpoint.getFileSystemSchemesWithoutPermissionSupport(defaultConf);
assertEquals(defaultIgnoredSchemes, new HashSet<String>(Arrays.asList("foo", "bar")));
}
+
+ /**
+ * After a secure bulkload finishes, there is a clean-up of the FileSystems used in the bulkload.
+ * Sometimes the FileSystems used in the finished bulkload are also in use by other bulkload
+ * calls, or there are other FileSystems created by the same user; all of them would be closed
+ * by a single FileSystem.closeAllForUGI call. So during the clean-up, FileSystems that are
+ * still needed later must not get closed, or else a race condition occurs.
+ *
+ * testForRaceCondition tests the case where two secure bulkload calls from the same UGI go
+ * into two different regions and one bulkload finishes earlier while the other bulkload still
+ * needs its FileSystems; it checks that both bulkloads succeed.
+ */
+ @Test
+ public void testForRaceCondition() throws Exception {
+ /// create table
+ testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY));
+
+ Consumer<Region> fsCreatedListener = new Consumer<Region>() {
+ @Override
+ public void accept(Region hRegion) {
+ if (hRegion.getRegionInfo().containsRow(key3)) {
+ Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
+ }
+ }
+ };
+ SecureBulkLoadEndpoint.setFsCreatedListener(fsCreatedListener);
+
+ /// prepare files
+ Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
+ .getRegionServer().getFileSystem().getHomeDirectory();
+ final Path dir1 = new Path(rootdir, "dir1");
+ prepareHFile(dir1, key1, value1);
+ final Path dir2 = new Path(rootdir, "dir2");
+ prepareHFile(dir2, key3, value3);
+
+ /// do bulkload
+ final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
+ final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
+ ealierBulkload = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doBulkloadWithoutRetry(dir1);
+ } catch (Exception e) {
+ LOG.error("bulk load failed .",e);
+ t1Exception.set(e);
+ }
+ }
+ });
+ laterBulkload = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doBulkloadWithoutRetry(dir2);
+ } catch (Exception e) {
+ LOG.error("bulk load failed .",e);
+ t2Exception.set(e);
+ }
+ }
+ });
+ ealierBulkload.start();
+ laterBulkload.start();
+ Threads.shutdown(ealierBulkload);
+ Threads.shutdown(laterBulkload);
+ Assert.assertNull(t1Exception.get());
+ Assert.assertNull(t2Exception.get());
+
+ /// check bulkload ok
+ Get get1 = new Get(key1);
+ Get get3 = new Get(key3);
+ Table t = testUtil.getConnection().getTable(TABLE);
+ Result r = t.get(get1);
+ Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
+ r = t.get(get3);
+ Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
+
+ }
+
+ /**
+ * A trick is used to make sure server-side failures (if any) are not covered up by a client
+ * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as
+ * the HFile queue is not empty, while server-side exceptions in the doAs block do not lead
+ * to a client exception, a bulkload would always succeed in this case by default and the
+ * client would never be aware that failures ever happened. To avoid this kind of retry,
+ * a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and is caught
+ * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
+ * server-side failures, if any, can be checked via the loaded data.
+ */
+ class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+ }
+
+ private void doBulkloadWithoutRetry(Path dir) throws Exception {
+ Connection connection = testUtil.getConnection();
+ LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+ @Override
+ protected void bulkLoadPhase(final Table table, final Connection conn,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+ super.bulkLoadPhase(table, conn, pool, queue, regionGroups);
+ throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
+ }
+ };
+ try {
+ h.doBulkLoad(dir, testUtil.getHBaseAdmin(), connection.getTable(TABLE),
+ connection.getRegionLocator(TABLE));
+ Assert.fail("MyExceptionToAvoidRetry is expected");
+ } catch (MyExceptionToAvoidRetry e) { //expected
+ }
+ }
+
+ private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
+ HTableDescriptor desc = testUtil.getHBaseAdmin().getTableDescriptor(TABLE);
+ HColumnDescriptor family = desc.getFamily(FAMILY);
+ Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+
+ CacheConfig writerCacheConf = new CacheConfig(conf, family);
+ writerCacheConf.setCacheDataOnWrite(false);
+ HFileContext hFileContext = new HFileContextBuilder()
+ .withIncludesMvcc(false)
+ .withIncludesTags(true)
+ .withCompression(compression)
+ .withCompressTags(family.isCompressTags())
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(family.getBlocksize())
+ .withHBaseCheckSum(true)
+ .withDataBlockEncoding(family.getDataBlockEncoding())
+ .withEncryptionContext(Encryption.Context.NONE)
+ .withCreateTime(EnvironmentEdgeManager.currentTime())
+ .build();
+ StoreFile.WriterBuilder builder =
+ new StoreFile.WriterBuilder(conf, writerCacheConf, dir.getFileSystem(conf))
+ .withOutputDir(new Path(dir, family.getNameAsString()))
+ .withBloomType(family.getBloomFilterType())
+ .withMaxKeyCount(Integer.MAX_VALUE)
+ .withFileContext(hFileContext);
+ StoreFile.Writer writer = builder.build();
+
+ Put put = new Put(key);
+ put.addColumn(FAMILY, COLUMN, value);
+ for (Cell c : put.get(FAMILY, COLUMN)) {
+ writer.append(c);
+ }
+
+ writer.close();
+ }
+
}
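To summarize the fix, here is a minimal standalone sketch of the reference-counting pattern the patch applies (simplified and rearranged for brevity; the class and method names are illustrative, not the actual SecureBulkLoadEndpoint code). Each bulkload increments a per-UGI counter before doing its work and decrements it afterwards, and the clean-up only calls FileSystem.closeAllForUGI when no other in-flight bulkload still references that UGI:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

/** Illustrative sketch of guarding closeAllForUGI with a per-UGI reference count. */
public class UgiRefCountSketch {
  private static final Map<UserGroupInformation, Integer> REFS = new HashMap<>();

  static void increment(UserGroupInformation ugi) {
    synchronized (REFS) {
      Integer c = REFS.get(ugi);
      REFS.put(ugi, c == null ? 1 : c + 1);
    }
  }

  static void decrement(UserGroupInformation ugi) {
    synchronized (REFS) {
      Integer c = REFS.get(ugi);
      if (c == null || c <= 1) {
        REFS.remove(ugi);
      } else {
        REFS.put(ugi, c - 1);
      }
    }
  }

  static boolean isReferenced(UserGroupInformation ugi) {
    synchronized (REFS) {
      Integer c = REFS.get(ugi);
      return c != null && c > 0;
    }
  }

  /** Pattern applied around each bulkload; doBulkLoad is a stand-in for the real work. */
  static void bulkLoadAs(UserGroupInformation ugi, Runnable doBulkLoad) throws IOException {
    increment(ugi);
    try {
      doBulkLoad.run();
    } finally {
      decrement(ugi);
      // Only close the cached FileSystems if no other in-flight bulkload still uses this UGI.
      if (!UserGroupInformation.getLoginUser().equals(ugi) && !isReferenced(ugi)) {
        FileSystem.closeAllForUGI(ugi);
      }
    }
  }
}

The synchronized blocks over a plain HashMap mirror the patch; since the map is only touched briefly around each bulkload call, the coarse locking is simple and sufficient here.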