You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by md...@apache.org on 2018/10/23 20:51:53 UTC
hbase git commit: HBASE-21342 FileSystem in use may get closed by
other bulk load call in secure bulkLoad
Repository: hbase
Updated Branches:
refs/heads/master 807736fcf -> 1e9d99872
HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad
Signed-off-by: Mike Drob <md...@apache.org>
Signed-off-by: Ted Yu <ty...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e9d9987
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e9d9987
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e9d9987
Branch: refs/heads/master
Commit: 1e9d998727773a724d3a37401e4e1cea3474bd9d
Parents: 807736f
Author: mazhenlin <ma...@alibaba-inc.com>
Authored: Fri Oct 19 14:51:00 2018 +0800
Committer: Mike Drob <md...@apache.org>
Committed: Tue Oct 23 15:51:46 2018 -0500
----------------------------------------------------------------------
.../regionserver/SecureBulkLoadManager.java | 45 +++-
.../regionserver/TestSecureBulkLoadManager.java | 248 +++++++++++++++++++
2 files changed, 292 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1e9d9987/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index a4ee517..566a6b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -25,6 +25,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -106,6 +110,7 @@ public class SecureBulkLoadManager {
private Path baseStagingDir;
private UserProvider userProvider;
+ private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
private Connection conn;
SecureBulkLoadManager(Configuration conf, Connection conn) {
@@ -116,6 +121,7 @@ public class SecureBulkLoadManager {
public void start() throws IOException {
random = new SecureRandom();
userProvider = UserProvider.instantiate(conf);
+ ugiReferenceCounter = new ConcurrentHashMap<>();
fs = FileSystem.get(conf);
baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
@@ -158,7 +164,7 @@ public class SecureBulkLoadManager {
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
- if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+ if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
@@ -167,6 +173,38 @@ public class SecureBulkLoadManager {
}
}
+ private Consumer<HRegion> fsCreatedListener;
+
+ @VisibleForTesting
+ void setFsCreatedListener(Consumer<HRegion> fsCreatedListener) {
+ this.fsCreatedListener = fsCreatedListener;
+ }
+
+
+ private void incrementUgiReference(UserGroupInformation ugi) {
+ ugiReferenceCounter.merge(ugi, 1, new BiFunction<Integer, Integer, Integer>() {
+ @Override
+ public Integer apply(Integer oldvalue, Integer value) {
+ return ++oldvalue;
+ }
+ });
+ }
+
+ private void decrementUgiReference(UserGroupInformation ugi) {
+ ugiReferenceCounter.computeIfPresent(ugi,
+ new BiFunction<UserGroupInformation, Integer, Integer>() {
+ @Override
+ public Integer apply(UserGroupInformation key, Integer value) {
+ return value > 1 ? --value : null;
+ }
+ });
+ }
+
+ private boolean isUserReferenced(UserGroupInformation ugi) {
+ Integer count = ugiReferenceCounter.get(ugi);
+ return count != null && count > 0;
+ }
+
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
final BulkLoadHFileRequest request) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@@ -208,6 +246,7 @@ public class SecureBulkLoadManager {
Map<byte[], List<Path>> map = null;
try {
+ incrementUgiReference(ugi);
// Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
// the 'request user' necessary token to operate on the target fs.
@@ -237,6 +276,9 @@ public class SecureBulkLoadManager {
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
}
+ if (fsCreatedListener != null) {
+ fsCreatedListener.accept(region);
+ }
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
@@ -248,6 +290,7 @@ public class SecureBulkLoadManager {
}
});
} finally {
+ decrementUgiReference(ugi);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1e9d9987/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
new file mode 100644
index 0000000..75ebfd3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSecureBulkLoadManager {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
+
+ private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
+ private static byte[] FAMILY = Bytes.toBytes("family");
+ private static byte[] COLUMN = Bytes.toBytes("column");
+ private static byte[] key1 = Bytes.toBytes("row1");
+ private static byte[] key2 = Bytes.toBytes("row2");
+ private static byte[] key3 = Bytes.toBytes("row3");
+ private static byte[] value1 = Bytes.toBytes("t1");
+ private static byte[] value3 = Bytes.toBytes("t3");
+ private static byte[] SPLIT_ROWKEY = key2;
+
+ private Thread ealierBulkload;
+ private Thread laterBulkload;
+
+ protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ private static Configuration conf = testUtil.getConfiguration();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ testUtil.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ testUtil.shutdownMiniCluster();
+ testUtil.cleanupTestDir();
+ }
+
+ /**
+ * After a secure bulkload finishes, there is a clean-up for FileSystems used in the bulkload.
+ * Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload
+ * calls, or there are other FileSystems created by the same user, they could be closed by a
+ * FileSystem.closeAllForUGI call. So during the clean-up, FileSystems that will still be
+ * used later must not get closed, or else a race condition occurs.
+ *
+ * testForRaceCondition tests the case that two secure bulkload calls from the same UGI go
+ * into two different regions and one bulkload finishes earlier when the other bulkload still
+ * needs its FileSystems, checks that both bulkloads succeed.
+ */
+ @Test
+ public void testForRaceCondition() throws Exception {
+ Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
+ @Override
+ public void accept(HRegion hRegion) {
+ if (hRegion.getRegionInfo().containsRow(key3)) {
+ Threads.shutdown(ealierBulkload);/// wait until the other bulkload finishes
+ }
+ }
+ } ;
+ testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+ .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
+ /// create table
+ testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY));
+
+ /// prepare files
+ Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
+ .getRegionServer().getRootDir();
+ Path dir1 = new Path(rootdir, "dir1");
+ prepareHFile(dir1, key1, value1);
+ Path dir2 = new Path(rootdir, "dir2");
+ prepareHFile(dir2, key3, value3);
+
+ /// do bulkload
+ final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
+ final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
+ ealierBulkload = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doBulkloadWithoutRetry(dir1);
+ } catch (Exception e) {
+ LOG.error("bulk load failed .",e);
+ t1Exception.set(e);
+ }
+ }
+ });
+ laterBulkload = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doBulkloadWithoutRetry(dir2);
+ } catch (Exception e) {
+ LOG.error("bulk load failed .",e);
+ t2Exception.set(e);
+ }
+ }
+ });
+ ealierBulkload.start();
+ laterBulkload.start();
+ Threads.shutdown(ealierBulkload);
+ Threads.shutdown(laterBulkload);
+ Assert.assertNull(t1Exception.get());
+ Assert.assertNull(t2Exception.get());
+
+ /// check bulkload ok
+ Get get1 = new Get(key1);
+ Get get3 = new Get(key3);
+ Table t = testUtil.getConnection().getTable(TABLE);
+ Result r = t.get(get1);
+ Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
+ r = t.get(get3);
+ Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
+
+ }
+
+ /**
+ * A trick is used to make sure server-side failures( if any ) not being covered up by a client
+ * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the
+ * HFile queue is not empty, while server-side exceptions in the doAs block do not lead
+ * to a client exception, a bulkload will always succeed in this case by default, thus client
+ * will never be aware that failures have ever happened. To avoid this kind of retry,
+ * a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught
+ * silently outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly
+ * once, and server-side failures, if any, can be checked via data.
+ */
+ class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+ }
+
+ private void doBulkloadWithoutRetry(Path dir) throws Exception {
+ Connection connection = testUtil.getConnection();
+ LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+ @Override
+ protected void bulkLoadPhase(final Table htable, final Connection conn,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+ Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+ throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
+ }
+ };
+ try {
+ h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
+ connection.getRegionLocator(TABLE));
+ Assert.fail("MyExceptionToAvoidRetry is expected");
+ } catch (MyExceptionToAvoidRetry e) { //expected
+ }
+ }
+
+ private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
+ TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
+ ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
+ Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+
+ CacheConfig writerCacheConf = new CacheConfig(conf, family);
+ writerCacheConf.setCacheDataOnWrite(false);
+ HFileContext hFileContext = new HFileContextBuilder()
+ .withIncludesMvcc(false)
+ .withIncludesTags(true)
+ .withCompression(compression)
+ .withCompressTags(family.isCompressTags())
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(family.getBlocksize())
+ .withHBaseCheckSum(true)
+ .withDataBlockEncoding(family.getDataBlockEncoding())
+ .withEncryptionContext(Encryption.Context.NONE)
+ .withCreateTime(EnvironmentEdgeManager.currentTime())
+ .build();
+ StoreFileWriter.Builder builder =
+ new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
+ .withOutputDir(new Path(dir, family.getNameAsString()))
+ .withBloomType(family.getBloomFilterType())
+ .withMaxKeyCount(Integer.MAX_VALUE)
+ .withFileContext(hFileContext);
+ StoreFileWriter writer = builder.build();
+
+ Put put = new Put(key);
+ put.addColumn(FAMILY, COLUMN, value);
+ for (Cell c : put.get(FAMILY, COLUMN)) {
+ writer.append(c);
+ }
+
+ writer.close();
+ }
+}