Posted to commits@hbase.apache.org by bu...@apache.org on 2019/02/07 03:10:39 UTC

[hbase] branch branch-1.2 updated: HBASE-21374 Backport HBASE-21342 to branch-1

This is an automated email from the ASF dual-hosted git repository.

busbey pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new f7b51cc  HBASE-21374 Backport HBASE-21342 to branch-1
f7b51cc is described below

commit f7b51ccdf7414bc1deada02e3add556dc76e97be
Author: mazhenlin <ma...@alibaba-inc.com>
AuthorDate: Fri Oct 26 21:22:29 2018 +0800

    HBASE-21374 Backport HBASE-21342 to branch-1
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    (cherry picked from commit 25d0c3ec7f80f51e814ed53caa9509fe60a4a8cf)
    
    Original commit message:
    
    HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad
    
    Signed-off-by: Mike Drob <md...@apache.org>
    Signed-off-by: Ted Yu <ty...@apache.org>
---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     |   3 +-
 .../security/access/SecureBulkLoadEndpoint.java    |  46 ++++-
 .../access/TestSecureBulkLoadEndpoint.java         | 220 ++++++++++++++++++++-
 3 files changed, 265 insertions(+), 4 deletions(-)
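
For context, the failure mode this patch guards against can be sketched as below. This is an
illustrative example and not part of the patch: the class name UgiFsCloseRaceSketch is made up,
and the "Filesystem closed" outcome assumes an HDFS-backed Configuration. Hadoop's FileSystem
cache keys instances by UGI, so FileSystem.closeAllForUGI closes every cached instance for that
UGI, including one that another in-flight bulk load is still using.

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiFsCloseRaceSketch {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("bulkload_user");

        // "Bulk load A": obtains the FileSystem instance cached for this UGI and keeps using it.
        FileSystem fsStillInUse = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws Exception {
            return FileSystem.get(conf); // cache key includes (scheme, authority, ugi)
          }
        });

        // "Bulk load B": finishes first and cleans up all FileSystems cached for the same UGI.
        FileSystem.closeAllForUGI(ugi);

        // Bulk load A's FileSystem has been closed underneath it; against HDFS this call
        // typically fails with java.io.IOException: Filesystem closed.
        fsStillInUse.exists(new Path("/"));
      }
    }

The patch avoids this by reference-counting in-flight bulk loads per UGI and only calling
FileSystem.closeAllForUGI once no other bulk load for that UGI is still running.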

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 9cb417e..953ae89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -264,7 +264,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  static class LoadQueueItem {
+  @InterfaceAudience.Private
+  public static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 48b4f7c..8af4ca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -134,6 +135,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
   private RegionCoprocessorEnvironment env;
 
   private UserProvider userProvider;
+  private static HashMap<UserGroupInformation, Integer> ugiReferenceCounter = new HashMap<>();
 
   @Override
   public void start(CoprocessorEnvironment env) {
@@ -238,7 +240,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     } finally {
       UserGroupInformation ugi = getActiveUser().getUGI();
       try {
-        if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+        if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
           FileSystem.closeAllForUGI(ugi);
         }
       } catch (IOException e) {
@@ -248,6 +250,43 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     done.run(null);
   }
 
+  @VisibleForTesting
+  interface Consumer<T> {
+    void accept(T t);
+  }
+  private static Consumer<Region> fsCreatedListener;
+
+  @VisibleForTesting
+  static void setFsCreatedListener(Consumer<Region> listener) {
+    fsCreatedListener = listener;
+  }
+
+
+  private void incrementUgiReference(UserGroupInformation ugi) {
+    synchronized (ugiReferenceCounter) {
+      Integer counter = ugiReferenceCounter.get(ugi);
+      ugiReferenceCounter.put(ugi, counter == null ? 1 : ++counter);
+    }
+  }
+
+  private void decrementUgiReference(UserGroupInformation ugi) {
+    synchronized (ugiReferenceCounter) {
+      Integer counter = ugiReferenceCounter.get(ugi);
+      if(counter == null || counter <= 1) {
+        ugiReferenceCounter.remove(ugi);
+      } else {
+        ugiReferenceCounter.put(ugi,--counter);
+      }
+    }
+  }
+
+  private boolean isUserReferenced(UserGroupInformation ugi) {
+    synchronized (ugiReferenceCounter) {
+      Integer count = ugiReferenceCounter.get(ugi);
+      return count != null && count > 0;
+    }
+  }
+
   @Override
   public void secureBulkLoadHFiles(RpcController controller,
                                    SecureBulkLoadHFilesRequest request,
@@ -311,6 +350,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
         }
       }
 
+      incrementUgiReference(ugi);
       loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
         @Override
         public Boolean run() {
@@ -325,6 +365,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                 fs.setPermission(stageFamily, PERM_ALL_ACCESS);
               }
             }
+            if (fsCreatedListener != null) {
+              fsCreatedListener.accept(env.getRegion());
+            }
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return env.getRegion().bulkLoadHFiles(familyPaths, true,
@@ -335,6 +378,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
           return false;
         }
       });
+      decrementUgiReference(ugi);
     }
     if (region.getCoprocessorHost() != null) {
       try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
index d10d966..d7a4a85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndpoint.java
@@ -18,25 +18,94 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import static org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint.Consumer;
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the SecureBulkLoadEndpoint code.
  */
-@Category(SmallTests.class)
+@Category(MediumTests.class)
 public class TestSecureBulkLoadEndpoint {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSecureBulkLoadEndpoint.class);
+
+  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] COLUMN = Bytes.toBytes("column");
+  private static byte[] key1 = Bytes.toBytes("row1");
+  private static byte[] key2 = Bytes.toBytes("row2");
+  private static byte[] key3 = Bytes.toBytes("row3");
+  private static byte[] value1 = Bytes.toBytes("t1");
+  private static byte[] value3 = Bytes.toBytes("t3");
+  private static byte[] SPLIT_ROWKEY = key2;
+
+  private Thread ealierBulkload;
+  private Thread laterBulkload;
+
+  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private static Configuration conf = testUtil.getConfiguration();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,SecureBulkLoadEndpoint.class.getName());
+    testUtil.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    testUtil.shutdownMiniCluster();
+    testUtil.cleanupTestDir();
+  }
+
   @Test
   public void testFileSystemsWithoutPermissionSupport() {
     final Configuration emptyConf = new Configuration(false);
@@ -61,4 +130,151 @@ public class TestSecureBulkLoadEndpoint {
     defaultIgnoredSchemes = endpoint.getFileSystemSchemesWithoutPermissionSupport(defaultConf);
     assertEquals(defaultIgnoredSchemes, new HashSet<String>(Arrays.asList("foo", "bar")));
   }
+
+  /**
+   * After a secure bulkload finishes, there is a clean-up of the FileSystems used in that
+   * bulkload. Sometimes a FileSystem used by the finished bulkload is still in use by another
+   * bulkload call, or there are other FileSystems created by the same user, and all of them
+   * would be closed by a single FileSystem.closeAllForUGI call. So during the clean-up, the
+   * FileSystems that are still needed later must not be closed, or else a race condition occurs.
+   *
+   * testForRaceCondition covers the case where two secure bulkload calls from the same UGI go
+   * into two different regions and one bulkload finishes earlier while the other bulkload
+   * still needs its FileSystems, and checks that both bulkloads succeed.
+   */
+  @Test
+  public void testForRaceCondition() throws Exception {
+    /// create table
+    testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY));
+
+    Consumer<Region> fsCreatedListener = new Consumer<Region>() {
+      @Override
+      public void accept(Region hRegion) {
+        if (hRegion.getRegionInfo().containsRow(key3)) {
+          Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
+        }
+      }
+    };
+    SecureBulkLoadEndpoint.setFsCreatedListener(fsCreatedListener);
+
+    /// prepare files
+    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
+        .getRegionServer().getFileSystem().getHomeDirectory();
+    final Path dir1 = new Path(rootdir, "dir1");
+    prepareHFile(dir1, key1, value1);
+    final Path dir2 = new Path(rootdir, "dir2");
+    prepareHFile(dir2, key3, value3);
+
+    /// do bulkload
+    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
+    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
+    ealierBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir1);
+        } catch (Exception e) {
+          LOG.error("bulk load failed.", e);
+          t1Exception.set(e);
+        }
+      }
+    });
+    laterBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir2);
+        } catch (Exception e) {
+          LOG.error("bulk load failed.", e);
+          t2Exception.set(e);
+        }
+      }
+    });
+    ealierBulkload.start();
+    laterBulkload.start();
+    Threads.shutdown(ealierBulkload);
+    Threads.shutdown(laterBulkload);
+    Assert.assertNull(t1Exception.get());
+    Assert.assertNull(t2Exception.get());
+
+    /// check bulkload ok
+    Get get1 = new Get(key1);
+    Get get3 = new Get(key3);
+    Table t = testUtil.getConnection().getTable(TABLE);
+    Result r = t.get(get1);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
+    r = t.get(get3);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
+
+  }
+
+  /**
+   * A trick is used to make sure that server-side failures (if any) are not covered up by a
+   * client retry. LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as
+   * the HFile queue is not empty, while server-side exceptions in the doAs block do not lead
+   * to a client exception, so by default a bulkload would always succeed in this case and the
+   * client would never notice that failures had happened. To avoid this kind of retry, a
+   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and is caught
+   * silently outside the doBulkLoad call, so that bulkLoadPhase is invoked exactly once and
+   * server-side failures, if any, can be checked via the loaded data.
+   */
+  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+  }
+
+  private void doBulkloadWithoutRetry(Path dir) throws Exception {
+    Connection connection = testUtil.getConnection();
+    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+      @Override
+      protected void bulkLoadPhase(final Table table, final Connection conn,
+          ExecutorService pool, Deque<LoadQueueItem> queue,
+          final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+        super.bulkLoadPhase(table, conn, pool, queue, regionGroups);
+        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
+      }
+    };
+    try {
+      h.doBulkLoad(dir, testUtil.getHBaseAdmin(), connection.getTable(TABLE),
+          connection.getRegionLocator(TABLE));
+      Assert.fail("MyExceptionToAvoidRetry is expected");
+    } catch (MyExceptionToAvoidRetry e) { //expected
+    }
+  }
+
+  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
+    HTableDescriptor desc = testUtil.getHBaseAdmin().getTableDescriptor(TABLE);
+    HColumnDescriptor family = desc.getFamily(FAMILY);
+    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+
+    CacheConfig writerCacheConf = new CacheConfig(conf, family);
+    writerCacheConf.setCacheDataOnWrite(false);
+    HFileContext hFileContext = new HFileContextBuilder()
+        .withIncludesMvcc(false)
+        .withIncludesTags(true)
+        .withCompression(compression)
+        .withCompressTags(family.isCompressTags())
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(family.getBlocksize())
+        .withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding())
+        .withEncryptionContext(Encryption.Context.NONE)
+        .withCreateTime(EnvironmentEdgeManager.currentTime())
+        .build();
+    StoreFile.WriterBuilder builder =
+        new StoreFile.WriterBuilder(conf, writerCacheConf, dir.getFileSystem(conf))
+            .withOutputDir(new Path(dir, family.getNameAsString()))
+            .withBloomType(family.getBloomFilterType())
+            .withMaxKeyCount(Integer.MAX_VALUE)
+            .withFileContext(hFileContext);
+    StoreFile.Writer writer = builder.build();
+
+    Put put = new Put(key);
+    put.addColumn(FAMILY, COLUMN, value);
+    for (Cell c : put.get(FAMILY, COLUMN)) {
+      writer.append(c);
+    }
+
+    writer.close();
+  }
+
 }