You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/08/10 09:36:26 UTC

[hbase] branch branch-2 updated: HBASE-24694 Support flush a single column family of table (#2218)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ebf493f  HBASE-24694 Support flush a single column family of table (#2218)
ebf493f is described below

commit ebf493f07523825f0e1045091c595e3f15ede33d
Author: bsglz <18...@qq.com>
AuthorDate: Mon Aug 10 17:36:13 2020 +0800

    HBASE-24694 Support flush a single column family of table (#2218)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../java/org/apache/hadoop/hbase/client/Admin.java | 10 +++++++++
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  8 +++++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |  5 +++++
 .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 11 +++++++++-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    | 12 +++++++++--
 .../java/org/apache/hadoop/hbase/HConstants.java   |  3 +++
 .../procedure/flush/FlushTableSubprocedure.java    | 25 +++++++++++++++++-----
 .../flush/MasterFlushTableProcedureManager.java    | 12 ++++++++++-
 .../RegionServerFlushTableProcedureManager.java    | 21 ++++++++++++++----
 .../hadoop/hbase/client/TestFlushFromClient.java   | 19 ++++++++++++++++
 hbase-shell/src/main/ruby/hbase/admin.rb           |  6 +++++-
 hbase-shell/src/main/ruby/shell/commands/flush.rb  |  4 +++-
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |  5 +++++
 13 files changed, 126 insertions(+), 15 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index d5483eb..b8153a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -883,6 +883,16 @@ public interface Admin extends Abortable, Closeable {
   void flush(TableName tableName) throws IOException;
 
   /**
+   * Flush the specified column family stores on all regions of the passed table.
+   * This runs as a synchronous operation.
+   *
+   * @param tableName table to flush
+   * @param columnFamily column family within a table
+   * @throws IOException if a remote or network exception occurs
+   */
+  void flush(TableName tableName, byte[] columnFamily) throws IOException;
+
+  /**
    * Flush an individual region. Synchronous operation.
    *
    * @param regionName region to flush
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index b272e75..87e1df9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -307,6 +307,14 @@ public interface AsyncAdmin {
   CompletableFuture<Void> flush(TableName tableName);
 
   /**
+   * Flush the specified column family stores on all regions of the passed table.
+   * The returned {@link CompletableFuture} completes when the flush operation finishes.
+   * @param tableName table to flush
+   * @param columnFamily column family within a table
+   */
+  CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
+
+  /**
    * Flush an individual region.
    * @param regionName region to flush
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index c004d9f..376c5dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -245,6 +245,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
+    return wrap(rawAdmin.flush(tableName, columnFamily));
+  }
+
+  @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
     return wrap(rawAdmin.flushRegion(regionName));
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index a24c17c..ac78578 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1151,12 +1151,21 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public void flush(final TableName tableName) throws IOException {
+    flush(tableName, null);
+  }
+
+  @Override
+  public void flush(final TableName tableName, byte[] columnFamily) throws IOException {
     checkTableExists(tableName);
     if (isTableDisabled(tableName)) {
       LOG.info("Table is disabled: " + tableName.getNameAsString());
       return;
     }
-    execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
+    Map<String, String> props = new HashMap<>();
+    if (columnFamily != null) {
+      props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+    }
+    execProcedure("flush-table-proc", tableName.getNameAsString(), props);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index b25b287..02cbcef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -891,9 +891,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
     }
   }
-
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
+    return flush(tableName, null);
+  }
+
+  @Override
+  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
     CompletableFuture<Void> future = new CompletableFuture<>();
     addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
@@ -907,8 +911,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
           } else if (!tableEnabled) {
             future.completeExceptionally(new TableNotEnabledException(tableName));
           } else {
+            Map<String, String> props = new HashMap<>();
+            if (columnFamily != null) {
+              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+            }
             addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              new HashMap<>()), (ret, err3) -> {
+              props), (ret, err3) -> {
                 if (err3 != null) {
                   future.completeExceptionally(err3);
                 } else {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 16bee93..e50f30b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -616,6 +616,9 @@ public final class HConstants {
    */
   public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v");
 
+  /** The column family name, used as a key in a properties map. */
+  public static final String FAMILY_KEY_STR = "family";
+
   /**
    * The current version of the meta table.
    * - pre-hbase 0.92.  There is no META_VERSION column in the root table
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 5c005a7..d124039 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.procedure.flush;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * This flush region implementation uses the distributed procedure framework to flush
@@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure {
   private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
 
   private final String table;
+  private final String family;
   private final List<HRegion> regions;
   private final FlushTableSubprocedurePool taskManager;
 
   public FlushTableSubprocedure(ProcedureMember member,
       ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
-      List<HRegion> regions, String table,
+      List<HRegion> regions, String table, String family,
       FlushTableSubprocedurePool taskManager) {
     super(member, table, errorListener, wakeFrequency, timeout);
     this.table = table;
+    this.family = family;
     this.regions = regions;
     this.taskManager = taskManager;
   }
 
   private static class RegionFlushTask implements Callable<Void> {
     HRegion region;
-    RegionFlushTask(HRegion region) {
+    List<byte[]> families;
+    RegionFlushTask(HRegion region, List<byte[]> families) {
       this.region = region;
+      this.families = families;
     }
 
     @Override
@@ -65,7 +72,11 @@ public class FlushTableSubprocedure extends Subprocedure {
       region.startRegionOperation();
       try {
         LOG.debug("Flush region " + region.toString() + " started...");
-        region.flush(true);
+        if (families == null) {
+          region.flush(true);
+        } else {
+          region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
+        }
         // TODO: flush result is not checked?
       } finally {
         LOG.debug("Closing region operation on " + region);
@@ -88,11 +99,15 @@ public class FlushTableSubprocedure extends Subprocedure {
       throw new IllegalStateException("Attempting to flush "
           + table + " but we currently have outstanding tasks");
     }
-
+    List<byte[]> families = null;
+    if (family != null) {
+      LOG.debug("About to flush family {} on all regions for table {}", family, table);
+      families = Arrays.asList(Bytes.toBytes(family));
+    }
     // Add all hfiles already existing in region.
     for (HRegion region : regions) {
       // submit one task per region for parallelize by region.
-      taskManager.submitTask(new RegionFlushTask(region));
+      taskManager.submitTask(new RegionFlushTask(region, families));
       monitor.rethrowException();
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 510fbcf..5e62c42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -149,11 +151,19 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
 
     ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
 
+    HBaseProtos.NameStringPair family = null;
+    for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
+      if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
+        family = nsp;
+      }
+    }
+    byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
+
     // Kick of the global procedure from the master coordinator to the region servers.
     // We rely on the existing Distributed Procedure framework to prevent any concurrent
     // procedure with the same name.
     Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
-      new byte[0], Lists.newArrayList(regionServers));
+      procArgs, Lists.newArrayList(regionServers));
     monitor.rethrowException();
     if (proc == null) {
       String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index ddd667f..cb5d54f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -51,6 +51,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 /**
  * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
@@ -128,10 +129,11 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * Because this gets the local list of regions to flush and not the set the master had,
    * there is a possibility of a race where regions may be missed.
    *
-   * @param table
+   * @param table table to flush
+   * @param family column family within a table
    * @return Subprocedure to submit to the ProcedureMemeber.
    */
-  public Subprocedure buildSubprocedure(String table) {
+  public Subprocedure buildSubprocedure(String table, String family) {
 
     // don't run the subprocedure if the parent is stop(ping)
     if (rss.isStopping() || rss.isStopped()) {
@@ -162,7 +164,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     FlushTableSubprocedurePool taskManager =
         new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
-      timeoutMillis, involvedRegions, table, taskManager);
+      timeoutMillis, involvedRegions, table, family, taskManager);
   }
 
   /**
@@ -183,8 +185,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
 
     @Override
     public Subprocedure buildSubprocedure(String name, byte[] data) {
+      String family = null;
+      // Currently the family name is the only data we pass, so it is safe to
+      // infer from the data length whether a family was specified
+      if (data.length > 0) {
+        try {
+          HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
+          family = nsp.getValue();
+        } catch (Exception e) {
+          LOG.error("fail to get family by parsing from data", e);
+        }
+      }
       // The name of the procedure instance from the master is the table name.
-      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
+      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
     }
 
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
index 3085296..cd496be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
@@ -118,6 +118,16 @@ public class TestFlushFromClient {
   }
 
   @Test
+  public void testFlushTableFamily() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
+      admin.flush(tableName, FAMILY_1);
+      assertFalse(getRegionInfo().stream().
+        anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
+    }
+  }
+
+  @Test
   public void testAsyncFlushTable() throws Exception {
     AsyncAdmin admin = asyncConn.getAdmin();
     admin.flush(tableName).get();
@@ -125,6 +135,15 @@ public class TestFlushFromClient {
   }
 
   @Test
+  public void testAsyncFlushTableFamily() throws Exception {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
+    admin.flush(tableName, FAMILY_1).get();
+    assertFalse(getRegionInfo().stream().
+      anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
+  }
+
+  @Test
   public void testFlushRegion() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
       for (HRegion r : getRegionInfo()) {
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 2ce5588..1849aad 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -64,7 +64,11 @@ module Hbase
     rescue java.lang.IllegalArgumentException
       # Unknown region. Try table.
       begin
-        @admin.flush(TableName.valueOf(name))
+        if family_bytes.nil?
+          @admin.flush(TableName.valueOf(name))
+        else
+          @admin.flush(TableName.valueOf(name), family_bytes)
+        end
       rescue java.lang.IllegalArgumentException
         # Unknown table. Try region server.
         @admin.flushRegionServer(ServerName.valueOf(name))
diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb
index f34999c..69bcf13 100644
--- a/hbase-shell/src/main/ruby/shell/commands/flush.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb
@@ -25,10 +25,12 @@ module Shell
 Flush all regions in passed table or pass a region row to
 flush an individual region or a region server name whose format
 is 'host,port,startcode', to flush all its regions.
-You can also flush a single column family within a region.
+You can also flush a single column family for all regions within a table,
+or for a specific region only.
 For example:
 
   hbase> flush 'TABLENAME'
+  hbase> flush 'TABLENAME','FAMILYNAME'
   hbase> flush 'REGIONNAME'
   hbase> flush 'REGIONNAME','FAMILYNAME'
   hbase> flush 'ENCODED_REGIONNAME'
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 8d0b43c..1d0980b 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -631,6 +631,11 @@ public class ThriftAdmin implements Admin {
   }
 
   @Override
+  public void flush(TableName tableName, byte[] columnFamily) {
+    throw new NotImplementedException("flush not supported in ThriftAdmin");
+  }
+
+  @Override
   public void flushRegion(byte[] regionName) {
     throw new NotImplementedException("flushRegion not supported in ThriftAdmin");