You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/04/07 15:55:11 UTC

[hbase] branch branch-2 updated: HBASE-24055 Make AsyncFSWAL can run on EC cluster (#1437)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 74a85e2  HBASE-24055 Make AsyncFSWAL can run on EC cluster (#1437)
74a85e2 is described below

commit 74a85e26eea4aa9a62192b0770ead4dd359a7fee
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Apr 7 23:41:35 2020 +0800

    HBASE-24055 Make AsyncFSWAL can run on EC cluster (#1437)
    
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: stack <st...@apache.org>
---
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 33 +++++++++--
 .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java  |  2 +-
 .../hbase/regionserver/wal/TestHBaseWalOnEC.java   | 67 +++++++++++++---------
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 5eb2004..32cdfcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -177,6 +175,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final FileCreator FILE_CREATOR;
 
+  // CreateFlag.SHOULD_REPLICATE makes an OutputStream on an EC directory support hflush/hsync.
+  // EC was introduced in hadoop 3.x, so this enum constant does not exist on 2.x; that is why we
+  // reference it indirectly, looking it up by name at runtime.
+  private static final CreateFlag SHOULD_REPLICATE_FLAG;
+
   private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
     Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
     isClientRunningMethod.setAccessible(true);
@@ -272,6 +275,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return createFileCreator2();
   }
 
+  private static CreateFlag loadShouldReplicateFlag() {
+    try {
+      return CreateFlag.valueOf("SHOULD_REPLICATE");
+    } catch (IllegalArgumentException e) {
+      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
+      return null;
+    }
+  }
+
   // cancel the processing if DFSClient is already closed.
   static final class CancelOnClose implements CancelableProgressable {
 
@@ -292,6 +304,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       LEASE_MANAGER = createLeaseManager();
       DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
       FILE_CREATOR = createFileCreator();
+      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
     } catch (Exception e) {
       String msg = "Couldn't properly initialize access to HDFS internals. Please " +
           "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
@@ -486,6 +499,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+    List<CreateFlag> flags = new ArrayList<>();
+    flags.add(CreateFlag.CREATE);
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    }
+    if (SHOULD_REPLICATE_FLAG != null) {
+      flags.add(SHOULD_REPLICATE_FLAG);
+    }
+    return new EnumSetWritable<>(EnumSet.copyOf(flags));
+  }
+
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
       EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
@@ -502,8 +527,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       try {
         stat = FILE_CREATOR.create(namenode, src,
           FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-          createParent, replication, blockSize, CryptoProtocolVersion.supported());
+          getCreateFlags(overwrite), createParent, replication, blockSize,
+          CryptoProtocolVersion.supported());
       } catch (Exception e) {
         if (e instanceof RemoteException) {
           throw (RemoteException) e;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 6be44e9..d23e816 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
index 96c5729..5a96b4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -34,40 +35,49 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestHBaseWalOnEC {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
+    HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
 
-  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void setUpBeforeClass() throws Exception {
     try {
-      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
       DistributedFileSystem fs = cluster.getFileSystem();
 
-      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
-          DistributedFileSystem.class);
+      Method enableAllECPolicies =
+        DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
       enableAllECPolicies.invoke(null, fs);
 
       DFSClient client = fs.getClient();
-      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
-          String.class, String.class);
+      Method setErasureCodingPolicy =
+        DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
       setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
 
       try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
@@ -80,25 +90,31 @@ public class TestHBaseWalOnEC {
       Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
     }
 
-    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
-    util.startMiniCluster();
+    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
+
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+  @Parameter
+  public String walProvider;
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
   }
 
-  @Test
-  public void testStreamCreate() throws IOException {
-    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
-        new Path("/testStreamCreate"), true)) {
-      assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
-    }
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
+    UTIL.startMiniCluster(3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
   }
 
   @Test
-  public void testFlush() throws IOException {
+  public void testReadWrite() throws IOException {
     byte[] row = Bytes.toBytes("row");
     byte[] cf = Bytes.toBytes("cf");
     byte[] cq = Bytes.toBytes("cq");
@@ -106,12 +122,11 @@ public class TestHBaseWalOnEC {
 
     TableName name = TableName.valueOf(getClass().getSimpleName());
 
-    Table t = util.createTable(name, cf);
+    Table t = UTIL.createTable(name, cf);
     t.put(new Put(row).addColumn(cf, cq, value));
 
-    util.getAdmin().flush(name);
+    UTIL.getAdmin().flush(name);
 
     assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
   }
 }
-