You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/07/22 15:48:37 UTC
[hbase] 02/09: HBASE-24389 Introduce new master rpc methods to
locate meta region through root region (#1774)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-24950
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 12bff02b2af3b5d919326b03613066837677154b
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Jun 27 15:47:51 2020 +0800
HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)
Signed-off-by: stack <st...@apache.org>
---
.../apache/hadoop/hbase/CatalogFamilyFormat.java | 30 +
.../hadoop/hbase/ClientMetaTableAccessor.java | 29 +-
.../client/AbstractAsyncTableRegionLocator.java | 310 +++++++++
.../hadoop/hbase/client/AsyncConnectionImpl.java | 4 +-
.../hbase/client/AsyncMetaRegionLocator.java | 146 -----
.../hbase/client/AsyncMetaTableRegionLocator.java | 155 +++++
.../hbase/client/AsyncNonMetaRegionLocator.java | 725 ---------------------
.../client/AsyncNonMetaTableRegionLocator.java | 206 ++++++
.../hadoop/hbase/client/AsyncRegionLocator.java | 170 +++--
.../hbase/client/AsyncRegionLocatorHelper.java | 42 +-
.../hbase/client/AsyncTableRegionLocator.java | 4 +-
.../hbase/client/AsyncTableRegionLocatorImpl.java | 11 +-
.../hadoop/hbase/client/ConnectionRegistry.java | 6 -
.../hadoop/hbase/client/ConnectionUtils.java | 18 -
.../apache/hadoop/hbase/client/MasterRegistry.java | 2 +-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 47 +-
.../hadoop/hbase/client/RegionLocateType.java | 5 +-
.../hbase/client/TableRegionLocationCache.java | 226 +++++++
.../hadoop/hbase/client/ZKConnectionRegistry.java | 10 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 27 +
.../hbase/client/DoNothingConnectionRegistry.java | 6 -
.../client/TestAsyncMetaRegionLocatorFailFast.java | 67 --
.../client/TestAsyncRegionLocatorTracing.java | 75 ++-
.../apache/hadoop/hbase/MetaCellComparator.java | 2 +-
.../src/main/protobuf/server/master/Master.proto | 35 +
.../hadoop/hbase/coprocessor/MasterObserver.java | 44 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 115 +++-
.../hadoop/hbase/master/MasterCoprocessorHost.java | 40 ++
.../hadoop/hbase/master/MasterRpcServices.java | 125 +++-
.../apache/hadoop/hbase/master/MasterServices.java | 9 +
.../hbase/master/MetaRegionLocationCache.java | 9 +-
.../hbase/master/assignment/RegionStateStore.java | 6 +-
.../hbase/master/http/MasterStatusServlet.java | 5 +-
.../master/procedure/CreateTableProcedure.java | 2 -
.../hbase/master/procedure/ProcedureSyncWait.java | 14 -
.../master/snapshot/MasterSnapshotVerifier.java | 8 +-
.../hbase/master/snapshot/TakeSnapshotHandler.java | 14 +-
.../flush/MasterFlushTableProcedureManager.java | 18 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 18 +-
.../main/resources/hbase-webapps/master/table.jsp | 34 +-
.../apache/hadoop/hbase/TestMetaTableAccessor.java | 8 -
.../apache/hadoop/hbase/TestMetaTableLocator.java | 207 ------
.../hbase/client/AbstractTestRegionLocator.java | 5 +-
.../hbase/client/DummyConnectionRegistry.java | 6 -
.../hbase/client/MetaWithReplicasTestBase.java | 9 +-
.../hbase/client/RegionReplicaTestHelper.java | 15 +-
.../client/TestAsyncAdminWithRegionReplicas.java | 5 +-
.../hbase/client/TestAsyncMetaRegionLocator.java | 21 +-
.../client/TestAsyncNonMetaRegionLocator.java | 43 +-
.../hbase/client/TestAsyncRegionAdminApi2.java | 45 +-
... => TestAsyncRegionLocatorConcurrenyLimit.java} | 18 +-
.../hbase/client/TestAsyncTableAdminApi.java | 52 +-
.../hbase/client/TestAsyncTableAdminApi3.java | 24 +-
.../hbase/client/TestAsyncTableLocatePrefetch.java | 14 +-
.../hbase/client/TestAsyncTableRSCrashPublish.java | 3 +-
.../client/TestAsyncTableUseMetaReplicas.java | 4 +-
...estCatalogReplicaLoadBalanceSimpleSelector.java | 13 +-
.../hadoop/hbase/client/TestMasterRegistry.java | 2 +-
.../hbase/client/TestMetaRegionLocationCache.java | 39 +-
.../TestMetaWithReplicasShutdownHandling.java | 15 +-
.../hadoop/hbase/client/TestReplicasClient.java | 14 +-
.../hbase/client/TestZKConnectionRegistry.java | 3 +-
.../hbase/master/MockNoopMasterServices.java | 6 +
.../hadoop/hbase/master/TestMasterFailover.java | 53 +-
.../master/TestMetaAssignmentWithStopMaster.java | 16 +-
.../hbase/master/TestMetaShutdownHandler.java | 12 +-
.../master/assignment/TestRegionReplicaSplit.java | 5 +-
.../TestCompactionLifeCycleTracker.java | 4 +-
.../hbase/regionserver/TestRegionReplicas.java | 2 +-
.../regionserver/TestRegionServerNoMaster.java | 42 +-
...stRegionReplicaReplicationEndpointNoMaster.java | 58 +-
.../hadoop/hbase/util/TestHBaseFsckEncryption.java | 2 +-
.../hadoop/hbase/zookeeper/MetaTableLocator.java | 300 +--------
.../org/apache/hadoop/hbase/zookeeper/ZKUtil.java | 7 -
74 files changed, 1865 insertions(+), 2026 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
index 978198b..0178aeb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -31,8 +35,11 @@ import java.util.regex.Pattern;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -418,4 +425,27 @@ public class CatalogFamilyFormat {
}
return deleteReplicaLocations;
}
+
+ private static byte[] buildRegionLocateStartRow(TableName tableName, byte[] row,
+ RegionLocateType locateType) {
+ if (locateType.equals(RegionLocateType.BEFORE)) {
+ if (Bytes.equals(row, HConstants.EMPTY_END_ROW)) {
+ byte[] binaryTableName = tableName.getName();
+ return Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+ } else {
+ return createRegionName(tableName, row, ZEROES, false);
+ }
+ } else {
+ return createRegionName(tableName, row, NINES, false);
+ }
+ }
+
+ public static Scan createRegionLocateScan(TableName tableName, byte[] row,
+ RegionLocateType locateType, int prefetchLimit) {
+ byte[] startRow = buildRegionLocateStartRow(tableName, row, locateType);
+ byte[] stopRow = RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+ return new Scan().withStartRow(startRow).withStopRow(stopRow, true)
+ .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(prefetchLimit)
+ .setReadType(ReadType.PREAD);
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
index ecc6573..74d2322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
@@ -164,26 +164,27 @@ public final class ClientMetaTableAccessor {
/**
* Used to get all region locations for the specific table.
- * @param metaTable
* @param tableName table we're looking for, can be null for getting all regions
* @return the list of region locations. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
- AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
+ AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName,
+ boolean excludeOfflinedSplitParents) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (locations == null || locations.isEmpty()) {
- future.complete(Collections.emptyList());
- } else {
- List<HRegionLocation> regionLocations =
- locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
- .collect(Collectors.toList());
- future.complete(regionLocations);
- }
- });
+ addListener(getTableRegionsAndLocations(metaTable, tableName, excludeOfflinedSplitParents),
+ (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (locations == null || locations.isEmpty()) {
+ future.complete(Collections.emptyList());
+ } else {
+ List<HRegionLocation> regionLocations =
+ locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+ .collect(Collectors.toList());
+ future.complete(regionLocations);
+ }
+ });
return future;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
new file mode 100644
index 0000000..aa7d7e4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base class for locating region of a table.
+ */
+@InterfaceAudience.Private
+abstract class AbstractAsyncTableRegionLocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncTableRegionLocator.class);
+
+ protected final AsyncConnectionImpl conn;
+
+ protected final TableName tableName;
+
+ protected final int maxConcurrent;
+
+ protected final TableRegionLocationCache cache;
+
+ protected static final class LocateRequest {
+
+ final byte[] row;
+
+ final RegionLocateType locateType;
+
+ public LocateRequest(byte[] row, RegionLocateType locateType) {
+ this.row = row;
+ this.locateType = locateType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(row) ^ locateType.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != LocateRequest.class) {
+ return false;
+ }
+ LocateRequest that = (LocateRequest) obj;
+ return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
+ }
+ }
+
+ private final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+ private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
+ new LinkedHashMap<>();
+
+ AbstractAsyncTableRegionLocator(AsyncConnectionImpl conn, TableName tableName,
+ int maxConcurrent, Comparator<byte[]> comparator) {
+ this.conn = conn;
+ this.tableName = tableName;
+ this.maxConcurrent = maxConcurrent;
+ this.cache = new TableRegionLocationCache(comparator, conn.getConnectionMetrics());
+ }
+
+ private boolean hasQuota() {
+ return pendingRequests.size() < maxConcurrent;
+ }
+
+ protected final Optional<LocateRequest> getCandidate() {
+ return allRequests.keySet().stream().filter(r -> !pendingRequests.contains(r)).findFirst();
+ }
+
+ void clearCompletedRequests(RegionLocations locations) {
+ for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
+ allRequests.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+ if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+ iter.remove();
+ }
+ }
+ }
+
+ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+ RegionLocations locations) {
+ if (future.isDone()) {
+ return true;
+ }
+ if (locations == null) {
+ return false;
+ }
+ HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
+ // we should at least have one location available, otherwise the request should fail and
+ // should not arrive here
+ assert loc != null;
+ boolean completed;
+ if (req.locateType.equals(RegionLocateType.BEFORE)) {
+ // for locating the row before current row, the common case is to find the previous region
+ // in reverse scan, so we check the endKey first. In general, the condition should be
+ // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
+ // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
+ // endKey.
+ byte[] endKey = loc.getRegion().getEndKey();
+ int c = Bytes.compareTo(endKey, req.row);
+ completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
+ Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
+ } else {
+ completed = loc.getRegion().containsRow(req.row);
+ }
+ if (completed) {
+ future.complete(locations);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ protected final void onLocateComplete(LocateRequest req, RegionLocations locs, Throwable error) {
+ if (error != null) {
+ LOG.warn("Failed to locate region in '" + tableName + "', row='" +
+ Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
+ }
+ Optional<LocateRequest> toSend = Optional.empty();
+ if (locs != null) {
+ RegionLocations addedLocs = cache.add(locs);
+ synchronized (this) {
+ pendingRequests.remove(req);
+ clearCompletedRequests(addedLocs);
+ // Remove a complete locate request in a synchronized block, so the table cache must have
+ // quota to send a candidate request.
+ toSend = getCandidate();
+ toSend.ifPresent(pendingRequests::add);
+ }
+ toSend.ifPresent(this::locate);
+ } else {
+ // we meet an error
+ assert error != null;
+ synchronized (this) {
+ pendingRequests.remove(req);
+ // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
+ // already retried several times
+ CompletableFuture<?> future = allRequests.remove(req);
+ if (future != null) {
+ future.completeExceptionally(error);
+ }
+ clearCompletedRequests(null);
+ // Remove a complete locate request in a synchronized block, so the table cache must have
+ // quota to send a candidate request.
+ toSend = getCandidate();
+ toSend.ifPresent(pendingRequests::add);
+ }
+ toSend.ifPresent(this::locate);
+ }
+ }
+
+ // return false means you do not need to go on, just return. And you do not need to call the above
+ // onLocateComplete either when returning false, as we will call it in this method for you, this
+ // is why we need to pass the LocateRequest as a parameter.
+ protected final boolean validateRegionLocations(RegionLocations locs, LocateRequest req) {
+ // remove HRegionLocation with null location, i.e, getServerName returns null.
+ if (locs != null) {
+ locs = locs.removeElementsWithNullLocation();
+ }
+
+ // the default region location should always be presented when fetching from meta, otherwise
+ // let's fail the request.
+ if (locs == null || locs.getDefaultRegionLocation() == null) {
+ onLocateComplete(req, null,
+ new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
+ tableName, Bytes.toStringBinary(req.row), req.locateType)));
+ return false;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ RegionInfo info = loc.getRegion();
+ if (info == null) {
+ onLocateComplete(req, null,
+ new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+ tableName, Bytes.toStringBinary(req.row), req.locateType)));
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract void locate(LocateRequest req);
+
+ abstract CompletableFuture<List<HRegionLocation>>
+ getAllRegionLocations(boolean excludeOfflinedSplitParents);
+
+ CompletableFuture<RegionLocations> getRegionLocations(byte[] row, int replicaId,
+ RegionLocateType locateType, boolean reload) {
+ if (locateType.equals(RegionLocateType.AFTER)) {
+ row = createClosestRowAfter(row);
+ locateType = RegionLocateType.CURRENT;
+ }
+ if (!reload) {
+ RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
+ }
+ }
+ CompletableFuture<RegionLocations> future;
+ LocateRequest req;
+ boolean sendRequest = false;
+ synchronized (this) {
+ // check again
+ if (!reload) {
+ RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
+ }
+ }
+ req = new LocateRequest(row, locateType);
+ future = allRequests.get(req);
+ if (future == null) {
+ future = new CompletableFuture<>();
+ allRequests.put(req, future);
+ if (hasQuota() && !pendingRequests.contains(req)) {
+ pendingRequests.add(req);
+ sendRequest = true;
+ }
+ }
+ }
+ if (sendRequest) {
+ locate(req);
+ }
+ return future;
+ }
+
+ void addToCache(RegionLocations locs) {
+ cache.add(locs);
+ }
+
+ // notice that this is not a constant time operation, do not call it on critical path.
+ int getCacheSize() {
+ return cache.size();
+ }
+
+ void clearPendingRequests() {
+ synchronized (this) {
+ if (!allRequests.isEmpty()) {
+ IOException error = new IOException("Cache cleared");
+ for (CompletableFuture<?> future : allRequests.values()) {
+ future.completeExceptionally(error);
+ }
+ }
+ }
+ }
+
+ void clearCache(ServerName serverName) {
+ cache.clearCache(serverName);
+ }
+
+ void removeLocationFromCache(HRegionLocation loc) {
+ cache.removeLocationFromCache(loc);
+ }
+
+ RegionLocations getInCache(byte[] key) {
+ return cache.get(key);
+ }
+
+ // only used for testing whether we have cached the location for a region.
+ @RestrictedApi(explanation = "Should only be called in AsyncRegionLocator",
+ link = "", allowedOnPath = ".*/AsyncRegionLocator.java")
+ RegionLocations locateInCache(byte[] row) {
+ return cache.locate(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+ RegionLocateType.CURRENT);
+ }
+
+ // only used for testing whether we have cached the location for a table.
+ @RestrictedApi(explanation = "Should only be called in AsyncRegionLocator",
+ link = "", allowedOnPath = ".*/AsyncRegionLocator.java")
+ int getNumberOfCachedRegionLocations() {
+ return cache.getNumberOfCachedRegionLocations();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 25a98ed..5c24d98 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -86,11 +86,11 @@ class AsyncConnectionImpl implements AsyncConnection {
final AsyncConnectionConfiguration connConf;
- private final User user;
+ final User user;
final ConnectionRegistry registry;
- private final int rpcTimeout;
+ final int rpcTimeout;
protected final RpcClient rpcClient;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
deleted file mode 100644
index 5ae9de6..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The asynchronous locator for meta region.
- */
-@InterfaceAudience.Private
-class AsyncMetaRegionLocator {
-
- private final ConnectionRegistry registry;
-
- private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
-
- private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
- new AtomicReference<>();
-
- AsyncMetaRegionLocator(ConnectionRegistry registry) {
- this.registry = registry;
- }
-
- /**
- * Get the region locations for meta region. If the location for the given replica is not
- * available in the cached locations, then fetch from the HBase cluster.
- * <p/>
- * The <code>replicaId</code> parameter is important. If the region replication config for meta
- * region is changed, then the cached region locations may not have the locations for new
- * replicas. If we do not check the location for the given replica, we will always return the
- * cached region locations and cause an infinite loop.
- */
- CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
- return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
- registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
- }
-
- private HRegionLocation getCacheLocation(HRegionLocation loc) {
- RegionLocations locs = metaRegionLocations.get();
- return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
- }
-
- private void addLocationToCache(HRegionLocation loc) {
- for (;;) {
- int replicaId = loc.getRegion().getReplicaId();
- RegionLocations oldLocs = metaRegionLocations.get();
- if (oldLocs == null) {
- RegionLocations newLocs = createRegionLocations(loc);
- if (metaRegionLocations.compareAndSet(null, newLocs)) {
- return;
- }
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
- if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
- oldLoc.getServerName().equals(loc.getServerName()))) {
- return;
- }
- RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
- if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
- return;
- }
- }
- }
-
- private void removeLocationFromCache(HRegionLocation loc) {
- for (;;) {
- RegionLocations oldLocs = metaRegionLocations.get();
- if (oldLocs == null) {
- return;
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
- if (!canUpdateOnError(loc, oldLoc)) {
- return;
- }
- RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
- if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
- return;
- }
- }
- }
-
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
- this::addLocationToCache, this::removeLocationFromCache, null);
- }
-
- void clearCache() {
- metaRegionLocations.set(null);
- }
-
- void clearCache(ServerName serverName) {
- for (;;) {
- RegionLocations locs = metaRegionLocations.get();
- if (locs == null) {
- return;
- }
- RegionLocations newLocs = locs.removeByServer(serverName);
- if (locs == newLocs) {
- return;
- }
- if (newLocs.isEmpty()) {
- newLocs = null;
- }
- if (metaRegionLocations.compareAndSet(locs, newLocs)) {
- return;
- }
- }
- }
-
- // only used for testing whether we have cached the location for a region.
- RegionLocations getRegionLocationInCache() {
- return metaRegionLocations.get();
- }
-
- // only used for testing whether we have cached the location for a table.
- int getNumberOfCachedRegionLocations() {
- RegionLocations locs = metaRegionLocations.get();
- return locs != null ? locs.numNonNullElements() : 0;
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
new file mode 100644
index 0000000..c8ffa24
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaCellComparator;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+
+/**
+ * The class for locating region for meta table.
+ */
+@InterfaceAudience.Private
+class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);
+
+ private final AtomicReference<Interface> stub = new AtomicReference<>();
+
+ private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
+ new AtomicReference<>();
+
+ AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
+ // for meta region we should use MetaCellComparator to compare the row keys
+ super(conn, tableName, maxConcurrent, (r1, r2) -> MetaCellComparator
+ .compareRows(r1, 0, r1.length, r2, 0, r2.length));
+ }
+
+ private Interface createStub(ServerName serverName) throws IOException {
+ return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
+ (int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
+ }
+
+ CompletableFuture<Interface> getStub() {
+ return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
+ CompletableFuture<Interface> future = new CompletableFuture<>();
+ addListener(conn.registry.getActiveMaster(), (addr, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else if (addr == null) {
+ future.completeExceptionally(new MasterNotRunningException(
+ "ZooKeeper available but no active master location found"));
+ } else {
+ LOG.debug("The fetched master address is {}", addr);
+ try {
+ future.complete(createStub(addr));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+
+ });
+ return future;
+ }, stub -> true, "ClientLocateMetaStub");
+ }
+
+ private void tryClearMasterStubCache(IOException error, Interface currentStub) {
+ if (ClientExceptionsUtil.isConnectionException(error) ||
+ error instanceof ServerNotRunningYetException) {
+ stub.compareAndSet(currentStub, null);
+ }
+ }
+
+ @Override
+ protected void locate(LocateRequest req) {
+ addListener(getStub(), (stub, error) -> {
+ if (error != null) {
+ onLocateComplete(req, null, error);
+ return;
+ }
+ HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ stub.locateMetaRegion(controller,
+ LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
+ .setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
+ resp -> {
+ if (controller.failed()) {
+ IOException ex = controller.getFailed();
+ tryClearMasterStubCache(ex, stub);
+ onLocateComplete(req, null, ex);
+ return;
+ }
+ RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+ if (validateRegionLocations(locs, req)) {
+ onLocateComplete(req, locs, null);
+ }
+ });
+ });
+ }
+
+ @Override
+ CompletableFuture<List<HRegionLocation>>
+ getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+ CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+ addListener(getStub(), (stub, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
+ .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
+ if (controller.failed()) {
+ IOException ex = controller.getFailed();
+ tryClearMasterStubCache(ex, stub);
+ future.completeExceptionally(ex);
+ return;
+ }
+ List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
+ future.complete(locs);
+ });
+ });
+ return future;
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
deleted file mode 100644
index 1c686ac..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
-import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
-import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
-import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
-import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.CatalogFamilyFormat;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Objects;
-
-/**
- * The asynchronous locator for regions other than meta.
- */
-@InterfaceAudience.Private
-class AsyncNonMetaRegionLocator {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
-
- static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
- "hbase.client.meta.max.concurrent.locate.per.table";
-
- private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
-
- static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
-
- private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
-
- private final AsyncConnectionImpl conn;
-
- private final int maxConcurrentLocateRequestPerTable;
-
- private final int locatePrefetchLimit;
-
- // The mode tells if HedgedRead, LoadBalance mode is supported.
- // The default mode is CatalogReplicaMode.None.
- private CatalogReplicaMode metaReplicaMode;
- private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
-
- private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
-
- private static final class LocateRequest {
-
- private final byte[] row;
-
- private final RegionLocateType locateType;
-
- public LocateRequest(byte[] row, RegionLocateType locateType) {
- this.row = row;
- this.locateType = locateType;
- }
-
- @Override
- public int hashCode() {
- return Bytes.hashCode(row) ^ locateType.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || obj.getClass() != LocateRequest.class) {
- return false;
- }
- LocateRequest that = (LocateRequest) obj;
- return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
- }
- }
-
- private static final class TableCache {
-
- private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
- new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
-
- private final Set<LocateRequest> pendingRequests = new HashSet<>();
-
- private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
- new LinkedHashMap<>();
-
- public boolean hasQuota(int max) {
- return pendingRequests.size() < max;
- }
-
- public boolean isPending(LocateRequest req) {
- return pendingRequests.contains(req);
- }
-
- public void send(LocateRequest req) {
- pendingRequests.add(req);
- }
-
- public Optional<LocateRequest> getCandidate() {
- return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
- }
-
- public void clearCompletedRequests(RegionLocations locations) {
- for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
- allRequests.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
- iter.remove();
- }
- }
- }
-
- private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
- RegionLocations locations) {
- if (future.isDone()) {
- return true;
- }
- if (locations == null) {
- return false;
- }
- HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
- // we should at least have one location available, otherwise the request should fail and
- // should not arrive here
- assert loc != null;
- boolean completed;
- if (req.locateType.equals(RegionLocateType.BEFORE)) {
- // for locating the row before current row, the common case is to find the previous region
- // in reverse scan, so we check the endKey first. In general, the condition should be
- // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
- // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
- // endKey.
- byte[] endKey = loc.getRegion().getEndKey();
- int c = Bytes.compareTo(endKey, req.row);
- completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
- Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
- } else {
- completed = loc.getRegion().containsRow(req.row);
- }
- if (completed) {
- future.complete(locations);
- return true;
- } else {
- return false;
- }
- }
- }
-
- AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
- this.conn = conn;
- this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
- MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
- this.locatePrefetchLimit =
- conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
-
- // Get the region locator's meta replica mode.
- this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
- .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
-
- switch (this.metaReplicaMode) {
- case LOAD_BALANCE:
- String replicaSelectorClass = conn.getConfiguration().
- get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
- CatalogReplicaLoadBalanceSimpleSelector.class.getName());
-
- this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
- replicaSelectorClass, META_TABLE_NAME, conn, () -> {
- int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
- try {
- RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
- conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
- numOfReplicas = metaLocations.size();
- } catch (Exception e) {
- LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
- }
- return numOfReplicas;
- });
- break;
- case NONE:
- // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
- boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
- DEFAULT_USE_META_REPLICAS);
- if (useMetaReplicas) {
- this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
- }
- break;
- default:
- // Doing nothing
- }
- }
-
- private TableCache getTableCache(TableName tableName) {
- return computeIfAbsent(cache, tableName, TableCache::new);
- }
-
- private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
- HRegionLocation[] locArr1 = locs1.getRegionLocations();
- HRegionLocation[] locArr2 = locs2.getRegionLocations();
- if (locArr1.length != locArr2.length) {
- return false;
- }
- for (int i = 0; i < locArr1.length; i++) {
- // do not need to compare region info
- HRegionLocation loc1 = locArr1[i];
- HRegionLocation loc2 = locArr2[i];
- if (loc1 == null) {
- if (loc2 != null) {
- return false;
- }
- } else {
- if (loc2 == null) {
- return false;
- }
- if (loc1.getSeqNum() != loc2.getSeqNum()) {
- return false;
- }
- if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
- return false;
- }
- }
- }
- return true;
- }
-
- // if we successfully add the locations to cache, return the locations, otherwise return the one
- // which prevents us being added. The upper layer can use this value to complete pending requests.
- private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
- LOG.trace("Try adding {} to cache", locs);
- byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
- for (;;) {
- RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
- if (oldLocs == null) {
- return locs;
- }
- // check whether the regions are the same, this usually happens when table is split/merged, or
- // deleted and recreated again.
- RegionInfo region = locs.getRegionLocation().getRegion();
- RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
- if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
- RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
- if (isEqual(mergedLocs, oldLocs)) {
- // the merged one is the same with the old one, give up
- LOG.trace("Will not add {} to cache because the old value {} " +
- " is newer than us or has the same server name." +
- " Maybe it is updated before we replace it", locs, oldLocs);
- return oldLocs;
- }
- if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
- return mergedLocs;
- }
- } else {
- // the region is different, here we trust the one we fetched. This maybe wrong but finally
- // the upper layer can detect this and trigger removal of the wrong locations
- if (LOG.isDebugEnabled()) {
- LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
- " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
- }
- if (tableCache.cache.replace(startKey, oldLocs, locs)) {
- return locs;
- }
- }
- }
- }
-
- private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
- Throwable error) {
- if (error != null) {
- LOG.warn("Failed to locate region in '" + tableName + "', row='" +
- Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
- }
- Optional<LocateRequest> toSend = Optional.empty();
- TableCache tableCache = getTableCache(tableName);
- if (locs != null) {
- RegionLocations addedLocs = addToCache(tableCache, locs);
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- tableCache.clearCompletedRequests(addedLocs);
- // Remove a complete locate request in a synchronized block, so the table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
- }
- toSend.ifPresent(r -> locateInMeta(tableName, r));
- } else {
- // we meet an error
- assert error != null;
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
- // already retried several times
- CompletableFuture<?> future = tableCache.allRequests.remove(req);
- if (future != null) {
- future.completeExceptionally(error);
- }
- tableCache.clearCompletedRequests(null);
- // Remove a complete locate request in a synchronized block, so the table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
- }
- toSend.ifPresent(r -> locateInMeta(tableName, r));
- }
- }
-
- // return whether we should stop the scan
- private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
- RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
- Bytes.toStringBinary(req.row), req.locateType, locs);
- }
- // remove HRegionLocation with null location, i.e, getServerName returns null.
- if (locs != null) {
- locs = locs.removeElementsWithNullLocation();
- }
-
- // the default region location should always be presented when fetching from meta, otherwise
- // let's fail the request.
- if (locs == null || locs.getDefaultRegionLocation() == null) {
- complete(tableName, req, null,
- new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
- tableName, Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- RegionInfo info = loc.getRegion();
- if (info == null) {
- complete(tableName, req, null,
- new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
- tableName, Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- if (info.isSplitParent()) {
- return false;
- }
- complete(tableName, req, locs, null);
- return true;
- }
-
- private void recordCacheHit() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
- }
-
- private void recordCacheMiss() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
- }
-
- private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
- int replicaId) {
- Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- byte[] endKey = loc.getRegion().getEndKey();
- if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
- }
- recordCacheHit();
- return locs;
- } else {
- recordCacheMiss();
- return null;
- }
- }
-
- private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
- byte[] row, int replicaId) {
- boolean isEmptyStopRow = isEmptyStopRow(row);
- Map.Entry<byte[], RegionLocations> entry =
- isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
- (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
- }
- recordCacheHit();
- return locs;
- } else {
- recordCacheMiss();
- return null;
- }
- }
-
- private void locateInMeta(TableName tableName, LocateRequest req) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
- "', locateType=" + req.locateType + " in meta");
- }
- byte[] metaStartKey;
- if (req.locateType.equals(RegionLocateType.BEFORE)) {
- if (isEmptyStopRow(req.row)) {
- byte[] binaryTableName = tableName.getName();
- metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
- } else {
- metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
- }
- } else {
- metaStartKey = createRegionName(tableName, req.row, NINES, false);
- }
- byte[] metaStopKey =
- RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
- Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
- .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
- .setReadType(ReadType.PREAD);
-
- switch (this.metaReplicaMode) {
- case LOAD_BALANCE:
- int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
- if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
- // If the selector gives a non-primary meta replica region, then go with it.
- // Otherwise, just go to primary in non-hedgedRead mode.
- scan.setConsistency(Consistency.TIMELINE);
- scan.setReplicaId(metaReplicaId);
- }
- break;
- case HEDGED_READ:
- scan.setConsistency(Consistency.TIMELINE);
- break;
- default:
- // do nothing
- }
-
- conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
-
- private boolean completeNormally = false;
-
- private boolean tableNotFound = true;
-
- @Override
- public void onError(Throwable error) {
- complete(tableName, req, null, error);
- }
-
- @Override
- public void onComplete() {
- if (tableNotFound) {
- complete(tableName, req, null, new TableNotFoundException(tableName));
- } else if (!completeNormally) {
- complete(tableName, req, null, new IOException(
- "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
- }
- }
-
- @Override
- public void onNext(Result[] results, ScanController controller) {
- if (results.length == 0) {
- return;
- }
- tableNotFound = false;
- int i = 0;
- for (; i < results.length; i++) {
- if (onScanNext(tableName, req, results[i])) {
- completeNormally = true;
- controller.terminate();
- i++;
- break;
- }
- }
- // Add the remaining results into cache
- if (i < results.length) {
- TableCache tableCache = getTableCache(tableName);
- for (; i < results.length; i++) {
- RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
- if (locs == null) {
- continue;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- if (loc == null) {
- continue;
- }
- RegionInfo info = loc.getRegion();
- if (info == null || info.isOffline() || info.isSplitParent()) {
- continue;
- }
- RegionLocations addedLocs = addToCache(tableCache, locs);
- synchronized (tableCache) {
- tableCache.clearCompletedRequests(addedLocs);
- }
- }
- }
- }
- });
- }
-
- private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
- int replicaId, RegionLocateType locateType) {
- return locateType.equals(RegionLocateType.BEFORE)
- ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
- : locateRowInCache(tableCache, tableName, row, replicaId);
- }
-
- // locateToPrevious is true means we will use the start key of a region to locate the region
- // placed before it. Used for reverse scan. See the comment of
- // AsyncRegionLocator.getPreviousRegionLocation.
- private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
- byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
- // AFTER should be convert to CURRENT before calling this method
- assert !locateType.equals(RegionLocateType.AFTER);
- TableCache tableCache = getTableCache(tableName);
- if (!reload) {
- RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
- if (isGood(locs, replicaId)) {
- return CompletableFuture.completedFuture(locs);
- }
- }
- CompletableFuture<RegionLocations> future;
- LocateRequest req;
- boolean sendRequest = false;
- synchronized (tableCache) {
- // check again
- if (!reload) {
- RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
- if (isGood(locs, replicaId)) {
- return CompletableFuture.completedFuture(locs);
- }
- }
- req = new LocateRequest(row, locateType);
- future = tableCache.allRequests.get(req);
- if (future == null) {
- future = new CompletableFuture<>();
- tableCache.allRequests.put(req, future);
- if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
- tableCache.send(req);
- sendRequest = true;
- }
- }
- }
- if (sendRequest) {
- locateInMeta(tableName, req);
- }
- return future;
- }
-
- CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
- int replicaId, RegionLocateType locateType, boolean reload) {
- // as we know the exact row after us, so we can just create the new row, and use the same
- // algorithm to locate it.
- if (locateType.equals(RegionLocateType.AFTER)) {
- row = createClosestRowAfter(row);
- locateType = RegionLocateType.CURRENT;
- }
- return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
- }
-
- private void recordClearRegionCache() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
- }
-
- private void removeLocationFromCache(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return;
- }
- byte[] startKey = loc.getRegion().getStartKey();
- for (;;) {
- RegionLocations oldLocs = tableCache.cache.get(startKey);
- if (oldLocs == null) {
- return;
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
- if (!canUpdateOnError(loc, oldLoc)) {
- return;
- }
- // Tell metaReplicaSelector that the location is stale. It will create a stale entry
- // with timestamp internally. Next time the client looks up the same location,
- // it will pick a different meta replica region.
- if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
- metaReplicaSelector.onError(loc);
- }
-
- RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
- if (newLocs == null) {
- if (tableCache.cache.remove(startKey, oldLocs)) {
- recordClearRegionCache();
- return;
- }
- } else {
- if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
- recordClearRegionCache();
- return;
- }
- }
- }
- }
-
- private void addLocationToCache(HRegionLocation loc) {
- addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
- }
-
- private HRegionLocation getCachedLocation(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return null;
- }
- RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
- return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
- }
-
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
- AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
- this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
- }
-
- void clearCache(TableName tableName) {
- TableCache tableCache = cache.remove(tableName);
- if (tableCache == null) {
- return;
- }
- synchronized (tableCache) {
- if (!tableCache.allRequests.isEmpty()) {
- IOException error = new IOException("Cache cleared");
- tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
- }
- }
- conn.getConnectionMetrics()
- .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
- }
-
- void clearCache() {
- cache.clear();
- }
-
- void clearCache(ServerName serverName) {
- for (TableCache tableCache : cache.values()) {
- for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
- byte[] regionName = entry.getKey();
- RegionLocations locs = entry.getValue();
- RegionLocations newLocs = locs.removeByServer(serverName);
- if (locs == newLocs) {
- continue;
- }
- if (newLocs.isEmpty()) {
- tableCache.cache.remove(regionName, locs);
- } else {
- tableCache.cache.replace(regionName, locs, newLocs);
- }
- }
- }
- }
-
- // only used for testing whether we have cached the location for a region.
- RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
- TableCache tableCache = cache.get(tableName);
- if (tableCache == null) {
- return null;
- }
- return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
- }
-
- // only used for testing whether we have cached the location for a table.
- int getNumberOfCachedRegionLocations(TableName tableName) {
- TableCache tableCache = cache.get(tableName);
- if (tableCache == null) {
- return 0;
- }
- return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
new file mode 100644
index 0000000..c03e37c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class for locating region for table other than meta.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaTableRegionLocator.class);
+
+ private final int prefetchLimit;
+
+ // The mode tells if HedgedRead, LoadBalance mode is supported.
+ // The default mode is CatalogReplicaMode.None.
+ private CatalogReplicaMode metaReplicaMode;
+
+ private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
+
+ AsyncNonMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent,
+ int prefetchLimit) {
+ super(conn, tableName, maxConcurrent, Bytes.BYTES_COMPARATOR);
+ this.prefetchLimit = prefetchLimit;
+ // Get the region locator's meta replica mode.
+ this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
+ .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
+
+ switch (this.metaReplicaMode) {
+ case LOAD_BALANCE:
+ String replicaSelectorClass = conn.getConfiguration().
+ get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
+ CatalogReplicaLoadBalanceSimpleSelector.class.getName());
+
+ this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
+ replicaSelectorClass, META_TABLE_NAME, conn, () -> {
+ int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
+ try {
+ RegionLocations metaLocations = conn.getLocator()
+ .getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
+ RegionLocateType.CURRENT, true, conn.connConf.getReadRpcTimeoutNs())
+ .get();
+ numOfReplicas = metaLocations.size();
+ } catch (Exception e) {
+ LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
+ }
+ return numOfReplicas;
+ });
+ break;
+ case NONE:
+ // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
+ boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
+ DEFAULT_USE_META_REPLICAS);
+ if (useMetaReplicas) {
+ this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
+ }
+ break;
+ default:
+ // Doing nothing
+ }
+ }
+
+ // return whether we should stop the scan
+ private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
+ RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+ Bytes.toStringBinary(req.row), req.locateType, locs);
+ }
+ if (!validateRegionLocations(locs, req)) {
+ return true;
+ }
+ if (locs.getDefaultRegionLocation().getRegion().isSplitParent()) {
+ return false;
+ }
+ onLocateComplete(req, locs, null);
+ return true;
+ }
+
+ @Override
+ protected void locate(LocateRequest req) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try locate '{}', row='{}', locateType={} in meta", tableName,
+ Bytes.toStringBinary(req.row), req.locateType);
+ }
+ Scan scan =
+ CatalogFamilyFormat.createRegionLocateScan(tableName, req.row, req.locateType, prefetchLimit);
+ switch (this.metaReplicaMode) {
+ case LOAD_BALANCE:
+ int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
+ if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
+ // If the selector gives a non-primary meta replica region, then go with it.
+ // Otherwise, just go to primary in non-hedgedRead mode.
+ scan.setConsistency(Consistency.TIMELINE);
+ scan.setReplicaId(metaReplicaId);
+ }
+ break;
+ case HEDGED_READ:
+ scan.setConsistency(Consistency.TIMELINE);
+ break;
+ default:
+ // do nothing
+ }
+ conn.getTable(TableName.META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
+
+ private boolean completeNormally = false;
+
+ private boolean tableNotFound = true;
+
+ @Override
+ public void onError(Throwable error) {
+ onLocateComplete(req, null, error);
+ }
+
+ @Override
+ public void onComplete() {
+ if (tableNotFound) {
+ onLocateComplete(req, null, new TableNotFoundException(tableName));
+ } else if (!completeNormally) {
+ onLocateComplete(req, null, new IOException(
+ "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
+ }
+ }
+
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ if (results.length == 0) {
+ return;
+ }
+ tableNotFound = false;
+ int i = 0;
+ for (; i < results.length; i++) {
+ if (onScanNext(tableName, req, results[i])) {
+ completeNormally = true;
+ controller.terminate();
+ i++;
+ break;
+ }
+ }
+ // Add the remaining results into cache
+ if (i < results.length) {
+ for (; i < results.length; i++) {
+ RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
+ if (locs == null) {
+ continue;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ if (loc == null) {
+ continue;
+ }
+ RegionInfo info = loc.getRegion();
+ if (info == null || info.isOffline() || info.isSplitParent()) {
+ continue;
+ }
+ RegionLocations addedLocs = cache.add(locs);
+ synchronized (this) {
+ clearCompletedRequests(addedLocs);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ CompletableFuture<List<HRegionLocation>>
+ getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+ return ClientMetaTableAccessor.getTableHRegionLocations(
+ conn.getTable(TableName.META_TABLE_NAME), tableName, excludeOfflinedSplitParents);
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 716598a..7bfb7f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,20 +17,26 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -43,8 +49,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -55,21 +59,40 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@InterfaceAudience.Private
class AsyncRegionLocator {
- private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);
+ static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+ "hbase.client.meta.max.concurrent.locate.per.table";
+
+ private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+
+ static final String MAX_CONCURRENT_LOCATE_META_REQUEST =
+ "hbase.client.meta.max.concurrent.locate";
+
+ static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+
+ private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
- private final AsyncMetaRegionLocator metaRegionLocator;
+ private final int maxConcurrentLocateRequestPerTable;
+
+ private final int maxConcurrentLocateMetaRequest;
- private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
+ private final int locatePrefetchLimit;
- AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+ private final ConcurrentMap<TableName, AbstractAsyncTableRegionLocator> table2Locator =
+ new ConcurrentHashMap<>();
+
+ public AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn;
- this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
- this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
this.retryTimer = retryTimer;
+ this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+ MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+ this.maxConcurrentLocateMetaRequest = conn.getConfiguration()
+ .getInt(MAX_CONCURRENT_LOCATE_META_REQUEST, maxConcurrentLocateRequestPerTable);
+ this.locatePrefetchLimit =
+ conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
}
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
@@ -127,13 +150,30 @@ class AsyncRegionLocator {
return names;
}
+ private AbstractAsyncTableRegionLocator getOrCreateTableRegionLocator(TableName tableName) {
+ return computeIfAbsent(table2Locator, tableName, () -> {
+ if (isMeta(tableName)) {
+ return new AsyncMetaTableRegionLocator(conn, tableName, maxConcurrentLocateMetaRequest);
+ } else {
+ return new AsyncNonMetaTableRegionLocator(conn, tableName,
+ maxConcurrentLocateRequestPerTable, locatePrefetchLimit);
+ }
+ });
+ }
+
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType, boolean reload) {
+ return tracedLocationFuture(() -> {
+ return getOrCreateTableRegionLocator(tableName).getRegionLocations(row, replicaId, locateType,
+ reload);
+ }, this::getRegionName, tableName, "getRegionLocations");
+ }
+
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
return tracedLocationFuture(() -> {
- CompletableFuture<RegionLocations> future = isMeta(tableName) ?
- metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
- nonMetaRegionLocator.getRegionLocations(tableName, row,
- RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+ CompletableFuture<RegionLocations> future =
+ getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
@@ -148,8 +188,7 @@ class AsyncRegionLocator {
// Change it later if the meta table can have more than one regions.
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
CompletableFuture<RegionLocations> locsFuture =
- isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) :
- nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+ getRegionLocations(tableName, row, replicaId, type, reload);
addListener(locsFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
@@ -193,30 +232,61 @@ class AsyncRegionLocator {
return getRegionLocation(tableName, row, type, false, timeoutNs);
}
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- if (loc.getRegion().isMetaRegion()) {
- metaRegionLocator.updateCachedLocationOnError(loc, exception);
- } else {
- nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
+ /**
+ * Get all region locations for a table.
+ * <p/>
+ * Notice that this method will not read from cache.
+ */
+ CompletableFuture<List<HRegionLocation>> getAllRegionLocations(TableName tableName,
+ boolean excludeOfflinedSplitParents) {
+ CompletableFuture<List<HRegionLocation>> future =
+ getOrCreateTableRegionLocator(tableName).getAllRegionLocations(excludeOfflinedSplitParents);
+ addListener(future, (locs, error) -> {
+ if (error != null) {
+ return;
+ }
+ // add locations to cache
+ AbstractAsyncTableRegionLocator locator = getOrCreateTableRegionLocator(tableName);
+ Map<RegionInfo, List<HRegionLocation>> map = new HashMap<>();
+ for (HRegionLocation loc : locs) {
+ // do not cache split parent
+ if (loc.getRegion() != null && !loc.getRegion().isSplitParent()) {
+ map.computeIfAbsent(RegionReplicaUtil.getRegionInfoForDefaultReplica(loc.getRegion()),
+ k -> new ArrayList<>()).add(loc);
+ }
+ }
+ for (List<HRegionLocation> l : map.values()) {
+ locator.addToCache(new RegionLocations(l));
+ }
+ });
+ return future;
+ }
+
+ private void removeLocationFromCache(HRegionLocation loc) {
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+ if (locator == null) {
+ return;
}
+ locator.removeLocationFromCache(loc);
}
void clearCache(TableName tableName) {
TraceUtil.trace(() -> {
- LOG.debug("Clear meta cache for {}", tableName);
- if (tableName.equals(META_TABLE_NAME)) {
- metaRegionLocator.clearCache();
- } else {
- nonMetaRegionLocator.clearCache(tableName);
- }
+ AbstractAsyncTableRegionLocator locator = table2Locator.remove(tableName);
+ if (locator == null) {
+ return;
+ }
+ locator.clearPendingRequests();
+ conn.getConnectionMetrics()
+ .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(locator.getCacheSize()));
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}
void clearCache(ServerName serverName) {
TraceUtil.trace(() -> {
- LOG.debug("Clear meta cache for {}", serverName);
- metaRegionLocator.clearCache(serverName);
- nonMetaRegionLocator.clearCache(serverName);
+ for (AbstractAsyncTableRegionLocator locator : table2Locator.values()) {
+ locator.clearCache(serverName);
+ }
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
@@ -224,30 +294,48 @@ class AsyncRegionLocator {
void clearCache() {
TraceUtil.trace(() -> {
- metaRegionLocator.clearCache();
- nonMetaRegionLocator.clearCache();
+ table2Locator.clear();
}, "AsyncRegionLocator.clearCache");
}
- AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
- return nonMetaRegionLocator;
+ private void addLocationToCache(HRegionLocation loc) {
+ getOrCreateTableRegionLocator(loc.getRegion().getTable())
+ .addToCache(createRegionLocations(loc));
+ }
+
+ private HRegionLocation getCachedLocation(HRegionLocation loc) {
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+ if (locator == null) {
+ return null;
+ }
+ RegionLocations locs = locator.getInCache(loc.getRegion().getStartKey());
+ return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+ }
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+ this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
}
// only used for testing whether we have cached the location for a region.
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
- if (TableName.isMetaTableName(tableName)) {
- return metaRegionLocator.getRegionLocationInCache();
- } else {
- return nonMetaRegionLocator.getRegionLocationInCache(tableName, row);
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+ if (locator == null) {
+ return null;
}
+ return locator.locateInCache(row);
}
// only used for testing whether we have cached the location for a table.
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
int getNumberOfCachedRegionLocations(TableName tableName) {
- if (TableName.isMetaTableName(tableName)) {
- return metaRegionLocator.getNumberOfCachedRegionLocations();
- } else {
- return nonMetaRegionLocator.getNumberOfCachedRegionLocations(tableName);
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+ if (locator == null) {
+ return 0;
}
+ return locator.getNumberOfCachedRegionLocations();
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4c6cd5a..c34cc5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import java.util.Arrays;
+import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
@@ -30,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+
/**
* Helper class for asynchronous region locator.
*/
@@ -55,9 +58,9 @@ final class AsyncRegionLocatorHelper {
}
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
- Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
- Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
- MetricsConnection metrics) {
+ Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+ Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
+ Optional<MetricsConnection> metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -85,9 +88,7 @@ final class AsyncRegionLocatorHelper {
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
- if (metrics != null) {
- metrics.incrCacheDroppingExceptions(exception);
- }
+ metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
removeFromCache.accept(loc);
}
}
@@ -146,4 +147,33 @@ final class AsyncRegionLocatorHelper {
HRegionLocation loc = locs.getRegionLocation(replicaId);
return loc != null && loc.getServerName() != null;
}
+
+ static boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+ HRegionLocation[] locArr1 = locs1.getRegionLocations();
+ HRegionLocation[] locArr2 = locs2.getRegionLocations();
+ if (locArr1.length != locArr2.length) {
+ return false;
+ }
+ for (int i = 0; i < locArr1.length; i++) {
+ // do not need to compare region info
+ HRegionLocation loc1 = locArr1[i];
+ HRegionLocation loc2 = locArr2[i];
+ if (loc1 == null) {
+ if (loc2 != null) {
+ return false;
+ }
+ } else {
+ if (loc2 == null) {
+ return false;
+ }
+ if (loc1.getSeqNum() != loc2.getSeqNum()) {
+ return false;
+ }
+ if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 96e3ec4..5af3283 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -104,8 +104,8 @@ public interface AsyncTableRegionLocator {
/**
* Retrieves all of the regions associated with this table.
* <p/>
- * Usually we will go to meta table directly in this method so there is no {@code reload}
- * parameter.
+ * We will go to meta table directly in this method so there is no {@code reload} parameter. So
+ * please use with caution as this could generate great load to a cluster.
* <p/>
* Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 35bf0e0..2adf4df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -56,14 +55,8 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
- return tracedFuture(() -> {
- if (TableName.isMetaTableName(tableName)) {
- return conn.registry.getMetaRegionLocations()
- .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
- }
- return ClientMetaTableAccessor
- .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
- }, getClass().getSimpleName() + ".getAllRegionLocations");
+ return tracedFuture(() -> conn.getLocator().getAllRegionLocations(tableName, true),
+ getClass().getSimpleName() + ".getAllRegionLocations");
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index cd22d78..569d728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,11 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
interface ConnectionRegistry extends Closeable {
/**
- * Get the location of meta region(s).
- */
- CompletableFuture<RegionLocations> getMetaRegionLocations();
-
- /**
* Should only be called once.
* <p>
* The upper layer should store this value somewhere as it will not be change any more.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 70312aa..d74e4aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -172,24 +172,6 @@ public final class ConnectionUtils {
return Arrays.copyOf(row, row.length + 1);
}
- /**
- * Create a row before the specified row and very close to the specified row.
- */
- static byte[] createCloseRowBefore(byte[] row) {
- if (row.length == 0) {
- return MAX_BYTE_ARRAY;
- }
- if (row[row.length - 1] == 0) {
- return Arrays.copyOf(row, row.length - 1);
- } else {
- byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
- System.arraycopy(row, 0, nextRow, 0, row.length - 1);
- nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
- System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
- return nextRow;
- }
- }
-
static boolean isEmptyStartRow(byte[] row) {
return Bytes.equals(row, EMPTY_START_ROW);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 9223935..6caa8d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -266,7 +266,7 @@ public class MasterRegistry implements ConnectionRegistry {
return new RegionLocations(regionLocations);
}
- @Override
+ // keep the method here just for testing compatibility
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(
() -> this
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 1cbcf10..e691bb7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -745,8 +745,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
if (TableName.isMetaTableName(tableName)) {
- return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
- .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
+ return getTableHRegionLocations(tableName).thenApply(
+ locs -> locs.stream().allMatch(loc -> loc != null && loc.getServerName() != null));
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
addListener(isTableEnabled(tableName), (enabled, error) -> {
@@ -762,7 +762,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.complete(false);
} else {
addListener(
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true),
(locations, error1) -> {
if (error1 != null) {
future.completeExceptionally(error1);
@@ -882,15 +882,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
- if (tableName.equals(META_TABLE_NAME)) {
- return connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
- .collect(Collectors.toList()));
- } else {
- return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
- .thenApply(
- locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
- }
+ return getTableHRegionLocations(tableName).thenApply(
+ locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
}
@Override
public CompletableFuture<Void> flush(TableName tableName) {
@@ -1129,23 +1122,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
* List all region locations for the specific table.
*/
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty() ||
- metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- });
- return future;
- } else {
- // For non-meta table, we fetch all locations by scanning hbase:meta table
- return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
- }
+ return connection.getRegionLocator(tableName).getAllRegionLocations();
}
/**
@@ -2394,9 +2371,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
// old format encodedName, should be meta region
- future = connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations())
- .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
+ future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+ .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
} else {
future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
regionNameOrEncodedRegionName);
@@ -2413,10 +2389,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
if (regionInfo.isMetaRegion()) {
- future = connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations())
- .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
- .findFirst());
+ future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+ .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
+ .findFirst());
} else {
future =
ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
index 950123c..2380fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -27,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* <li>{@link #AFTER} locate the region which contains the row after the given row.</li>
* </ul>
*/
-@InterfaceAudience.Private
-enum RegionLocateType {
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum RegionLocateType {
BEFORE, CURRENT, AFTER
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
new file mode 100644
index 0000000..0745796
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isEqual;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The location cache for regions of a table.
+ */
+@InterfaceAudience.Private
+class TableRegionLocationCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableRegionLocationCache.class);
+
+ private final Optional<MetricsConnection> metrics;
+
+ private final ConcurrentNavigableMap<byte[], RegionLocations> cache;
+
+ TableRegionLocationCache(Comparator<byte[]> comparator, Optional<MetricsConnection> metrics) {
+ this.metrics = metrics;
+ this.cache = new ConcurrentSkipListMap<>(comparator);
+ }
+
+ private void recordCacheHit() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheHit);
+ }
+
+ private void recordCacheMiss() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheMiss);
+ }
+
+ private void recordClearRegionCache() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+ }
+
+ private RegionLocations locateRow(TableName tableName, byte[] row, int replicaId) {
+ Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+ if (entry == null) {
+ recordCacheMiss();
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ recordCacheMiss();
+ return null;
+ }
+ byte[] endKey = loc.getRegion().getEndKey();
+ if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+ }
+ recordCacheHit();
+ return locs;
+ } else {
+ recordCacheMiss();
+ return null;
+ }
+ }
+
+ private RegionLocations locateRowBefore(TableName tableName, byte[] row, int replicaId) {
+ boolean isEmptyStopRow = isEmptyStopRow(row);
+ Map.Entry<byte[], RegionLocations> entry =
+ isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+ if (entry == null) {
+ recordCacheMiss();
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ recordCacheMiss();
+ return null;
+ }
+ if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+ (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+ }
+ recordCacheHit();
+ return locs;
+ } else {
+ recordCacheMiss();
+ return null;
+ }
+ }
+
+ RegionLocations locate(TableName tableName, byte[] row, int replicaId,
+ RegionLocateType locateType) {
+ return locateType.equals(RegionLocateType.BEFORE) ? locateRowBefore(tableName, row, replicaId) :
+ locateRow(tableName, row, replicaId);
+ }
+
+ // if we successfully add the locations to cache, return the locations, otherwise return the one
+ // which prevents us being added. The upper layer can use this value to complete pending requests.
+ RegionLocations add(RegionLocations locs) {
+ LOG.trace("Try adding {} to cache", locs);
+ byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
+ if (oldLocs == null) {
+ return locs;
+ }
+ // check whether the regions are the same, this usually happens when table is split/merged, or
+ // deleted and recreated again.
+ RegionInfo region = locs.getRegionLocation().getRegion();
+ RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+ if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+ RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+ if (isEqual(mergedLocs, oldLocs)) {
+ // the merged one is the same with the old one, give up
+ LOG.trace("Will not add {} to cache because the old value {} " +
+ " is newer than us or has the same server name." +
+ " Maybe it is updated before we replace it", locs, oldLocs);
+ return oldLocs;
+ }
+ if (cache.replace(startKey, oldLocs, mergedLocs)) {
+ return mergedLocs;
+ }
+ } else {
+ // the region is different, here we trust the one we fetched. This maybe wrong but finally
+ // the upper layer can detect this and trigger removal of the wrong locations
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
+ " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
+ }
+ if (cache.replace(startKey, oldLocs, locs)) {
+ return locs;
+ }
+ }
+ }
+ }
+
+ // notice that this is not a constant time operation, do not call it on critical path.
+ int size() {
+ return cache.size();
+ }
+
+ void clearCache(ServerName serverName) {
+ for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
+ byte[] regionName = entry.getKey();
+ RegionLocations locs = entry.getValue();
+ RegionLocations newLocs = locs.removeByServer(serverName);
+ if (locs == newLocs) {
+ continue;
+ }
+ if (newLocs.isEmpty()) {
+ cache.remove(regionName, locs);
+ } else {
+ cache.replace(regionName, locs, newLocs);
+ }
+ }
+ }
+
+ void removeLocationFromCache(HRegionLocation loc) {
+ byte[] startKey = loc.getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = cache.get(startKey);
+ if (oldLocs == null) {
+ return;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
+ }
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (newLocs == null) {
+ if (cache.remove(startKey, oldLocs)) {
+ recordClearRegionCache();
+ return;
+ }
+ } else {
+ if (cache.replace(startKey, oldLocs, newLocs)) {
+ recordClearRegionCache();
+ return;
+ }
+ }
+ }
+ }
+
+ RegionLocations get(byte[] key) {
+ return cache.get(key);
+ }
+
+ // only used for testing whether we have cached the location for a table.
+ @RestrictedApi(explanation = "Should only be called in AbstractAsyncTableRegionLocator",
+ link = "", allowedOnPath = ".*/AbstractAsyncTableRegionLocator.java")
+ int getNumberOfCachedRegionLocations() {
+ return cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index 3918dbc..bf93776 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -115,7 +115,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
- CompletableFuture<RegionLocations> future) {
+ CompletableFuture<RegionLocations> future) {
remaining.decrement();
if (remaining.intValue() > 0) {
return;
@@ -123,8 +123,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
future.complete(new RegionLocations(locs));
}
- private Pair<RegionState.State, ServerName> getStateAndServerName(
- ZooKeeperProtos.MetaRegionServer proto) {
+ private Pair<RegionState.State, ServerName>
+ getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
RegionState.State state;
if (proto.hasState()) {
state = RegionState.State.convert(proto.getState());
@@ -137,7 +137,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
- List<String> metaReplicaZNodes) {
+ List<String> metaReplicaZNodes) {
if (metaReplicaZNodes.isEmpty()) {
future.completeExceptionally(new IOException("No meta znode available"));
}
@@ -193,7 +193,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
}
- @Override
+ // keep the method here just for testing compatibility
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 329894c..b6918ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLoadStats;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
@@ -3826,6 +3827,7 @@ public final class ProtobufUtil {
.build();
}
+
public static HBaseProtos.LogRequest toBalancerRejectionRequest(int limit) {
MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
MasterProtos.BalancerRejectionsRequest.newBuilder().setLimit(limit).build();
@@ -3835,4 +3837,29 @@ public final class ProtobufUtil {
.build();
}
+ public static MasterProtos.RegionLocateType toProtoRegionLocateType(RegionLocateType pojo) {
+ switch (pojo) {
+ case BEFORE:
+ return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_BEFORE;
+ case CURRENT:
+ return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_CURRENT;
+ case AFTER:
+ return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_AFTER;
+ default:
+ throw new IllegalArgumentException("Unknown RegionLocateType: " + pojo);
+ }
+ }
+
+ public static RegionLocateType toRegionLocateType(MasterProtos.RegionLocateType proto) {
+ switch (proto) {
+ case REGION_LOCATE_TYPE_BEFORE:
+ return RegionLocateType.BEFORE;
+ case REGION_LOCATE_TYPE_CURRENT:
+ return RegionLocateType.CURRENT;
+ case REGION_LOCATE_TYPE_AFTER:
+ return RegionLocateType.AFTER;
+ default:
+ throw new IllegalArgumentException("Unknown proto RegionLocateType: " + proto);
+ }
+ }
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 4bd66877..64ded7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,11 +32,6 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
}
@Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
public CompletableFuture<String> getClusterId() {
return CompletableFuture.completedFuture(null);
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
deleted file mode 100644
index b306500..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ClientTests.class, SmallTests.class })
-public class TestAsyncMetaRegionLocatorFailFast {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncMetaRegionLocatorFailFast.class);
-
- private static Configuration CONF = HBaseConfiguration.create();
-
- private static AsyncMetaRegionLocator LOCATOR;
-
- private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {
-
- public FaultyConnectionRegistry(Configuration conf) {
- super(conf);
- }
-
- @Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
- }
- }
-
- @BeforeClass
- public static void setUp() {
- LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
- }
-
- @Test(expected = DoNotRetryIOException.class)
- public void test() throws IOException {
- FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false));
- }
-}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
index 15b00f6..180d294 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
@@ -23,6 +23,7 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,12 +46,22 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncRegionLocatorTracing {
@@ -60,14 +74,63 @@ public class TestAsyncRegionLocatorTracing {
private AsyncConnectionImpl conn;
- private RegionLocations locs;
+ private static RegionLocations locs;
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+ public static final class RpcClientForTest implements RpcClient {
+
+ public RpcClientForTest(Configuration configuration, String clusterId,
+ SocketAddress localAddress, MetricsConnection metrics) {
+ }
+
+ @Override
+ public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+
+ @Override
+ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
+ return new RpcChannel() {
+
+ @Override
+ public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+ Message responsePrototype, RpcCallback<Message> done) {
+ LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+ for (HRegionLocation loc : locs) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ }
+ }
+ done.run(builder.build());
+ }
+ };
+ }
+
+ @Override
+ public void cancelConnections(ServerName sn) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean hasCellBlockSupport() {
+ return false;
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ CONF.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientForTest.class,
+ RpcClient.class);
+ }
+
@Before
public void setUp() throws IOException {
- RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build();
+ RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
locs = new RegionLocations(
new HRegionLocation(metaRegionInfo,
ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime())),
@@ -78,8 +141,9 @@ public class TestAsyncRegionLocatorTracing {
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) {
@Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return CompletableFuture.completedFuture(locs);
+ public CompletableFuture<ServerName> getActiveMaster() {
+ return CompletableFuture.completedFuture(
+ ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime()));
}
}, "test", null, UserProvider.instantiate(CONF).getCurrent());
}
@@ -104,8 +168,7 @@ public class TestAsyncRegionLocatorTracing {
@Test
public void testClearCacheServerName() {
- ServerName sn = ServerName.valueOf("127.0.0.1", 12345,
- EnvironmentEdgeManager.currentTime());
+ ServerName sn = ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime());
conn.getLocator().clearCache(sn);
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
index 4c18cfe..1620d9f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
@@ -71,7 +71,7 @@ public class MetaCellComparator extends CellComparatorImpl {
return ignoreSequenceid ? diff : Longs.compare(b.getSequenceId(), a.getSequenceId());
}
- private static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
+ public static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
int rlength) {
int leftDelimiter = Bytes.searchDelimiterIndex(left, loffset, llength, HConstants.DELIMITER);
int rightDelimiter = Bytes.searchDelimiterIndex(right, roffset, rlength, HConstants.DELIMITER);
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 3d265dd..5302d51 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1331,6 +1331,29 @@ message GetMetaRegionLocationsResponse {
repeated RegionLocation meta_locations = 1;
}
+enum RegionLocateType {
+ REGION_LOCATE_TYPE_BEFORE =1;
+ REGION_LOCATE_TYPE_CURRENT = 2;
+ REGION_LOCATE_TYPE_AFTER = 3;
+}
+
+message LocateMetaRegionRequest {
+ required bytes row = 1;
+ required RegionLocateType locateType = 2;
+}
+
+message LocateMetaRegionResponse {
+ repeated RegionLocation meta_locations = 1;
+}
+
+message GetAllMetaRegionLocationsRequest {
+ required bool exclude_offlined_split_parents = 1;
+}
+
+message GetAllMetaRegionLocationsResponse {
+ repeated RegionLocation meta_locations = 1;
+}
+
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for
* connection establishment.
@@ -1356,4 +1379,16 @@ service ClientMetaService {
* Get current meta replicas' region locations.
*/
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+ /**
+ * Get meta region locations for a given row
+ */
+ rpc LocateMetaRegion(LocateMetaRegionRequest)
+ returns(LocateMetaRegionResponse);
+
+ /**
+ * Get all meta regions locations
+ */
+ rpc GetAllMetaRegionLocations(GetAllMetaRegionLocationsRequest)
+ returns(GetAllMetaRegionLocationsResponse);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ac35caa..8ca8972 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.RegionPlan;
@@ -1763,7 +1765,7 @@ public interface MasterObserver {
throws IOException {
}
- /*
+ /**
* Called before checking if user has permissions.
* @param ctx the coprocessor instance's environment
* @param userName the user name
@@ -1782,4 +1784,44 @@ public interface MasterObserver {
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}
+
+ /**
+ * Called before locating meta region.
+ * @param ctx ctx the coprocessor instance's environment
+ * @param row the row key to locate
+ * @param locateType the direction of the locate operation
+ */
+ default void preLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+ RegionLocateType locateType) throws IOException {
+ }
+
+ /**
+ * Called after locating meta region.
+ * @param ctx ctx the coprocessor instance's environment
+ * @param row the row key to locate
+ * @param locateType the direction of the locate operation
+ * @param locs the locations of the given meta region, including meta replicas if any.
+ */
+ default void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+ RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
+ }
+
+ /**
+ * Called before getting all locations for meta regions.
+ * @param ctx ctx the coprocessor instance's environment
+ * @param excludeOfflinedSplitParents don't return split parents
+ */
+ default void preGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ boolean excludeOfflinedSplitParents) {
+ }
+
+ /**
+ * Called after getting all locations for meta regions.
+ * @param ctx ctx the coprocessor instance's environment
+ * @param excludeOfflinedSplitParents don't return split parents
+ * @param locs the locations of all meta regions, including meta replicas if any.
+ */
+ default void postGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ boolean excludeOfflinedSplitParents, List<HRegionLocation> locs) {
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 961c929..171966f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -67,12 +67,14 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.PleaseRestartMasterException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerMetrics;
@@ -88,12 +90,15 @@ import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
@@ -221,7 +226,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -232,6 +236,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@@ -242,6 +247,8 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@@ -773,6 +780,27 @@ public class HMaster extends HRegionServer implements MasterServices {
return new AssignmentManager(master, masterRegion);
}
+ /**
+ * Load the meta region state from the meta region server ZNode.
+ * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
+ * @param replicaId the ID of the replica
+ * @return regionstate
+ * @throws KeeperException if a ZooKeeper operation fails
+ */
+ private static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+ throws KeeperException {
+ RegionState regionState = null;
+ try {
+ byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
+ regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return regionState;
+ }
+
private void tryMigrateRootTableFromZooKeeper() throws IOException, KeeperException {
// try migrate data from zookeeper
try (RegionScanner scanner =
@@ -794,7 +822,7 @@ public class HMaster extends HRegionServer implements MasterServices {
StringBuilder info = new StringBuilder("Migrating meta location:");
for (String metaReplicaNode : metaReplicaNodes) {
int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZNode(metaReplicaNode);
- RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
+ RegionState state = getMetaRegionState(zooKeeper, replicaId);
info.append(" ").append(state);
put.setTimestamp(state.getStamp());
MetaTableAccessor.addRegionInfo(put, state.getRegion());
@@ -3922,4 +3950,85 @@ public class HMaster extends HRegionServer implements MasterServices {
public MetaLocationSyncer getMetaLocationSyncer() {
return metaLocationSyncer;
}
+
+ public RegionLocations locateMeta(byte[] row, RegionLocateType locateType) throws IOException {
+ if (locateType == RegionLocateType.AFTER) {
+ // as we know the exact row after us, so we can just create the new row, and use the same
+ // algorithm to locate it.
+ row = Arrays.copyOf(row, row.length + 1);
+ locateType = RegionLocateType.CURRENT;
+ }
+ Scan scan =
+ CatalogFamilyFormat.createRegionLocateScan(TableName.META_TABLE_NAME, row, locateType, 1);
+ try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+ boolean moreRows;
+ List<Cell> cells = new ArrayList<>();
+ do {
+ moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Result result = Result.create(cells);
+ cells.clear();
+ RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+ if (locs == null || locs.getDefaultRegionLocation() == null) {
+ LOG.warn("No location found when locating meta region with row='{}', locateType={}",
+ Bytes.toStringBinary(row), locateType);
+ return null;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ RegionInfo info = loc.getRegion();
+ if (info == null) {
+ LOG.warn("HRegionInfo is null when locating meta region with row='{}', locateType={}",
+ Bytes.toStringBinary(row), locateType);
+ return null;
+ }
+ if (info.isSplitParent()) {
+ continue;
+ }
+ return locs;
+ } while (moreRows);
+ LOG.warn("No location available when locating meta region with row='{}', locateType={}",
+ Bytes.toStringBinary(row), locateType);
+ return null;
+ }
+ }
+
+ public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException {
+ Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY);
+ List<RegionLocations> list = new ArrayList<>();
+ try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+ boolean moreRows;
+ List<Cell> cells = new ArrayList<>();
+ do {
+ moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Result result = Result.create(cells);
+ cells.clear();
+ RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+ if (locs == null) {
+ LOG.warn("No locations in {}", result);
+ continue;
+ }
+ HRegionLocation loc = locs.getRegionLocation();
+ if (loc == null) {
+ LOG.warn("No non null location in {}", result);
+ continue;
+ }
+ RegionInfo info = loc.getRegion();
+ if (info == null) {
+ LOG.warn("No serialized RegionInfo in {}", result);
+ continue;
+ }
+ if (excludeOfflinedSplitParents && info.isSplitParent()) {
+ continue;
+ }
+ list.add(locs);
+ } while (moreRows);
+ }
+ return list;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 01d1a62..728da5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -2038,4 +2040,42 @@ public class MasterCoprocessorHost
}
});
}
+
+ public void preLocateMetaRegion(byte[] row, RegionLocateType locateType) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preLocateMetaRegion(this, row, locateType);
+ }
+ });
+ }
+
+ public void postLocateMetaRegion(byte[] row, RegionLocateType locateType,
+ List<HRegionLocation> locs) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.postLocateMetaRegion(this, row, locateType, locs);
+ }
+ });
+ }
+
+ public void preGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetAllMetaRegionLocations(this, excludeOfflinedSplitParents);
+ }
+ });
+ }
+
+ public void postGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents,
+ List<HRegionLocation> locs) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.postGetAllMetaRegionLocations(this, excludeOfflinedSplitParents, locs);
+ }
+ });
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index e7bf96d..81ea0c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
@@ -75,7 +76,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -125,7 +125,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -210,6 +209,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@@ -268,6 +269,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
@@ -408,10 +411,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
-public class MasterRpcServices extends RSRpcServices implements
- MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
- LockService.BlockingInterface, HbckService.BlockingInterface,
- ClientMetaService.BlockingInterface {
+public class MasterRpcServices extends RSRpcServices implements MasterService.BlockingInterface,
+ RegionServerStatusService.BlockingInterface, LockService.BlockingInterface,
+ HbckService.BlockingInterface, ClientMetaService.BlockingInterface {
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
@@ -545,18 +547,17 @@ public class MasterRpcServices extends RSRpcServices implements
@Override
protected List<BlockingServiceAndInterface> getServices() {
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
- bssi.add(new BlockingServiceAndInterface(
- MasterService.newReflectiveBlockingService(this),
- MasterService.BlockingInterface.class));
- bssi.add(new BlockingServiceAndInterface(
- RegionServerStatusService.newReflectiveBlockingService(this),
+ bssi.add(new BlockingServiceAndInterface(MasterService.newReflectiveBlockingService(this),
+ MasterService.BlockingInterface.class));
+ bssi.add(
+ new BlockingServiceAndInterface(RegionServerStatusService.newReflectiveBlockingService(this),
RegionServerStatusService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
- LockService.BlockingInterface.class));
+ LockService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
- HbckService.BlockingInterface.class));
+ HbckService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
- ClientMetaService.BlockingInterface.class));
+ ClientMetaService.BlockingInterface.class));
bssi.addAll(super.getServices());
return bssi;
}
@@ -1718,39 +1719,31 @@ public class MasterRpcServices extends RSRpcServices implements
}
@Override
- public UnassignRegionResponse unassignRegion(RpcController controller,
- UnassignRegionRequest req) throws ServiceException {
+ public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
+ throws ServiceException {
try {
- final byte [] regionName = req.getRegion().getValue().toByteArray();
+ final byte[] regionName = req.getRegion().getValue().toByteArray();
RegionSpecifierType type = req.getRegion().getType();
UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
master.checkInitialized();
if (type != RegionSpecifierType.REGION_NAME) {
- LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
- + " actual: " + type);
+ LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME +
+ " actual: " + type);
}
- Pair<RegionInfo, ServerName> pair =
- MetaTableAccessor.getRegion(master.getConnection(), regionName);
- if (Bytes.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName(), regionName)) {
- pair = new Pair<>(RegionInfoBuilder.FIRST_META_REGIONINFO,
- MetaTableLocator.getMetaRegionLocation(master.getZooKeeper()));
- }
- if (pair == null) {
- throw new UnknownRegionException(Bytes.toString(regionName));
+ final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
+ if (regionInfo == null) {
+ throw new UnknownRegionException(Bytes.toStringBinary(regionName));
}
-
- RegionInfo hri = pair.getFirst();
if (master.cpHost != null) {
- master.cpHost.preUnassign(hri);
+ master.cpHost.preUnassign(regionInfo);
}
- LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
+ LOG.debug(master.getClientIdAuditPrefix() + " unassign " + regionInfo.getRegionNameAsString()
+ " in current location if it is online");
- master.getAssignmentManager().unassign(hri);
+ master.getAssignmentManager().unassign(regionInfo);
if (master.cpHost != null) {
- master.cpHost.postUnassign(hri);
+ master.cpHost.postUnassign(regionInfo);
}
-
return urr;
} catch (IOException ioe) {
throw new ServiceException(ioe);
@@ -3460,4 +3453,66 @@ public class MasterRpcServices extends RSRpcServices implements
.addAllBalancerRejection(balancerRejections).build();
}
+ @Override
+ public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
+ LocateMetaRegionRequest request) throws ServiceException {
+ byte[] row = request.getRow().toByteArray();
+ RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
+ try {
+ master.checkServiceStarted();
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preLocateMetaRegion(row, locateType);
+ }
+ RegionLocations locs = master.locateMeta(row, locateType);
+ List<HRegionLocation> list = new ArrayList<>();
+ LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+ if (locs != null) {
+ for (HRegionLocation loc : locs) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ list.add(loc);
+ }
+ }
+ }
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postLocateMetaRegion(row, locateType, list);
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController controller,
+ GetAllMetaRegionLocationsRequest request) throws ServiceException {
+ boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
+ try {
+ master.checkServiceStarted();
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
+ }
+ List<RegionLocations> locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+ List<HRegionLocation> list = new ArrayList<>();
+ GetAllMetaRegionLocationsResponse.Builder builder =
+ GetAllMetaRegionLocationsResponse.newBuilder();
+ if (locs != null) {
+ for (RegionLocations ls : locs) {
+ for (HRegionLocation loc : ls) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ list.add(loc);
+ }
+ }
+ }
+ }
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postGetAllMetaRegionLocations(excludeOfflinedSplitParents,
+ list);
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f24ecd4..1cee59f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -577,4 +578,12 @@ public interface MasterServices extends Server {
* We need to get this in MTP to tell the syncer the new meta replica count.
*/
MetaLocationSyncer getMetaLocationSyncer();
+
+ /**
+ * Get locations for all meta regions.
+ * @param excludeOfflinedSplitParents don't return split parents
+ * @return The locations of all the meta regions
+ */
+ List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
index 07512d1..b192a67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
@@ -39,11 +39,14 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
- * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
- * meta table znodes. Clients are expected to retry if the meta information is stale. This class
- * is thread-safe (a single instance of this class can be shared by multiple threads without race
+ * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
+ * table znodes. Clients are expected to retry if the meta information is stale. This class is
+ * thread-safe (a single instance of this class can be shared by multiple threads without race
* conditions).
+ * @deprecated Now we store meta location in the local store at master side so we should get the
+ * meta location from active master instead of zk, keep it here only for compatibility.
*/
+@Deprecated
@InterfaceAudience.Private
public class MetaRegionLocationCache extends ZKListener {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 87c04da..cce7a81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -191,8 +191,8 @@ public class RegionStateStore {
final Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
MetaTableAccessor.addRegionInfo(put, regionInfo);
final StringBuilder info =
- new StringBuilder("pid=").append(pid).append(" updating hbase:meta row=")
- .append(regionInfo.getEncodedName()).append(", regionState=").append(state);
+ new StringBuilder("pid=").append(pid).append(" updating catalog row=")
+ .append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server");
@@ -228,7 +228,7 @@ public class RegionStateStore {
}
public void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
- throws IOException {
+ throws IOException {
try {
MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
state);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
index 51790aa..3d00e49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
@@ -27,11 +27,11 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.tmpl.master.MasterStatusTmpl;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -82,7 +82,8 @@ public class MasterStatusServlet extends HttpServlet {
}
private ServerName getMetaLocationOrNull(HMaster master) {
- return MetaTableLocator.getMetaRegionLocation(master.getZooKeeper());
+ return master.getAssignmentManager().getRegionStates()
+ .getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO).getServerName();
}
private Map<String, Integer> getFragmentationInfo(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..30c3458 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -368,8 +368,6 @@ public class CreateTableProcedure
final TableDescriptor tableDescriptor, final List<RegionInfo> regions) throws IOException {
assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
- ProcedureSyncWait.waitMetaRegions(env);
-
// Add replicas if needed
// we need to create regions with replicaIds starting from 1
List<RegionInfo> newRegions =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 46621da..b28c95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -222,18 +220,6 @@ public final class ProcedureSyncWait {
throw new TimeoutIOException("Timed out while waiting on " + purpose);
}
- protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
- int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
- try {
- if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
- timeout) == null) {
- throw new NotAllMetaRegionsOnlineException();
- }
- } catch (InterruptedException e) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(e);
- }
- }
-
protected static void waitRegionInTransition(final MasterProcedureEnv env,
final List<RegionInfo> regions) throws IOException {
final RegionStates states = env.getAssignmentManager().getRegionStates();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index 23d0263..507058b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -21,8 +21,11 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -159,7 +161,9 @@ public final class MasterSnapshotVerifier {
private void verifyRegions(final SnapshotManifest manifest) throws IOException {
List<RegionInfo> regions;
if (TableName.META_TABLE_NAME.equals(tableName)) {
- regions = MetaTableLocator.getMetaRegions(services.getZooKeeper());
+ regions = services.getAllMetaRegionLocations(false).stream()
+ .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+ .map(HRegionLocation::getRegion).collect(Collectors.toList());
} else {
regions = MetaTableAccessor.getTableRegions(services.getConnection(), tableName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index 5ff8a49..108f0fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -197,12 +198,15 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
monitor.rethrowException();
List<Pair<RegionInfo, ServerName>> regionsAndLocations;
+
if (TableName.META_TABLE_NAME.equals(snapshotTable)) {
- regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
- server.getZooKeeper());
+ regionsAndLocations = master.getAllMetaRegionLocations(false).stream()
+ .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+ .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+ .collect(Collectors.toList());
} else {
- regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
- server.getConnection(), snapshotTable, false);
+ regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(server.getConnection(),
+ snapshotTable, false);
}
// run the snapshot
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index c6a3b92..2b28886 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -24,13 +24,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -44,7 +45,6 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -131,20 +131,16 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
// Each region server will get its own online regions for the table.
// We may still miss regions that need to be flushed.
List<Pair<RegionInfo, ServerName>> regionsAndLocations;
-
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
- master.getZooKeeper());
- } else {
- regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
- master.getConnection(), tableName, false);
+ try (RegionLocator locator =
+ master.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ regionsAndLocations = locator.getAllRegionLocations().stream()
+ .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+ .collect(Collectors.toList());
}
Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
if (region != null && region.getFirst() != null && region.getSecond() != null) {
- RegionInfo hri = region.getFirst();
- if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
regionServers.add(region.getSecond().toString());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c00a8b7..ae602a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
-import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
@@ -178,7 +177,6 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -2416,8 +2414,8 @@ public class HRegionServer extends Thread implements
}
/**
- * Helper method for use in tests. Skip the region transition report when there's no master
- * around to receive it.
+ * Helper method for use in tests. Skip the region transition report when there's no master around
+ * to receive it.
*/
private boolean skipReportingTransition(final RegionStateTransitionContext context) {
final TransitionCode code = context.getCode();
@@ -2428,17 +2426,13 @@ public class HRegionServer extends Thread implements
if (code == TransitionCode.OPENED) {
Preconditions.checkArgument(hris != null && hris.length == 1);
if (hris[0].isMetaRegion()) {
- try {
- MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
- hris[0].getReplicaId(), RegionState.State.OPEN);
- } catch (KeeperException e) {
- LOG.info("Failed to update meta location", e);
- return false;
- }
+ LOG.warn(
+ "meta table location is stored in master local store, so we can not skip reporting");
+ return false;
} else {
try {
MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
- serverName, openSeqNum, masterSystemTime);
+ serverName, openSeqNum, masterSystemTime);
} catch (IOException e) {
LOG.info("Failed to update meta", e);
return false;
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
index f6753df..29913b5 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
@@ -68,7 +68,6 @@
<%@ page import="org.apache.hadoop.hbase.quotas.ThrottleSettings" %>
<%@ page import="org.apache.hadoop.hbase.util.Bytes" %>
<%@ page import="org.apache.hadoop.hbase.util.FSUtils" %>
-<%@ page import="org.apache.hadoop.hbase.zookeeper.MetaTableLocator" %>
<%@ page import="org.apache.hadoop.util.StringUtils" %>
<%@ page import="org.apache.hbase.thirdparty.com.google.protobuf.ByteString" %>
<%@ page import="org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos" %>
@@ -314,14 +313,9 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- //If a metaLocation is null, All of its info would be empty here to be displayed.
- ServerName metaLocation = null;
- try {
- metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
- } catch (NotAllMetaRegionsOnlineException e) {
- //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
- //the UI crash.
- }
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ // If a metaLocation is null, All of its info would be empty here to be displayed.
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
//If metaLocation is null, default value below would be displayed in UI.
String hostAndPort = "";
@@ -387,14 +381,9 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- //If a metaLocation is null, All of its info would be empty here to be displayed.
- ServerName metaLocation = null;
- try {
- metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
- } catch (NotAllMetaRegionsOnlineException e) {
- //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
- //the UI crash.
- }
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ // If a metaLocation is null, All of its info would be empty here to be displayed.
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
//If metaLocation is null, default value below would be displayed in UI.
String hostAndPort = "";
@@ -443,14 +432,9 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- //If a metaLocation is null, All of its info would be empty here to be displayed.
- ServerName metaLocation = null;
- try {
- metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
- } catch (NotAllMetaRegionsOnlineException e) {
- //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
- //the UI crash.
- }
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ // If a metaLocation is null, All of its info would be empty here to be displayed.
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
//If metaLocation is null, default value below would be displayed in UI.
String hostAndPort = "";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 922da6f..9169c22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -243,13 +242,6 @@ public class TestMetaTableAccessor {
}
@Test
- public void testGetRegionsFromMetaTable() throws IOException, InterruptedException {
- List<RegionInfo> regions = MetaTableLocator.getMetaRegions(UTIL.getZooKeeperWatcher());
- assertTrue(regions.size() >= 1);
- assertTrue(MetaTableLocator.getMetaRegionsAndLocations(UTIL.getZooKeeperWatcher()).size() >= 1);
- }
-
- @Test
public void testGetRegion() throws IOException, InterruptedException {
final String name = this.name.getMethodName();
LOG.info("Started " + name);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
deleted file mode 100644
index 3bea0a7..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
-
-/**
- * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
- */
-@Category({ MiscTests.class, MediumTests.class })
-public class TestMetaTableLocator {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaTableLocator.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableLocator.class);
- private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
- private static final ServerName SN =
- ServerName.valueOf("example.org", 1234, EnvironmentEdgeManager.currentTime());
- private ZKWatcher watcher;
- private Abortable abortable;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // Set this down so tests run quicker
- UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
- UTIL.startMiniZKCluster();
- }
-
- @AfterClass
- public static void afterClass() throws IOException {
- UTIL.getZkCluster().shutdown();
- }
-
- @Before
- public void before() throws IOException {
- this.abortable = new Abortable() {
- @Override
- public void abort(String why, Throwable e) {
- LOG.info(why, e);
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- };
- this.watcher =
- new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
- }
-
- @After
- public void after() {
- try {
- // Clean out meta location or later tests will be confused... they presume
- // start fresh in zk.
- MetaTableLocator.deleteMetaLocation(this.watcher);
- } catch (KeeperException e) {
- LOG.warn("Unable to delete hbase:meta location", e);
- }
-
- this.watcher.close();
- }
-
- /**
- * Test normal operations
- */
- @Test
- public void testMetaLookup()
- throws IOException, InterruptedException, ServiceException, KeeperException {
- final ClientProtos.ClientService.BlockingInterface client =
- Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-
- Mockito.when(client.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
- .thenReturn(GetResponse.newBuilder().build());
-
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- for (RegionState.State state : RegionState.State.values()) {
- if (state.equals(RegionState.State.OPEN)) {
- continue;
- }
- MetaTableLocator.setMetaLocation(this.watcher, SN, state);
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- assertEquals(state, MetaTableLocator.getMetaRegionState(this.watcher).getState());
- }
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
- assertEquals(SN, MetaTableLocator.getMetaRegionLocation(this.watcher));
- assertEquals(RegionState.State.OPEN,
- MetaTableLocator.getMetaRegionState(this.watcher).getState());
-
- MetaTableLocator.deleteMetaLocation(this.watcher);
- assertNull(MetaTableLocator.getMetaRegionState(this.watcher).getServerName());
- assertEquals(RegionState.State.OFFLINE,
- MetaTableLocator.getMetaRegionState(this.watcher).getState());
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- }
-
- @Test(expected = NotAllMetaRegionsOnlineException.class)
- public void testTimeoutWaitForMeta() throws IOException, InterruptedException {
- MetaTableLocator.waitMetaRegionLocation(watcher, 100);
- }
-
- /**
- * Test waiting on meat w/ no timeout specified.
- */
- @Test
- public void testNoTimeoutWaitForMeta() throws IOException, InterruptedException, KeeperException {
- ServerName hsa = MetaTableLocator.getMetaRegionLocation(watcher);
- assertNull(hsa);
-
- // Now test waiting on meta location getting set.
- Thread t = new WaitOnMetaThread();
- startWaitAliveThenWaitItLives(t, 1);
- // Set a meta location.
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
- hsa = SN;
- // Join the thread... should exit shortly.
- t.join();
- // Now meta is available.
- assertTrue(MetaTableLocator.getMetaRegionLocation(watcher).equals(hsa));
- }
-
- private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
- t.start();
- UTIL.waitFor(2000, t::isAlive);
- // Wait one second.
- Threads.sleep(ms);
- assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
- }
-
- /**
- * Wait on META.
- */
- class WaitOnMetaThread extends Thread {
-
- WaitOnMetaThread() {
- super("WaitOnMeta");
- }
-
- @Override
- public void run() {
- try {
- doWaiting();
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed wait", e);
- }
- LOG.info("Exiting " + getName());
- }
-
- void doWaiting() throws InterruptedException {
- try {
- for (;;) {
- if (MetaTableLocator.waitMetaRegionLocation(watcher, 10000) != null) {
- break;
- }
- }
- } catch (NotAllMetaRegionsOnlineException e) {
- // Ignore
- }
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index f14faf7..277f7bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -58,10 +58,7 @@ public abstract class AbstractTestRegionLocator {
}
UTIL.getAdmin().createTable(td, SPLIT_KEYS);
UTIL.waitTableAvailable(TABLE_NAME);
- try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
UTIL.getAdmin().balancerSwitch(false, true);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index c9d67f4..ea1122c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
/**
@@ -32,11 +31,6 @@ public class DummyConnectionRegistry implements ConnectionRegistry {
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return null;
- }
-
- @Override
public CompletableFuture<String> getClusterId() {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
index d34b419..3b0fbe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.AfterClass;
import org.junit.Rule;
import org.slf4j.Logger;
@@ -65,8 +65,11 @@ public class MetaWithReplicasTestBase {
HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
Set<ServerName> sns = new HashSet<ServerName>();
- ServerName hbaseMetaServerName =
- MetaTableLocator.getMetaRegionLocation(TEST_UTIL.getZooKeeperWatcher());
+ ServerName hbaseMetaServerName;
+ try (RegionLocator locator =
+ TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ hbaseMetaServerName = locator.getRegionLocation(HConstants.EMPTY_START_ROW).getServerName();
+ }
LOG.info("HBASE:META DEPLOY: on " + hbaseMetaServerName);
sns.add(hbaseMetaServerName);
for (int replicaId = 1; replicaId < 3; replicaId++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index abb0c11..05a1051 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -43,8 +43,7 @@ public final class RegionReplicaTestHelper {
}
// waits for all replicas to have region location
- static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtil util,
- ConnectionRegistry registry) throws IOException {
+ static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtil util) throws IOException {
Configuration conf = util.getConfiguration();
int regionReplicaCount =
util.getAdmin().getDescriptor(TableName.META_TABLE_NAME).getRegionReplication();
@@ -58,16 +57,20 @@ public final class RegionReplicaTestHelper {
@Override
public boolean evaluate() {
try {
- RegionLocations locs = registry.getMetaRegionLocations().get();
+ List<HRegionLocation> locs;
+ try (RegionLocator locator =
+ util.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locs = locator.getAllRegionLocations();
+ }
if (locs.size() < regionReplicaCount) {
return false;
}
for (int i = 0; i < regionReplicaCount; i++) {
- HRegionLocation loc = locs.getRegionLocation(i);
+ HRegionLocation loc = locs.get(i);
// Wait until the replica is served by a region server. There could be delay between
// the replica being available to the connection and region server opening it.
Optional<ServerName> rsCarryingReplica =
- getRSCarryingReplica(util, loc.getRegion().getTable(), i);
+ getRSCarryingReplica(util, loc.getRegion().getTable(), i);
if (!rsCarryingReplica.isPresent()) {
return false;
}
@@ -120,7 +123,7 @@ public final class RegionReplicaTestHelper {
interface Locator {
RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception;
+ throws Exception;
void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index 81dafae..13e13bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -54,10 +54,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
public static void setUpBeforeClass() throws Exception {
TestAsyncAdminBase.setUpBeforeClass();
HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
- try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
}
private void testMoveNonDefaultReplica(TableName tableName)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 480d797..6a06f32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -44,24 +45,25 @@ public class TestAsyncMetaRegionLocator {
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
- private static ConnectionRegistry REGISTRY;
+ private static AsyncConnectionImpl CONN;
- private static AsyncMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
TEST_UTIL.waitUntilNoRegionsInTransition();
- REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ CONN = (AsyncConnectionImpl) ConnectionFactory
+ .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
- LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
}
@AfterClass
public static void tearDown() throws Exception {
- Closeables.close(REGISTRY, true);
+ Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -71,14 +73,15 @@ public class TestAsyncMetaRegionLocator {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
- throws Exception {
+ throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception {
- return LOCATOR.getRegionLocations(replicaId, reload).get();
+ throws Exception {
+ return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+ RegionLocateType.CURRENT, reload).get();
}
});
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 44b954e..9d9090e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -59,8 +59,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@@ -72,43 +70,34 @@ public class TestAsyncNonMetaRegionLocator {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
-
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
- private static final int META_STOREFILE_REFRESH_PERIOD = 100;
private static final int NB_SERVERS = 4;
private static int numOfMetaReplica = NB_SERVERS - 1;
+ private static byte[][] SPLIT_KEYS;
private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
- private static ConnectionRegistry registry;
-
- private static byte[][] SPLIT_KEYS;
- private CatalogReplicaMode metaReplicaMode;
+ private AsyncRegionLocator LOCATOR;
+ private ConnectionRegistry registry;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Enable hbase:meta replication.
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
- conf.setLong("replication.source.sleepforretries", 10); // 10 ms
-
+ conf.setLong("replication.source.sleepforretries", 10); // 10 ms
TEST_UTIL.startMiniCluster(NB_SERVERS);
Admin admin = TEST_UTIL.getAdmin();
admin.balancerSwitch(false, true);
// Enable hbase:meta replication.
HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
- TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
- TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
-
- registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-
+ TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster()
+ .getRegions(TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -146,12 +135,11 @@ public class TestAsyncNonMetaRegionLocator {
// Enable meta replica LoadBalance mode for this connection.
if (clientMetaReplicaMode != null) {
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
- metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
}
-
+ registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(c, registry,
registry.getClusterId().get(), null, User.getCurrent());
- LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
}
private void createSingleRegionTable() throws IOException, InterruptedException {
@@ -160,7 +148,7 @@ public class TestAsyncNonMetaRegionLocator {
}
private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
- byte[] row, RegionLocateType locateType, boolean reload) {
+ byte[] row, RegionLocateType locateType, boolean reload) {
return LOCATOR
.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
.thenApply(RegionLocations::getDefaultRegionLocation);
@@ -191,7 +179,7 @@ public class TestAsyncNonMetaRegionLocator {
}
private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
- HRegionLocation loc) {
+ HRegionLocation loc) {
RegionInfo info = loc.getRegion();
assertEquals(TABLE_NAME, info.getTable());
assertArrayEquals(startKey, info.getStartKey());
@@ -418,7 +406,7 @@ public class TestAsyncNonMetaRegionLocator {
// Testcase for HBASE-20822
@Test
public void testLocateBeforeLastRegion()
- throws IOException, InterruptedException, ExecutionException {
+ throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
@@ -436,13 +424,13 @@ public class TestAsyncNonMetaRegionLocator {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
- throws Exception {
+ throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception {
+ throws Exception {
return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
RegionLocateType.CURRENT, reload).get();
}
@@ -464,9 +452,8 @@ public class TestAsyncNonMetaRegionLocator {
public void testConcurrentUpdateCachedLocationOnError() throws Exception {
createSingleRegionTable();
HRegionLocation loc =
- getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false)
- .get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
IntStream.range(0, 100).parallel()
- .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
+ .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
index c9d47dc..7fadef3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
@@ -51,8 +51,8 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous region admin operations.
- * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our ten
+ * minute timeout so they were split.
*/
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
@@ -60,7 +60,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
+ HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
@Test
public void testGetRegionLocation() throws Exception {
@@ -79,13 +79,13 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@Test
public void testSplitSwitch() throws Exception {
createTableWithDefaultConf(tableName);
- byte[][] families = {FAMILY};
+ byte[][] families = { FAMILY };
final int rows = 10000;
TestAsyncRegionAdminApi.loadData(tableName, families, rows);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations =
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
int originalCount = regionLocations.size();
initSplitMergeSwitch();
@@ -93,7 +93,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
try {
admin.split(tableName, Bytes.toBytes(rows / 2)).join();
} catch (Exception e) {
- //Expected
+ // Expected
}
int count = admin.getRegions(tableName).get().size();
assertTrue(originalCount == count);
@@ -111,12 +111,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
// It was ignored in TestSplitOrMergeStatus, too
public void testMergeSwitch() throws Exception {
createTableWithDefaultConf(tableName);
- byte[][] families = {FAMILY};
+ byte[][] families = { FAMILY };
TestAsyncRegionAdminApi.loadData(tableName, families, 1000);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations =
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
int originalCount = regionLocations.size();
initSplitMergeSwitch();
@@ -126,7 +126,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
Threads.sleep(100);
}
assertTrue("originalCount=" + originalCount + ", postSplitCount=" + postSplitCount,
- originalCount != postSplitCount);
+ originalCount != postSplitCount);
// Merge switch is off so merge should NOT succeed.
assertTrue(admin.mergeSwitch(false).get());
@@ -156,12 +156,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@Test
public void testMergeRegions() throws Exception {
- byte[][] splitRows = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("6")};
+ byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
createTableWithDefaultConf(tableName, splitRows);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
RegionInfo regionA;
RegionInfo regionB;
RegionInfo regionC;
@@ -175,7 +175,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(2, regionLocations.size());
for (HRegionLocation rl : regionLocations) {
@@ -195,11 +195,10 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
Thread.sleep(200);
}
// merge with encoded name
- admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(),
- false).get();
+ admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(), false).get();
- regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(1, regionLocations.size());
}
@@ -233,18 +232,18 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
- splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+ splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true,
+ Bytes.toBytes("3"));
}
- private void
- splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
- throws Exception {
+ private void splitTest(TableName tableName, int rowCount, boolean isSplitRegion,
+ byte[] splitPoint) throws Exception {
// create table
createTableWithDefaultConf(tableName);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(1, regionLocations.size());
AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
@@ -273,8 +272,8 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
int count = 0;
for (int i = 0; i < 45; i++) {
try {
- regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
count = regionLocations.size();
if (count >= 2) {
break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
similarity index 90%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
index 690a384..c2d001a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
@@ -57,11 +57,11 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+public class TestAsyncRegionLocatorConcurrenyLimit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+ HBaseClassTestRule.forClass(TestAsyncRegionLocatorConcurrenyLimit.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
@@ -71,7 +71,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
private static byte[][] SPLIT_KEYS;
@@ -90,7 +90,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Override
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
- InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+ InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
int concurrency = CONCURRENCY.incrementAndGet();
for (;;) {
@@ -109,7 +109,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
- InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+ InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
CONCURRENCY.decrementAndGet();
}
@@ -125,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+ ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
- LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
@@ -142,7 +142,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
}
private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
- throws InterruptedException, ExecutionException {
+ throws InterruptedException, ExecutionException {
assertEquals(256, futures.size());
for (int i = 0; i < futures.size(); i++) {
HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 572a1d5..42d1919 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -45,8 +45,8 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our ten
+ * minute timeout so they were split.
* @see TestAsyncTableAdminApi3 Another split out from this class so each runs under ten minutes.
*/
@RunWith(Parameterized.class)
@@ -55,7 +55,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
+ HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
@Test
public void testCreateTable() throws Exception {
@@ -65,13 +65,13 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
tables = admin.listTableDescriptors().get();
assertEquals(numTables + 1, tables.size());
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
}
static TableState.State getStateFromMeta(TableName table) throws Exception {
Optional<TableState> state = ClientMetaTableAccessor
- .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
+ .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
assertTrue(state.isPresent());
return state.get().getState();
}
@@ -82,19 +82,21 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
createTableWithDefaultConf(tableName);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals("Table should have only 1 region", 1, regionLocations.size());
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
createTableWithDefaultConf(tableName2, new byte[][] { new byte[] { 42 } });
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
assertEquals("Table should have only 2 region", 2, regionLocations.size());
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), Bytes.toBytes("a"), Bytes.toBytes("z"), 3).join();
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
assertEquals("Table should have only 3 region", 3, regionLocations.size());
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
@@ -111,7 +113,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder = TableDescriptorBuilder.newBuilder(tableName5);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5, true).get();
assertEquals("Table should have 16 region", 16, regionLocations.size());
}
@@ -128,7 +131,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regions = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
Iterator<HRegionLocation> hris = regions.iterator();
assertEquals(
@@ -183,7 +186,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
- regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+ regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -231,8 +234,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
- regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3)
- .get();
+ regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -296,7 +298,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
}
private void testTruncateTable(final TableName tableName, boolean preserveSplits)
- throws Exception {
+ throws Exception {
byte[][] splitKeys = new byte[2][];
splitKeys[0] = Bytes.toBytes(4);
splitKeys[1] = Bytes.toBytes(8);
@@ -337,8 +339,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
testCloneTableSchema(tableName, newTableName, true);
}
- private void testCloneTableSchema(final TableName tableName,
- final TableName newTableName, boolean preserveSplits) throws Exception {
+ private void testCloneTableSchema(final TableName tableName, final TableName newTableName,
+ boolean preserveSplits) throws Exception {
byte[][] splitKeys = new byte[2][];
splitKeys[0] = Bytes.toBytes(4);
splitKeys[1] = Bytes.toBytes(8);
@@ -349,20 +351,16 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
boolean BLOCK_CACHE = false;
// Create the table
- TableDescriptor tableDesc = TableDescriptorBuilder
- .newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
- .setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(FAMILY_1)
- .setBlocksize(BLOCK_SIZE)
- .setBlockCacheEnabled(BLOCK_CACHE)
- .setTimeToLive(TTL)
- .build()).build();
+ TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(BLOCK_SIZE)
+ .setBlockCacheEnabled(BLOCK_CACHE).setTimeToLive(TTL).build())
+ .build();
admin.createTable(tableDesc, splitKeys).join();
assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
assertTrue("Table should be created with splitKyes + 1 rows in META",
- admin.isTableAvailable(tableName).get());
+ admin.isTableAvailable(tableName).get());
// Clone & Verify
admin.cloneTableSchema(tableName, newTableName, preserveSplits).join();
@@ -377,7 +375,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
if (preserveSplits) {
assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
assertTrue("New table should be created with splitKyes + 1 rows in META",
- admin.isTableAvailable(newTableName).get());
+ admin.isTableAvailable(newTableName).get());
} else {
assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
index 4a71baf..793d159 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
@@ -46,15 +46,15 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our ten
+ * minute timeout so they were split.
*/
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
+ HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
@Test
public void testTableExist() throws Exception {
@@ -122,7 +122,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertEquals(tables.length + 1, size);
for (int i = 0, j = 0; i < tables.length && j < size; i++, j++) {
assertTrue("tableName should be equal in order",
- tableDescs.get(j).getTableName().equals(tables[i]));
+ tableDescs.get(j).getTableName().equals(tables[i]));
}
assertTrue(tableDescs.get(size - 1).getTableName().equals(TableName.META_TABLE_NAME));
@@ -166,7 +166,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
this.admin.disableTable(tableName).join();
assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
// Test that table is disabled
@@ -188,7 +188,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertTrue(ok);
this.admin.enableTable(tableName).join();
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
// Test that table is enabled
@@ -230,7 +230,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
table2.get(get).get();
admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
- .forEach(t -> admin.disableTable(t).join());
+ .forEach(t -> admin.disableTable(t).join());
// Test that tables are disabled
get = new Get(row);
@@ -254,7 +254,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName2));
admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
- .forEach(t -> admin.enableTable(t).join());
+ .forEach(t -> admin.enableTable(t).join());
// Test that tables are enabled
try {
@@ -281,8 +281,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
createTableWithDefaultConf(tableName, splitKeys);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
- List<HRegionLocation> regions = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ List<HRegionLocation> regions =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -292,8 +292,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
// Enable table, use retain assignment to assign regions.
admin.enableTable(tableName).join();
- List<HRegionLocation> regions2 = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ List<HRegionLocation> regions2 =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
// Check the assignment.
assertEquals(regions.size(), regions2.size());
assertTrue(regions2.containsAll(regions));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 245d755..ff186f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
@@ -47,18 +48,19 @@ public class TestAsyncTableLocatePrefetch {
private static byte[] FAMILY = Bytes.toBytes("cf");
- private static AsyncConnection CONN;
+ private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
- TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+ TEST_UTIL.getConfiguration().setInt(AsyncRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
- CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
- LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+ CONN = (AsyncConnectionImpl) ConnectionFactory
+ .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
}
@AfterClass
@@ -70,7 +72,7 @@ public class TestAsyncTableLocatePrefetch {
@Test
public void test() throws InterruptedException, ExecutionException {
assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
- RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
+ RegionLocateType.CURRENT, false, TimeUnit.MINUTES.toNanos(1)).get());
// we finish the request before we adding the remaining results to cache so sleep a bit here
Thread.sleep(1000);
// confirm that the locations of all the regions have been cached.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
index 6c538f5..77ee820 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -84,8 +84,7 @@ public class TestAsyncTableRSCrashPublish {
public void test() throws IOException, ExecutionException, InterruptedException {
Configuration conf = UTIL.getHBaseCluster().getMaster().getConfiguration();
try (AsyncConnection connection = ConnectionFactory.createAsyncConnection(conf).get()) {
- AsyncNonMetaRegionLocator locator =
- ((AsyncConnectionImpl) connection).getLocator().getNonMetaRegionLocator();
+ AsyncRegionLocator locator = ((AsyncConnectionImpl) connection).getLocator();
connection.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
ServerName serverName =
locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 61bb163..6a32ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -92,9 +92,7 @@ public class TestAsyncTableUseMetaReplicas {
FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
- try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
index bebc843..4143d4a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
@@ -23,10 +23,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
@@ -96,8 +99,9 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector {
.createSelector(replicaSelectorClass, META_TABLE_NAME, CONN, () -> {
int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
try {
- RegionLocations metaLocations = CONN.registry.getMetaRegionLocations().get
- (CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+ List<HRegionLocation> metaLocations = CONN.getRegionLocator(TableName.META_TABLE_NAME)
+ .getRegionLocations(HConstants.EMPTY_START_ROW, true)
+ .get(CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
numOfReplicas = metaLocations.size();
} catch (Exception e) {
LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
@@ -119,8 +123,9 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector {
replicaSelectorClass, META_TABLE_NAME, CONN, () -> {
int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
try {
- RegionLocations metaLocations = CONN.registry.getMetaRegionLocations().get(
- CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+ List<HRegionLocation> metaLocations = CONN.getRegionLocator(TableName.META_TABLE_NAME)
+ .getRegionLocations(HConstants.EMPTY_START_ROW, true)
+ .get(CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
numOfReplicas = metaLocations.size();
} catch (Exception e) {
LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index e4bdff9..1044d2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -119,7 +119,7 @@ public class TestMasterRegistry {
try (MasterRegistry registry = new MasterRegistry(conf)) {
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index 2197a21..d79f8ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -49,11 +49,11 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-@Category({SmallTests.class, MasterTests.class })
+@Category({ SmallTests.class, MasterTests.class })
public class TestMetaRegionLocationCache {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
+ HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static ConnectionRegistry REGISTRY;
@@ -63,7 +63,7 @@ public class TestMetaRegionLocationCache {
TEST_UTIL.startMiniCluster(3);
HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
}
@@ -75,7 +75,7 @@ public class TestMetaRegionLocationCache {
private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Exception {
List<HRegionLocation> result = new ArrayList<>();
- for (String znode: zk.getMetaReplicaNodes()) {
+ for (String znode : zk.getMetaReplicaNodes()) {
String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
@@ -95,7 +95,7 @@ public class TestMetaRegionLocationCache {
}
}
List<HRegionLocation> metaHRLs =
- master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+ master.getMetaRegionLocationCache().getMetaRegionLocations().get();
assertFalse(metaHRLs.isEmpty());
ZKWatcher zk = master.getZooKeeper();
List<String> metaZnodes = zk.getMetaReplicaNodes();
@@ -115,11 +115,13 @@ public class TestMetaRegionLocationCache {
assertEquals(actualHRLs, metaHRLs);
}
- @Test public void testInitialMetaLocations() throws Exception {
+ @Test
+ public void testInitialMetaLocations() throws Exception {
verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
}
- @Test public void testStandByMetaLocations() throws Exception {
+ @Test
+ public void testStandByMetaLocations() throws Exception {
HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
standBy.isInitialized();
verifyCachedMetaLocations(standBy);
@@ -128,16 +130,17 @@ public class TestMetaRegionLocationCache {
/*
* Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
*/
- @Test public void testMetaLocationsChange() throws Exception {
+ @Test
+ public void testMetaLocationsChange() throws Exception {
List<HRegionLocation> currentMetaLocs =
- getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
+ getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
// Move these replicas to random servers.
- for (HRegionLocation location: currentMetaLocs) {
+ for (HRegionLocation location : currentMetaLocs) {
RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
}
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
- for (JVMClusterUtil.MasterThread masterThread:
- TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
verifyCachedMetaLocations(masterThread.getMaster());
}
}
@@ -146,7 +149,8 @@ public class TestMetaRegionLocationCache {
* Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
* znode for notifications.
*/
- @Test public void testMetaRegionLocationCache() throws Exception {
+ @Test
+ public void testMetaRegionLocationCache() throws Exception {
final String parentZnodeName = "/randomznodename";
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
@@ -156,7 +160,8 @@ public class TestMetaRegionLocationCache {
// some ZK activity in the background.
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
- @Override public void doAnAction() throws Exception {
+ @Override
+ public void doAnAction() throws Exception {
final String testZnode = parentZnodeName + "/child";
ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
ZKUtil.deleteNode(zkWatcher, testZnode);
@@ -176,8 +181,8 @@ public class TestMetaRegionLocationCache {
// Wait until the meta cache is populated.
int iters = 0;
while (iters++ < 10) {
- if (metaCache.getMetaRegionLocations().isPresent()
- && metaCache.getMetaRegionLocations().get().size() == 3) {
+ if (metaCache.getMetaRegionLocations().isPresent() &&
+ metaCache.getMetaRegionLocations().get().size() == 3) {
break;
}
Thread.sleep(1000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
index e7c872d..ba5c9d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
@@ -118,6 +118,10 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
Thread.sleep(
conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 3);
}
+ // cache the location for all the meta regions.
+ try (RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// Ensure all metas are not on same hbase:meta replica=0 server!
master = util.getHBaseClusterInterface().getClusterMetrics().getMasterName();
@@ -131,7 +135,6 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
util.getHBaseClusterInterface().killRegionServer(primary);
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
}
- c.clearRegionLocationCache();
}
LOG.info("Running GETs");
try (Table htable = c.getTable(TABLE)) {
@@ -150,15 +153,15 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
LOG.info("Master active!");
- c.clearRegionLocationCache();
}
}
conf.setBoolean(HConstants.USE_META_REPLICAS, false);
LOG.info("Running GETs no replicas");
- try (Connection c = ConnectionFactory.createConnection(conf);
- Table htable = c.getTable(TABLE)) {
- Result r = htable.get(new Get(row));
- assertArrayEquals(row, r.getRow());
+ try (Connection c = ConnectionFactory.createConnection(conf)) {
+ try (Table htable = c.getTable(TABLE)) {
+ Result r = htable.get(new Get(row));
+ assertArrayEquals(r.getRow(), row);
+ }
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 4828cea..6bf61ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -210,7 +210,7 @@ public class TestReplicasClient {
// No master
LOG.info("Master is going to be stopped");
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
Configuration c = new Configuration(HTU.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
LOG.info("Master has stopped");
@@ -224,7 +224,9 @@ public class TestReplicasClient {
@Before
public void before() throws IOException {
- HTU.getConnection().clearRegionLocationCache();
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+ locator.clearRegionLocationCache();
+ }
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
@@ -246,7 +248,6 @@ public class TestReplicasClient {
closeRegion(hriPrimary);
} catch (Exception ignored) {
}
- HTU.getConnection().clearRegionLocationCache();
}
private HRegionServer getRS() {
@@ -325,16 +326,15 @@ public class TestReplicasClient {
byte[] b1 = Bytes.toBytes("testLocations");
openRegion(hriSecondary);
- try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
- RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
- conn.clearRegionLocationCache();
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+ locator.clearRegionLocationCache();
List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
Assert.assertEquals(2, rl.size());
rl = locator.getRegionLocations(b1, false);
Assert.assertEquals(2, rl.size());
- conn.clearRegionLocationCache();
+ locator.clearRegionLocationCache();
rl = locator.getRegionLocations(b1, false);
Assert.assertEquals(2, rl.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index 4894c52..c00dd39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -83,8 +83,7 @@ public class TestZKConnectionRegistry {
clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getActiveMaster().get());
- RegionReplicaTestHelper
- .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 933addf..21dd128 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
@@ -520,4 +521,9 @@ public class MockNoopMasterServices implements MasterServices {
public MetaLocationSyncer getMetaLocationSyncer() {
return null;
}
+
+ public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 9e0333b..d7b4eda 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -28,12 +29,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -166,54 +164,5 @@ public class TestMasterFailover {
TEST_UTIL.shutdownMiniCluster();
}
}
-
- /**
- * Test meta in transition when master failover.
- * This test used to manipulate region state up in zk. That is not allowed any more in hbase2
- * so I removed that messing. That makes this test anemic.
- */
- @Test
- public void testMetaInTransitionWhenMasterFailover() throws Exception {
- // Start the cluster
- HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
- TEST_UTIL.startMiniCluster();
- try {
- SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
- LOG.info("Cluster started");
-
- HMaster activeMaster = cluster.getMaster();
- ServerName metaServerName = cluster.getServerHoldingMeta();
- HRegionServer hrs = cluster.getRegionServer(metaServerName);
-
- // Now kill master, meta should remain on rs, where we placed it before.
- LOG.info("Aborting master");
- activeMaster.abort("test-kill");
- cluster.waitForMasterToStop(activeMaster.getServerName(), 30000);
- LOG.info("Master has aborted");
-
- // meta should remain where it was
- RegionState metaState = MetaTableLocator.getMetaRegionState(hrs.getZooKeeper());
- assertEquals("hbase:meta should be online on RS",
- metaState.getServerName(), metaServerName);
- assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
- // Start up a new master
- LOG.info("Starting up a new master");
- activeMaster = cluster.startMaster().getMaster();
- LOG.info("Waiting for master to be ready");
- cluster.waitForActiveAndReadyMaster();
- LOG.info("Master is ready");
-
- // ensure meta is still deployed on RS
- metaState = MetaTableLocator.getMetaRegionState(activeMaster.getZooKeeper());
- assertEquals("hbase:meta should be online on RS",
- metaState.getServerName(), metaServerName);
- assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
- // Done, shutdown the cluster
- } finally {
- TEST_UTIL.shutdownMiniCluster();
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
index 6ad4f08..b216eb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
@@ -90,7 +91,20 @@ public class TestMetaAssignmentWithStopMaster {
}
}
- ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+ ServerName newMetaServer;
+ startTime = System.currentTimeMillis();
+ for (;;) {
+ try {
+ newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+ break;
+ } catch (IOException e) {
+ LOG.warn("failed to get all locations, retry...", e);
+ }
+ Thread.sleep(3000);
+ if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
+ fail("Wait too long for getting the new meta location");
+ }
+ }
assertTrue("The new meta server " + newMetaServer + " should be same with" +
" the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
index cf35ae2..29bafe3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
@@ -100,8 +98,6 @@ public class TestMetaShutdownHandler {
metaServerName =
regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
}
- RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
- assertEquals("Wrong state for meta!", RegionState.State.OPEN, metaState.getState());
assertNotEquals("Meta is on master!", metaServerName, master.getServerName());
HRegionServer metaRegionServer = cluster.getRegionServer(metaServerName);
@@ -129,11 +125,9 @@ public class TestMetaShutdownHandler {
assertTrue("Meta should be assigned",
regionStates.isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO));
// Now, make sure meta is registered in zk
- metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
- assertEquals("Meta should not be in transition", RegionState.State.OPEN, metaState.getState());
- assertEquals("Meta should be assigned", metaState.getServerName(),
- regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO));
- assertNotEquals("Meta should be assigned on a different server", metaState.getServerName(),
+ ServerName newMetaServerName =
+ regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ assertNotEquals("Meta should be assigned on a different server", newMetaServerName,
metaServerName);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
index f308a71..b1f5491 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -50,15 +50,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionReplicaSplit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaSplit.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class);
private static final int NB_SERVERS = 4;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index a4a3f86..0ea4f75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -165,7 +165,7 @@ public class TestCompactionLifeCycleTracker {
.setValue(Bytes.toBytes(i))
.build()));
}
- UTIL.getAdmin().flush(NAME);
+ UTIL.flush(NAME);
for (int i = 100; i < 200; i++) {
byte[] row = Bytes.toBytes(i);
table.put(new Put(row)
@@ -178,7 +178,7 @@ public class TestCompactionLifeCycleTracker {
.setValue(Bytes.toBytes(i))
.build()));
}
- UTIL.getAdmin().flush(NAME);
+ UTIL.flush(NAME);
}
region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
assertEquals(2, region.getStore(CF1).getStorefilesCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 61f8fe8..f7af83b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -111,7 +111,7 @@ public class TestRegionReplicas {
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
// No master
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
}
@AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 4260b1d..71e92fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -35,10 +34,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -90,15 +88,24 @@ public class TestRegionServerNoMaster {
}
regionName = hri.getRegionName();
- stopMasterAndAssignMeta(HTU);
+ stopMasterAndCacheMetaLocation(HTU);
}
- public static void stopMasterAndAssignMeta(HBaseTestingUtil HTU)
- throws IOException, InterruptedException {
+ public static void stopMasterAndCacheMetaLocation(HBaseTestingUtil HTU)
+ throws IOException, InterruptedException {
+ // cache meta location, so we will not go to master to lookup meta region location
+ for (JVMClusterUtil.RegionServerThread t : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+ try (RegionLocator locator =
+ t.getRegionServer().getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
+ }
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// Stop master
HMaster master = HTU.getHBaseCluster().getMaster();
Thread masterThread = HTU.getHBaseCluster().getMasterThread();
- ServerName masterAddr = master.getServerName();
master.stopMaster();
LOG.info("Waiting until master thread exits");
@@ -107,27 +114,6 @@ public class TestRegionServerNoMaster {
}
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = true;
- // Master is down, so is the meta. We need to assign it somewhere
- // so that regions can be assigned during the mocking phase.
- HRegionServer hrs = HTU.getHBaseCluster()
- .getLiveRegionServerThreads().get(0).getRegionServer();
- ZKWatcher zkw = hrs.getZooKeeper();
- ServerName sn = MetaTableLocator.getMetaRegionLocation(zkw);
- if (sn != null && !masterAddr.equals(sn)) {
- return;
- }
-
- ProtobufUtil.openRegion(null, hrs.getRSRpcServices(),
- hrs.getServerName(), RegionInfoBuilder.FIRST_META_REGIONINFO);
- while (true) {
- sn = MetaTableLocator.getMetaRegionLocation(zkw);
- if (sn != null && sn.equals(hrs.getServerName())
- && hrs.getOnlineRegions().containsKey(
- RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
- break;
- }
- Thread.sleep(100);
- }
}
/** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 74de90b..7f56095 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -133,8 +131,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
// mock a secondary region info to open
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
+ // cache the location for meta regions
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// No master
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
}
@@ -186,11 +188,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
- }
+ AsyncClusterConnection conn = HTU.getAsyncConnection();
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
@@ -216,36 +216,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- openRegion(HTU, rs0, hriSecondary);
- // load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
+ AsyncClusterConnection conn = HTU.getAsyncConnection();
+ openRegion(HTU, rs0, hriSecondary);
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
- Assert.assertEquals(1000, entries.size());
+ Assert.assertEquals(1000, entries.size());
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
+ Region region = rs0.getRegion(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
- HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+ HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
- // move the secondary region from RS0 to RS1
- closeRegion(HTU, rs0, hriSecondary);
- openRegion(HTU, rs1, hriSecondary);
+ // move the secondary region from RS0 to RS1
+ closeRegion(HTU, rs0, hriSecondary);
+ openRegion(HTU, rs1, hriSecondary);
- // replicate the new data
- replicateUsingCallable(conn, entries);
+ // replicate the new data
+ replicateUsingCallable(conn, entries);
- region = rs1.getRegion(hriSecondary.getEncodedName());
- // verify the new data. old data may or may not be there
- HTU.verifyNumericRows(region, f, 1000, 2000);
+ region = rs1.getRegion(hriSecondary.getEncodedName());
+ // verify the new data. old data may or may not be there
+ HTU.verifyNumericRows(region, f, 1000, 2000);
- HTU.deleteNumericRows(table, f, 0, 2000);
- closeRegion(HTU, rs1, hriSecondary);
- }
+ HTU.deleteNumericRows(table, f, 0, 2000);
+ closeRegion(HTU, rs1, hriSecondary);
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 90fe71e..59ca67d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -129,7 +129,7 @@ public class TestHBaseFsckEncryption {
table.close();
}
// Flush it
- TEST_UTIL.getAdmin().flush(tableDescriptor.getTableName());
+ TEST_UTIL.flush(tableDescriptor.getTableName());
// Verify we have encrypted store files on disk
final List<Path> paths = findStorefilePaths(tableDescriptor.getTableName());
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index 807bf6f..82d4621 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -17,24 +17,16 @@
*/
package org.apache.hadoop.hbase.zookeeper;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
@@ -47,9 +39,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaReg
* <p/>
* Meta region location is set by <code>RegionServerServices</code>. This class doesn't use ZK
* watchers, rather accesses ZK directly.
- * <p/>
- * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we store the meta location in the local
+ * store of master, the location on zk is only a mirror of the first meta region to keep
+ * compatibility.
*/
+@Deprecated
@InterfaceAudience.Private
public final class MetaTableLocator {
private static final Logger LOG = LoggerFactory.getLogger(MetaTableLocator.class);
@@ -58,166 +52,16 @@ public final class MetaTableLocator {
}
/**
- * @param zkw ZooKeeper watcher to be used
- * @return meta table regions and their locations.
- */
- public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
- return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
- * Gets the meta regions and their locations for the given path and replica ID.
- *
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return meta table regions and their locations.
- */
- public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
- int replicaId) {
- ServerName serverName = getMetaRegionLocation(zkw, replicaId);
- List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
- list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
- RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
- return list;
- }
-
- /**
- * Gets the meta regions for the given path with the default replica ID.
- *
- * @param zkw ZooKeeper watcher to be used
- * @return List of meta regions
- */
- public static List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
- return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
- * Gets the meta regions for the given path and replica ID.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return List of meta regions
- */
- public static List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
- List<Pair<RegionInfo, ServerName>> result;
- result = getMetaRegionsAndLocations(zkw, replicaId);
- return getListOfRegionInfos(result);
- }
-
- private static List<RegionInfo> getListOfRegionInfos(
- final List<Pair<RegionInfo, ServerName>> pairs) {
- if (pairs == null || pairs.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<RegionInfo> result = new ArrayList<>(pairs.size());
- for (Pair<RegionInfo, ServerName> pair : pairs) {
- result.add(pair.getFirst());
- }
- return result;
- }
-
- /**
- * Gets the meta region location, if available. Does not block.
- * @param zkw zookeeper connection to use
- * @return server name or null if we failed to get the data.
- */
- public static ServerName getMetaRegionLocation(final ZKWatcher zkw) {
- try {
- RegionState state = getMetaRegionState(zkw);
- return state.isOpened() ? state.getServerName() : null;
- } catch (KeeperException ke) {
- return null;
- }
- }
-
- /**
- * Gets the meta region location, if available. Does not block.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return server name
- */
- public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
- try {
- RegionState state = getMetaRegionState(zkw, replicaId);
- return state.isOpened() ? state.getServerName() : null;
- } catch (KeeperException ke) {
- return null;
- }
- }
-
- /**
- * Gets the meta region location, if available, and waits for up to the specified timeout if not
- * immediately available. Given the zookeeper notification could be delayed, we will try to get
- * the latest data.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout maximum time to wait, in millis
- * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
- * if none available
- * @throws InterruptedException if interrupted while waiting
- * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
- */
- public static ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
- throws InterruptedException, NotAllMetaRegionsOnlineException {
- return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
- }
-
- /**
- * Gets the meta region location, if available, and waits for up to the specified timeout if not
- * immediately available. Given the zookeeper notification could be delayed, we will try to get
- * the latest data.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @param timeout maximum time to wait, in millis
- * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
- * if none available
- * @throws InterruptedException if waiting for the socket operation fails
- * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
- */
- public static ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
- throws InterruptedException, NotAllMetaRegionsOnlineException {
- try {
- if (ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) == -1) {
- String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. " +
- "There could be a mismatch with the one configured in the master.";
- LOG.error(errorMsg);
- throw new IllegalArgumentException(errorMsg);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
- }
- ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
-
- if (sn == null) {
- throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
- }
-
- return sn;
- }
-
- /**
- * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
- * specified server address.
- * @param zookeeper zookeeper reference
- * @param serverName The server hosting <code>hbase:meta</code>
- * @param state The region transition state
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void setMetaLocation(ZKWatcher zookeeper,
- ServerName serverName, RegionState.State state) throws KeeperException {
- setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
- }
-
- /**
* Sets the location of <code>hbase:meta</code> in ZooKeeper to the specified server address.
* @param zookeeper reference to the {@link ZKWatcher} which also contains configuration and
- * operation
+ * operation
* @param serverName the name of the server
* @param replicaId the ID of the replica
* @param state the state of the region
* @throws KeeperException if a ZooKeeper operation fails
*/
public static void setMetaLocation(ZKWatcher zookeeper, ServerName serverName, int replicaId,
- RegionState.State state) throws KeeperException {
+ RegionState.State state) throws KeeperException {
if (serverName == null) {
LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
return;
@@ -226,15 +70,13 @@ public final class MetaTableLocator {
serverName, state);
// Make the MetaRegionServer pb and then get its bytes and save this as
// the znode content.
- MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
- .setServer(ProtobufUtil.toServerName(serverName))
- .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
- .setState(state.convert()).build();
+ MetaRegionServer pbrsr =
+ MetaRegionServer.newBuilder().setServer(ProtobufUtil.toServerName(serverName))
+ .setRpcVersion(HConstants.RPC_CURRENT_VERSION).setState(state.convert()).build();
byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
try {
- ZKUtil.setData(zookeeper,
- zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
- } catch(KeeperException.NoNodeException nne) {
+ ZKUtil.setData(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
+ } catch (KeeperException.NoNodeException nne) {
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
LOG.debug("hbase:meta region location doesn't exist, create it");
} else {
@@ -242,27 +84,19 @@ public final class MetaTableLocator {
", create it");
}
ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),
- data);
+ data);
}
}
/**
- * Load the meta region state from the meta server ZNode.
- */
- public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
- return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
* Load the meta region state from the meta region server ZNode.
- *
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
* @param replicaId the ID of the replica
* @return regionstate
* @throws KeeperException if a ZooKeeper operation fails
*/
public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
- throws KeeperException {
+ throws KeeperException {
RegionState regionState = null;
try {
byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
@@ -274,110 +108,4 @@ public final class MetaTableLocator {
}
return regionState;
}
-
- /**
- * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
- * @param zookeeper zookeeper reference
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void deleteMetaLocation(ZKWatcher zookeeper)
- throws KeeperException {
- deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- public static void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
- throws KeeperException {
- if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
- LOG.info("Deleting hbase:meta region location in ZooKeeper");
- } else {
- LOG.info("Deleting hbase:meta for {} region location in ZooKeeper", replicaId);
- }
- try {
- // Just delete the node. Don't need any watches.
- ZKUtil.deleteNode(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId));
- } catch(KeeperException.NoNodeException nne) {
- // Has already been deleted
- }
- }
- /**
- * Wait until the primary meta region is available. Get the secondary locations as well but don't
- * block for those.
- *
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout maximum time to wait in millis
- * @param conf the {@link Configuration} to use
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static List<ServerName> blockUntilAvailable(final ZKWatcher zkw, final long timeout,
- Configuration conf) throws InterruptedException {
- int numReplicasConfigured = 1;
-
- List<ServerName> servers = new ArrayList<>();
- // Make the blocking call first so that we do the wait to know
- // the znodes are all in place or timeout.
- ServerName server = blockUntilAvailable(zkw, timeout);
-
- if (server == null) {
- return null;
- }
-
- servers.add(server);
-
- try {
- List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
- numReplicasConfigured = metaReplicaNodes.size();
- } catch (KeeperException e) {
- LOG.warn("Got ZK exception {}", e);
- }
- for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
- // return all replica locations for the meta
- servers.add(getMetaRegionLocation(zkw, replicaId));
- }
- return servers;
- }
-
- /**
- * Wait until the meta region is available and is not in transition.
- * @param zkw zookeeper connection to use
- * @param timeout maximum time to wait, in millis
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static ServerName blockUntilAvailable(final ZKWatcher zkw, final long timeout)
- throws InterruptedException {
- return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
- }
-
- /**
- * Wait until the meta region is available and is not in transition.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and constants
- * @param replicaId the ID of the replica
- * @param timeout maximum time to wait in millis
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
- final long timeout) throws InterruptedException {
- if (timeout < 0) {
- throw new IllegalArgumentException();
- }
-
- if (zkw == null) {
- throw new IllegalArgumentException();
- }
-
- long startTime = EnvironmentEdgeManager.currentTime();
- ServerName sn = null;
- while (true) {
- sn = getMetaRegionLocation(zkw, replicaId);
- if (sn != null ||
- (EnvironmentEdgeManager.currentTime() - startTime) >
- timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
- break;
- }
- Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
- }
- return sn;
- }
}
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index d124a9a..5bfb8bf 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1857,13 +1857,6 @@ public final class ZKUtil {
sb.append("\n ").append(child);
}
}
- sb.append("\nRegion server holding hbase:meta:");
- sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkw));
- int numMetaReplicas = zkw.getMetaReplicaNodes().size();
- for (int i = 1; i < numMetaReplicas; i++) {
- sb.append("\n replica" + i + ": "
- + MetaTableLocator.getMetaRegionLocation(zkw, i));
- }
sb.append("\nRegion servers:");
final List<String> rsChildrenNoWatchList =
listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);