Posted to commits@hbase.apache.org by md...@apache.org on 2018/10/23 20:51:53 UTC

hbase git commit: HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad

Repository: hbase
Updated Branches:
  refs/heads/master 807736fcf -> 1e9d99872


HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad

Signed-off-by: Mike Drob <md...@apache.org>
Signed-off-by: Ted Yu <ty...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e9d9987
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e9d9987
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e9d9987

Branch: refs/heads/master
Commit: 1e9d998727773a724d3a37401e4e1cea3474bd9d
Parents: 807736f
Author: mazhenlin <ma...@alibaba-inc.com>
Authored: Fri Oct 19 14:51:00 2018 +0800
Committer: Mike Drob <md...@apache.org>
Committed: Tue Oct 23 15:51:46 2018 -0500

----------------------------------------------------------------------
 .../regionserver/SecureBulkLoadManager.java     |  45 +++-
 .../regionserver/TestSecureBulkLoadManager.java | 248 +++++++++++++++++++
 2 files changed, 292 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
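In short, the fix makes SecureBulkLoadManager keep a per-UGI reference count and only call
FileSystem.closeAllForUGI for a user once no concurrent secure bulk load is still using that
user's FileSystem instances. A minimal sketch of the pattern, condensed from the diff below
(the patch spells the remapping functions out as anonymous BiFunction classes; lambdas are
used here purely for brevity):

  private final ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter =
      new ConcurrentHashMap<>();

  private void incrementUgiReference(UserGroupInformation ugi) {
    // one more in-flight bulk load is using this user's FileSystem instances
    ugiReferenceCounter.merge(ugi, 1, (oldValue, one) -> oldValue + 1);
  }

  private void decrementUgiReference(UserGroupInformation ugi) {
    // returning null from the remapping function removes the entry once the count reaches zero
    ugiReferenceCounter.computeIfPresent(ugi, (key, value) -> value > 1 ? value - 1 : null);
  }

  private boolean isUserReferenced(UserGroupInformation ugi) {
    Integer count = ugiReferenceCounter.get(ugi);
    return count != null && count > 0;
  }

  // cleanupBulkLoad then skips the close while any other bulk load still holds a reference:
  if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
    FileSystem.closeAllForUGI(ugi);
  }

secureBulkLoadHFiles increments the counter before running the load as the requesting user and
decrements it in a finally block, so the count brackets the whole doAs section.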


http://git-wip-us.apache.org/repos/asf/hbase/blob/1e9d9987/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index a4ee517..566a6b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -25,6 +25,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -106,6 +110,7 @@ public class SecureBulkLoadManager {
   private Path baseStagingDir;
 
   private UserProvider userProvider;
+  private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
   private Connection conn;
 
   SecureBulkLoadManager(Configuration conf, Connection conn) {
@@ -116,6 +121,7 @@ public class SecureBulkLoadManager {
   public void start() throws IOException {
     random = new SecureRandom();
     userProvider = UserProvider.instantiate(conf);
+    ugiReferenceCounter = new ConcurrentHashMap<>();
     fs = FileSystem.get(conf);
     baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
 
@@ -158,7 +164,7 @@ public class SecureBulkLoadManager {
     } finally {
       UserGroupInformation ugi = getActiveUser().getUGI();
       try {
-        if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+        if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
           FileSystem.closeAllForUGI(ugi);
         }
       } catch (IOException e) {
@@ -167,6 +173,38 @@ public class SecureBulkLoadManager {
     }
   }
 
+  private Consumer<HRegion> fsCreatedListener;
+
+  @VisibleForTesting
+  void setFsCreatedListener(Consumer<HRegion> fsCreatedListener) {
+    this.fsCreatedListener = fsCreatedListener;
+  }
+
+
+  private void incrementUgiReference(UserGroupInformation ugi) {
+    ugiReferenceCounter.merge(ugi, 1, new BiFunction<Integer, Integer, Integer>() {
+      @Override
+      public Integer apply(Integer oldvalue, Integer value) {
+        return ++oldvalue;
+      }
+    });
+  }
+
+  private void decrementUgiReference(UserGroupInformation ugi) {
+    ugiReferenceCounter.computeIfPresent(ugi,
+        new BiFunction<UserGroupInformation, Integer, Integer>() {
+          @Override
+          public Integer apply(UserGroupInformation key, Integer value) {
+            return value > 1 ? --value : null;
+          }
+      });
+  }
+
+  private boolean isUserReferenced(UserGroupInformation ugi) {
+    Integer count = ugiReferenceCounter.get(ugi);
+    return count != null && count > 0;
+  }
+
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
       final BulkLoadHFileRequest request) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@@ -208,6 +246,7 @@ public class SecureBulkLoadManager {
     Map<byte[], List<Path>> map = null;
 
     try {
+      incrementUgiReference(ugi);
       // Get the target fs (HBase region server fs) delegation token
       // Since we have checked the permission via 'preBulkLoadHFile', now let's give
       // the 'request user' necessary token to operate on the target fs.
@@ -237,6 +276,9 @@ public class SecureBulkLoadManager {
                 fs.setPermission(stageFamily, PERM_ALL_ACCESS);
               }
             }
+            if (fsCreatedListener != null) {
+              fsCreatedListener.accept(region);
+            }
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
@@ -248,6 +290,7 @@ public class SecureBulkLoadManager {
         }
       });
     } finally {
+      decrementUgiReference(ugi);
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
       }
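
For context on the failure mode the new test reproduces: before this patch, the clean-up in
cleanupBulkLoad (first hunk above) called FileSystem.closeAllForUGI(ugi) for any requesting
user other than the login user, regardless of whether another bulk load under the same UGI was
still running. Because Hadoop caches FileSystem instances per UGI, that close also tears down
the FileSystem a concurrent call is still using. A rough timeline of the interleaving, written
as comments; the test below forces exactly this ordering via the fsCreatedListener hook, and
the "Filesystem closed" message is the usual symptom rather than something quoted from this
commit:

  // load 1: secureBulkLoadHFiles(regionA) runs as ugi and uses the FileSystem cached for ugi
  // load 2: secureBulkLoadHFiles(regionB) runs as the same ugi and gets the same cached FileSystem
  // load 1: cleanupBulkLoad -> FileSystem.closeAllForUGI(ugi)   // closes load 2's FileSystem too
  // load 2: region.bulkLoadHFiles(...) fails, typically with "java.io.IOException: Filesystem closed"

With the reference counter in place, closeAllForUGI is skipped until the last in-flight bulk
load for that UGI has finished.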

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e9d9987/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
new file mode 100644
index 0000000..75ebfd3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSecureBulkLoadManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
+
+  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] COLUMN = Bytes.toBytes("column");
+  private static byte[] key1 = Bytes.toBytes("row1");
+  private static byte[] key2 = Bytes.toBytes("row2");
+  private static byte[] key3 = Bytes.toBytes("row3");
+  private static byte[] value1 = Bytes.toBytes("t1");
+  private static byte[] value3 = Bytes.toBytes("t3");
+  private static byte[] SPLIT_ROWKEY = key2;
+
+  private Thread earlierBulkload;
+  private Thread laterBulkload;
+
+  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private static Configuration conf = testUtil.getConfiguration();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    testUtil.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    testUtil.shutdownMiniCluster();
+    testUtil.cleanupTestDir();
+  }
+
+  /**
+   * After a secure bulk load finishes, there is a clean-up of the FileSystems used by it.
+   * However, FileSystems belonging to the same UGI may still be in use by other in-flight
+   * bulk load calls, or by other FileSystem instances created for the same user, and all of
+   * them would be closed by a FileSystem.closeAllForUGI call. The clean-up must therefore
+   * not close FileSystems that are still needed, or else a race condition occurs.
+   *
+   * testForRaceCondition covers the case where two secure bulk load calls from the same UGI
+   * go into two different regions and one finishes while the other still needs its
+   * FileSystems, and checks that both bulk loads succeed.
+   */
+  @Test
+  public void testForRaceCondition() throws Exception {
+    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
+      @Override
+      public void accept(HRegion hRegion) {
+        if (hRegion.getRegionInfo().containsRow(key3)) {
+          Threads.shutdown(earlierBulkload); /// wait until the other bulk load has finished
+        }
+      }
+    };
+    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+        .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
+    /// create table
+    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));
+
+    /// prepare files
+    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
+        .getRegionServer().getRootDir();
+    Path dir1 = new Path(rootdir, "dir1");
+    prepareHFile(dir1, key1, value1);
+    Path dir2 = new Path(rootdir, "dir2");
+    prepareHFile(dir2, key3, value3);
+
+    /// do bulkload
+    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
+    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
+    earlierBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir1);
+        } catch (Exception e) {
+          LOG.error("bulk load failed.", e);
+          t1Exception.set(e);
+        }
+      }
+    });
+    laterBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir2);
+        } catch (Exception e) {
+          LOG.error("bulk load failed.", e);
+          t2Exception.set(e);
+        }
+      }
+    });
+    earlierBulkload.start();
+    laterBulkload.start();
+    Threads.shutdown(earlierBulkload);
+    Threads.shutdown(laterBulkload);
+    Assert.assertNull(t1Exception.get());
+    Assert.assertNull(t2Exception.get());
+
+    /// check bulkload ok
+    Get get1 = new Get(key1);
+    Get get3 = new Get(key3);
+    Table t = testUtil.getConnection().getTable(TABLE);
+    Result r = t.get(get1);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
+    r = t.get(get3);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
+
+  }
+
+  /**
+   * A trick is used to make sure that server-side failures (if any) are not covered up by a
+   * client retry. LoadIncrementalHFiles.doBulkLoad keeps issuing bulk load calls as long as
+   * the HFile queue is not empty, and server-side exceptions thrown in the doAs block do not
+   * surface as client exceptions, so by default the bulk load would always succeed eventually
+   * and the client would never notice that a failure had happened. To avoid this kind of
+   * retry, a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and is
+   * caught silently outside the doBulkLoad call, so that bulkLoadPhase is executed exactly
+   * once and server-side failures, if any, can be checked via the loaded data.
+   */
+  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+  }
+
+  private void doBulkloadWithoutRetry(Path dir) throws Exception {
+    Connection connection = testUtil.getConnection();
+    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+      @Override
+      protected void bulkLoadPhase(final Table htable, final Connection conn,
+          ExecutorService pool, Deque<LoadQueueItem> queue,
+          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
+      }
+    };
+    try {
+      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
+          connection.getRegionLocator(TABLE));
+      Assert.fail("MyExceptionToAvoidRetry is expected");
+    } catch (MyExceptionToAvoidRetry e) { //expected
+    }
+  }
+
+  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
+    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
+    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
+    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+
+    CacheConfig writerCacheConf = new CacheConfig(conf, family);
+    writerCacheConf.setCacheDataOnWrite(false);
+    HFileContext hFileContext = new HFileContextBuilder()
+        .withIncludesMvcc(false)
+        .withIncludesTags(true)
+        .withCompression(compression)
+        .withCompressTags(family.isCompressTags())
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(family.getBlocksize())
+        .withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding())
+        .withEncryptionContext(Encryption.Context.NONE)
+        .withCreateTime(EnvironmentEdgeManager.currentTime())
+        .build();
+    StoreFileWriter.Builder builder =
+        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
+        .withOutputDir(new Path(dir, family.getNameAsString()))
+        .withBloomType(family.getBloomFilterType())
+        .withMaxKeyCount(Integer.MAX_VALUE)
+        .withFileContext(hFileContext);
+    StoreFileWriter writer = builder.build();
+
+    Put put = new Put(key);
+    put.addColumn(FAMILY, COLUMN, value);
+    for (Cell c : put.get(FAMILY, COLUMN)) {
+      writer.append(c);
+    }
+
+    writer.close();
+  }
+}