You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/08/10 09:36:26 UTC
[hbase] branch branch-2 updated: HBASE-24694 Support flush a single
column family of table (#2218)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ebf493f HBASE-24694 Support flush a single column family of table (#2218)
ebf493f is described below
commit ebf493f07523825f0e1045091c595e3f15ede33d
Author: bsglz <18...@qq.com>
AuthorDate: Mon Aug 10 17:36:13 2020 +0800
HBASE-24694 Support flush a single column family of table (#2218)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../java/org/apache/hadoop/hbase/client/Admin.java | 10 +++++++++
.../org/apache/hadoop/hbase/client/AsyncAdmin.java | 8 +++++++
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 +++++
.../org/apache/hadoop/hbase/client/HBaseAdmin.java | 11 +++++++++-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 12 +++++++++--
.../java/org/apache/hadoop/hbase/HConstants.java | 3 +++
.../procedure/flush/FlushTableSubprocedure.java | 25 +++++++++++++++++-----
.../flush/MasterFlushTableProcedureManager.java | 12 ++++++++++-
.../RegionServerFlushTableProcedureManager.java | 21 ++++++++++++++----
.../hadoop/hbase/client/TestFlushFromClient.java | 19 ++++++++++++++++
hbase-shell/src/main/ruby/hbase/admin.rb | 6 +++++-
hbase-shell/src/main/ruby/shell/commands/flush.rb | 4 +++-
.../hadoop/hbase/thrift2/client/ThriftAdmin.java | 5 +++++
13 files changed, 126 insertions(+), 15 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index d5483eb..b8153a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -883,6 +883,16 @@ public interface Admin extends Abortable, Closeable {
void flush(TableName tableName) throws IOException;
/**
+ * Flush the specified column family stores on all regions of the passed table.
+ * This runs as a synchronous operation.
+ *
+ * @param tableName table to flush
+ * @param columnFamily column family within a table
+ * @throws IOException if a remote or network exception occurs
+ */
+ void flush(TableName tableName, byte[] columnFamily) throws IOException;
+
+ /**
* Flush an individual region. Synchronous operation.
*
* @param regionName region to flush
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index b272e75..87e1df9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -307,6 +307,14 @@ public interface AsyncAdmin {
CompletableFuture<Void> flush(TableName tableName);
/**
+ * Flush the specified column family stores on all regions of the passed table.
+ * This method is asynchronous; the returned future completes when the flush finishes.
+ * @param tableName table to flush
+ * @param columnFamily column family within a table
+ */
+ CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
+
+ /**
* Flush an individual region.
* @param regionName region to flush
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index c004d9f..376c5dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -245,6 +245,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
+ return wrap(rawAdmin.flush(tableName, columnFamily));
+ }
+
+ @Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index a24c17c..ac78578 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1151,12 +1151,21 @@ public class HBaseAdmin implements Admin {
@Override
public void flush(final TableName tableName) throws IOException {
+ flush(tableName, null);
+ }
+
+ @Override
+ public void flush(final TableName tableName, byte[] columnFamily) throws IOException {
checkTableExists(tableName);
if (isTableDisabled(tableName)) {
LOG.info("Table is disabled: " + tableName.getNameAsString());
return;
}
- execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
+ Map<String, String> props = new HashMap<>();
+ if (columnFamily != null) {
+ props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+ }
+ execProcedure("flush-table-proc", tableName.getNameAsString(), props);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index b25b287..02cbcef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -891,9 +891,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
}
}
-
@Override
public CompletableFuture<Void> flush(TableName tableName) {
+ return flush(tableName, null);
+ }
+
+ @Override
+ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
@@ -907,8 +911,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} else if (!tableEnabled) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
+ Map<String, String> props = new HashMap<>();
+ if (columnFamily != null) {
+ props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+ }
addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
- new HashMap<>()), (ret, err3) -> {
+ props), (ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 16bee93..e50f30b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -616,6 +616,9 @@ public final class HConstants {
*/
public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v");
+ /** The column family name, used as a key in a procedure configuration map. */
+ public static final String FAMILY_KEY_STR = "family";
+
/**
* The current version of the meta table.
* - pre-hbase 0.92. There is no META_VERSION column in the root table
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 5c005a7..d124039 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure.flush;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
@@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* This flush region implementation uses the distributed procedure framework to flush
@@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure {
private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
private final String table;
+ private final String family;
private final List<HRegion> regions;
private final FlushTableSubprocedurePool taskManager;
public FlushTableSubprocedure(ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
- List<HRegion> regions, String table,
+ List<HRegion> regions, String table, String family,
FlushTableSubprocedurePool taskManager) {
super(member, table, errorListener, wakeFrequency, timeout);
this.table = table;
+ this.family = family;
this.regions = regions;
this.taskManager = taskManager;
}
private static class RegionFlushTask implements Callable<Void> {
HRegion region;
- RegionFlushTask(HRegion region) {
+ List<byte[]> families;
+ RegionFlushTask(HRegion region, List<byte[]> families) {
this.region = region;
+ this.families = families;
}
@Override
@@ -65,7 +72,11 @@ public class FlushTableSubprocedure extends Subprocedure {
region.startRegionOperation();
try {
LOG.debug("Flush region " + region.toString() + " started...");
- region.flush(true);
+ if (families == null) {
+ region.flush(true);
+ } else {
+ region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
+ }
// TODO: flush result is not checked?
} finally {
LOG.debug("Closing region operation on " + region);
@@ -88,11 +99,15 @@ public class FlushTableSubprocedure extends Subprocedure {
throw new IllegalStateException("Attempting to flush "
+ table + " but we currently have outstanding tasks");
}
-
+ List<byte[]> families = null;
+ if (family != null) {
+ LOG.debug("About to flush family {} on all regions for table {}", family, table);
+ families = Arrays.asList(Bytes.toBytes(family));
+ }
// Add all hfiles already existing in region.
for (HRegion region : regions) {
// submit one task per region for parallelize by region.
- taskManager.submitTask(new RegionFlushTask(region));
+ taskManager.submitTask(new RegionFlushTask(region, families));
monitor.rethrowException();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 510fbcf..5e62c42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -149,11 +151,19 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+ HBaseProtos.NameStringPair family = null;
+ for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
+ if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
+ family = nsp;
+ }
+ }
+ byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
+
// Kick of the global procedure from the master coordinator to the region servers.
// We rely on the existing Distributed Procedure framework to prevent any concurrent
// procedure with the same name.
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
- new byte[0], Lists.newArrayList(regionServers));
+ procArgs, Lists.newArrayList(regionServers));
monitor.rethrowException();
if (proc == null) {
String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index ddd667f..cb5d54f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -51,6 +51,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
* This manager class handles flushing of the regions for table on a {@link HRegionServer}.
@@ -128,10 +129,11 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* Because this gets the local list of regions to flush and not the set the master had,
* there is a possibility of a race where regions may be missed.
*
- * @param table
+ * @param table table to flush
+ * @param family column family within a table
* @return Subprocedure to submit to the ProcedureMemeber.
*/
- public Subprocedure buildSubprocedure(String table) {
+ public Subprocedure buildSubprocedure(String table, String family) {
// don't run the subprocedure if the parent is stop(ping)
if (rss.isStopping() || rss.isStopped()) {
@@ -162,7 +164,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
- timeoutMillis, involvedRegions, table, taskManager);
+ timeoutMillis, involvedRegions, table, family, taskManager);
}
/**
@@ -183,8 +185,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
@Override
public Subprocedure buildSubprocedure(String name, byte[] data) {
+ String family = null;
+ // Currently the only data we pass is the family (if any), so a non-zero
+ // length is enough to tell whether a family was specified
+ if (data.length > 0) {
+ try {
+ HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
+ family = nsp.getValue();
+ } catch (Exception e) {
+ LOG.error("fail to get family by parsing from data", e);
+ }
+ }
// The name of the procedure instance from the master is the table name.
- return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
+ return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
index 3085296..cd496be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
@@ -118,6 +118,16 @@ public class TestFlushFromClient {
}
@Test
+ public void testFlushTableFamily() throws Exception {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
+ admin.flush(tableName, FAMILY_1);
+ assertFalse(getRegionInfo().stream().
+ anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
+ }
+ }
+
+ @Test
public void testAsyncFlushTable() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
admin.flush(tableName).get();
@@ -125,6 +135,15 @@ public class TestFlushFromClient {
}
@Test
+ public void testAsyncFlushTableFamily() throws Exception {
+ AsyncAdmin admin = asyncConn.getAdmin();
+ long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
+ admin.flush(tableName, FAMILY_1).get();
+ assertFalse(getRegionInfo().stream().
+ anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
+ }
+
+ @Test
public void testFlushRegion() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegion r : getRegionInfo()) {
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 2ce5588..1849aad 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -64,7 +64,11 @@ module Hbase
rescue java.lang.IllegalArgumentException
# Unknown region. Try table.
begin
- @admin.flush(TableName.valueOf(name))
+ if family_bytes.nil?
+ @admin.flush(TableName.valueOf(name))
+ else
+ @admin.flush(TableName.valueOf(name), family_bytes)
+ end
rescue java.lang.IllegalArgumentException
# Unknown table. Try region server.
@admin.flushRegionServer(ServerName.valueOf(name))
diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb
index f34999c..69bcf13 100644
--- a/hbase-shell/src/main/ruby/shell/commands/flush.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb
@@ -25,10 +25,12 @@ module Shell
Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions.
-You can also flush a single column family within a region.
+You can also flush a single column family for all regions within a table,
+or for a specific region only.
For example:
hbase> flush 'TABLENAME'
+ hbase> flush 'TABLENAME','FAMILYNAME'
hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME'
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 8d0b43c..1d0980b 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -631,6 +631,11 @@ public class ThriftAdmin implements Admin {
}
@Override
+ public void flush(TableName tableName, byte[] columnFamily) {
+ throw new NotImplementedException("flush not supported in ThriftAdmin");
+ }
+
+ @Override
public void flushRegion(byte[] regionName) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");