You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/07/04 15:31:43 UTC
[hbase] 02/03: HBASE-24389 Introduce new master rpc methods to
locate meta region through root region (#1774)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-11288.splittable-meta
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit dd3e61ed54afa4c2d7764c1f8a67ebdbe6ae53f2
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Jun 27 15:47:51 2020 +0800
HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)
Signed-off-by: stack <st...@apache.org>
---
.../apache/hadoop/hbase/CatalogFamilyFormat.java | 30 +
.../hadoop/hbase/ClientMetaTableAccessor.java | 29 +-
.../client/AbstractAsyncTableRegionLocator.java | 308 ++++++++++
.../hadoop/hbase/client/AsyncConnectionImpl.java | 39 +-
.../hbase/client/AsyncMetaRegionLocator.java | 150 -----
.../hbase/client/AsyncMetaTableRegionLocator.java | 152 +++++
.../hbase/client/AsyncNonMetaRegionLocator.java | 670 ---------------------
.../client/AsyncNonMetaTableRegionLocator.java | 148 +++++
.../hadoop/hbase/client/AsyncRegionLocator.java | 191 ++++--
.../hbase/client/AsyncRegionLocatorHelper.java | 42 +-
.../hbase/client/AsyncTableRegionLocator.java | 4 +-
.../hbase/client/AsyncTableRegionLocatorImpl.java | 8 +-
.../hadoop/hbase/client/ConnectionRegistry.java | 6 -
.../hadoop/hbase/client/ConnectionUtils.java | 18 -
.../apache/hadoop/hbase/client/MasterRegistry.java | 2 +-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 47 +-
.../hadoop/hbase/client/RegionLocateType.java | 5 +-
.../hbase/client/TableRegionLocationCache.java | 226 +++++++
.../hadoop/hbase/client/ZKConnectionRegistry.java | 33 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 27 +
.../hbase/client/DoNothingConnectionRegistry.java | 6 -
.../client/TestAsyncMetaRegionLocatorFailFast.java | 67 ---
.../src/main/protobuf/server/master/Master.proto | 35 ++
.../hadoop/hbase/coprocessor/MasterObserver.java | 44 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 109 +++-
.../hadoop/hbase/master/MasterCoprocessorHost.java | 40 ++
.../hadoop/hbase/master/MasterMetaBootstrap.java | 39 +-
.../hadoop/hbase/master/MasterRpcServices.java | 128 ++--
.../apache/hadoop/hbase/master/MasterServices.java | 9 +
.../hadoop/hbase/master/MasterStatusServlet.java | 5 +-
.../hbase/master/MetaRegionLocationCache.java | 9 +-
.../hbase/master/assignment/RegionStateStore.java | 6 +-
.../master/procedure/CreateTableProcedure.java | 2 -
.../hbase/master/procedure/ProcedureSyncWait.java | 14 -
.../master/snapshot/MasterSnapshotVerifier.java | 8 +-
.../hbase/master/snapshot/TakeSnapshotHandler.java | 14 +-
.../flush/MasterFlushTableProcedureManager.java | 18 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 4 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 18 +-
.../main/resources/hbase-webapps/master/table.jsp | 10 +-
.../apache/hadoop/hbase/TestMetaTableAccessor.java | 10 +-
.../apache/hadoop/hbase/TestMetaTableLocator.java | 206 -------
.../hbase/client/AbstractTestRegionLocator.java | 5 +-
.../hbase/client/DummyConnectionRegistry.java | 6 -
.../hbase/client/MetaWithReplicasTestBase.java | 9 +-
.../hbase/client/RegionReplicaTestHelper.java | 24 +-
.../client/TestAsyncAdminWithRegionReplicas.java | 8 +-
.../hbase/client/TestAsyncMetaRegionLocator.java | 24 +-
.../client/TestAsyncNonMetaRegionLocator.java | 28 +-
.../hbase/client/TestAsyncRegionAdminApi2.java | 45 +-
... => TestAsyncRegionLocatorConcurrenyLimit.java} | 23 +-
.../hbase/client/TestAsyncTableAdminApi.java | 53 +-
.../hbase/client/TestAsyncTableAdminApi3.java | 24 +-
.../hbase/client/TestAsyncTableLocatePrefetch.java | 14 +-
.../hbase/client/TestAsyncTableRSCrashPublish.java | 3 +-
.../client/TestAsyncTableUseMetaReplicas.java | 4 +-
.../hadoop/hbase/client/TestMasterRegistry.java | 2 +-
.../hbase/client/TestMetaRegionLocationCache.java | 45 +-
.../TestMetaWithReplicasShutdownHandling.java | 15 +-
.../hadoop/hbase/client/TestReplicasClient.java | 14 +-
.../hbase/client/TestZKConnectionRegistry.java | 11 +-
.../hadoop/hbase/master/AlwaysStandByHMaster.java | 2 +-
.../hbase/master/MockNoopMasterServices.java | 7 +
.../hadoop/hbase/master/TestMasterFailover.java | 52 --
.../master/TestMetaAssignmentWithStopMaster.java | 16 +-
.../hbase/master/TestMetaShutdownHandler.java | 12 +-
.../master/assignment/TestRegionReplicaSplit.java | 5 +-
.../TestCompactionLifeCycleTracker.java | 4 +-
.../hbase/regionserver/TestRegionReplicas.java | 2 +-
.../regionserver/TestRegionServerNoMaster.java | 42 +-
...stRegionReplicaReplicationEndpointNoMaster.java | 58 +-
.../hadoop/hbase/util/TestHBaseFsckEncryption.java | 2 +-
.../hadoop/hbase/zookeeper/MetaTableLocator.java | 301 +--------
.../org/apache/hadoop/hbase/zookeeper/ZKUtil.java | 13 +-
74 files changed, 1808 insertions(+), 2001 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
index a2c59de..710084d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,8 +34,11 @@ import java.util.regex.Pattern;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -363,4 +370,27 @@ public class CatalogFamilyFormat {
}
return deleteReplicaLocations;
}
+
+  /**
+   * Computes the catalog row key at which the reversed scan built by
+   * {@code createRegionLocateScan} starts.
+   * <ul>
+   * <li>Any locate type other than {@link RegionLocateType#BEFORE}: the region name built from
+   * {@code row} with the {@code NINES} id.</li>
+   * <li>{@code BEFORE} with a non-empty {@code row}: the region name built from {@code row} with
+   * the {@code ZEROES} id.</li>
+   * <li>{@code BEFORE} with the empty end row: the table name bytes followed by a single zero
+   * byte.</li>
+   * </ul>
+   */
+  private static byte[] buildRegionLocateStartRow(TableName tableName, byte[] row,
+    RegionLocateType locateType) {
+    if (!locateType.equals(RegionLocateType.BEFORE)) {
+      return createRegionName(tableName, row, NINES, false);
+    }
+    if (!Bytes.equals(row, HConstants.EMPTY_END_ROW)) {
+      return createRegionName(tableName, row, ZEROES, false);
+    }
+    // Arrays.copyOf pads the extra slot with 0, giving "<table name>\0".
+    byte[] tableNameBytes = tableName.getName();
+    return Arrays.copyOf(tableNameBytes, tableNameBytes.length + 1);
+  }
+
+  /**
+   * Creates the catalog scan used to locate the region of {@code tableName} for {@code row}.
+   * @param tableName the table whose region is being located
+   * @param row the row to locate
+   * @param locateType selects how the scan start row is derived from {@code row}
+   * @param prefetchLimit value handed to {@link Scan#setCaching(int)}
+   * @return a reversed {@link ReadType#PREAD} scan over {@link HConstants#CATALOG_FAMILY},
+   *         running from the computed start row to the region name built from the empty start
+   *         row and an empty id (stop row inclusive)
+   */
+  public static Scan createRegionLocateScan(TableName tableName, byte[] row,
+    RegionLocateType locateType, int prefetchLimit) {
+    byte[] scanStart = buildRegionLocateStartRow(tableName, row, locateType);
+    byte[] scanStop = createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+    Scan locateScan = new Scan().withStartRow(scanStart).withStopRow(scanStop, true);
+    return locateScan.addFamily(HConstants.CATALOG_FAMILY).setReversed(true)
+      .setCaching(prefetchLimit).setReadType(ReadType.PREAD);
+  }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
index ecc6573..74d2322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
@@ -164,26 +164,27 @@ public final class ClientMetaTableAccessor {
/**
* Used to get all region locations for the specific table.
- * @param metaTable
+ * @param metaTable the table handle handed to {@code getTableRegionsAndLocations}
* @param tableName table we're looking for, can be null for getting all regions
+ * @param excludeOfflinedSplitParents passed through unchanged to
+ * {@code getTableRegionsAndLocations}; presumably {@code true} leaves out offlined split
+ * parents (TODO confirm against that method)
* @return the list of region locations. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
- AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
+ AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName,
+ boolean excludeOfflinedSplitParents) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (locations == null || locations.isEmpty()) {
- future.complete(Collections.emptyList());
- } else {
- List<HRegionLocation> regionLocations =
- locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
- .collect(Collectors.toList());
- future.complete(regionLocations);
- }
- });
+ addListener(getTableRegionsAndLocations(metaTable, tableName, excludeOfflinedSplitParents),
+ (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (locations == null || locations.isEmpty()) {
+ future.complete(Collections.emptyList());
+ } else {
+ List<HRegionLocation> regionLocations =
+ locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+ .collect(Collectors.toList());
+ future.complete(regionLocations);
+ }
+ });
return future;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
new file mode 100644
index 0000000..8048661
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The base class for locating the regions of a table.
+ */
+@InterfaceAudience.Private
+abstract class AbstractAsyncTableRegionLocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncTableRegionLocator.class);
+
+ protected final AsyncConnectionImpl conn;
+
+ protected final TableName tableName;
+
+ protected final int maxConcurrent;
+
+ protected final TableRegionLocationCache cache;
+
+ protected static final class LocateRequest {
+
+ final byte[] row;
+
+ final RegionLocateType locateType;
+
+ public LocateRequest(byte[] row, RegionLocateType locateType) {
+ this.row = row;
+ this.locateType = locateType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(row) ^ locateType.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != LocateRequest.class) {
+ return false;
+ }
+ LocateRequest that = (LocateRequest) obj;
+ return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
+ }
+ }
+
+ private final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+ private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
+ new LinkedHashMap<>();
+
+  /**
+   * @param conn the owning connection; its connection metrics are handed to the location cache
+   * @param tableName the table this locator serves
+   * @param maxConcurrent upper bound on the number of locate requests in flight at once
+   */
+  AbstractAsyncTableRegionLocator(AsyncConnectionImpl conn, TableName tableName,
+    int maxConcurrent) {
+    this.tableName = tableName;
+    this.maxConcurrent = maxConcurrent;
+    this.conn = conn;
+    this.cache = new TableRegionLocationCache(conn.getConnectionMetrics());
+  }
+
+  /** Returns true while fewer than {@code maxConcurrent} requests are in flight. */
+  private boolean hasQuota() {
+    return maxConcurrent > pendingRequests.size();
+  }
+
+  /**
+   * Picks the oldest registered request that is not in flight yet, if any. {@code allRequests}
+   * keeps insertion order, so the first match is the longest-waiting one. The callers in this
+   * class invoke it while holding the monitor of {@code this}.
+   */
+  protected final Optional<LocateRequest> getCandidate() {
+    for (LocateRequest queued : allRequests.keySet()) {
+      if (!pendingRequests.contains(queued)) {
+        return Optional.of(queued);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Sweeps {@code allRequests}: every request whose future is already done, or which can be
+   * answered by {@code locations}, is removed (and completed in the latter case).
+   * @param locations freshly located regions, or null to only drop futures that are already done
+   */
+  void clearCompletedRequests(RegionLocations locations) {
+    allRequests.entrySet()
+      .removeIf(entry -> tryComplete(entry.getKey(), entry.getValue(), locations));
+  }
+
+  /**
+   * Tries to answer {@code req} with {@code locations}.
+   * @return true if {@code future} was already done or has just been completed with
+   *         {@code locations}; false if {@code locations} is null or does not match the request
+   */
+  private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+    RegionLocations locations) {
+    if (future.isDone()) {
+      return true;
+    }
+    if (locations == null) {
+      return false;
+    }
+    HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
+    // at least one location must be available here, otherwise the request would have failed
+    // before reaching this point
+    assert loc != null;
+    RegionInfo region = loc.getRegion();
+    boolean matches;
+    if (!req.locateType.equals(RegionLocateType.BEFORE)) {
+      matches = region.containsRow(req.row);
+    } else {
+      // When locating the row before req.row, a reverse scan usually finds the previous region,
+      // so the end key is checked first. The general condition is startKey < req.row and
+      // endKey >= req.row; it is evaluated as endKey == req.row ||
+      // (endKey > req.row && startKey < req.row), which is equivalent because startKey < endKey.
+      // An empty end key counts as greater than any row.
+      byte[] endKey = region.getEndKey();
+      int cmp = Bytes.compareTo(endKey, req.row);
+      if (cmp == 0) {
+        matches = true;
+      } else {
+        boolean endsAfterRow = cmp > 0 || Bytes.equals(EMPTY_END_ROW, endKey);
+        matches = endsAfterRow && Bytes.compareTo(region.getStartKey(), req.row) < 0;
+      }
+    }
+    if (!matches) {
+      return false;
+    }
+    future.complete(locations);
+    return true;
+  }
+
+ protected final void onLocateComplete(LocateRequest req, RegionLocations locs, Throwable error) {
+ if (error != null) {
+ LOG.warn("Failed to locate region in '" + tableName + "', row='" +
+ Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
+ }
+ Optional<LocateRequest> toSend = Optional.empty();
+ if (locs != null) {
+ RegionLocations addedLocs = cache.add(locs);
+ synchronized (this) {
+ pendingRequests.remove(req);
+ clearCompletedRequests(addedLocs);
+ // Remove a complete locate request in a synchronized block, so the table cache must have
+ // quota to send a candidate request.
+ toSend = getCandidate();
+ toSend.ifPresent(pendingRequests::add);
+ }
+ toSend.ifPresent(this::locate);
+ } else {
+ // we meet an error
+ assert error != null;
+ synchronized (this) {
+ pendingRequests.remove(req);
+ // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
+ // already retried several times
+ CompletableFuture<?> future = allRequests.remove(req);
+ if (future != null) {
+ future.completeExceptionally(error);
+ }
+ clearCompletedRequests(null);
+ // Remove a complete locate request in a synchronized block, so the table cache must have
+ // quota to send a candidate request.
+ toSend = getCandidate();
+ toSend.ifPresent(pendingRequests::add);
+ }
+ toSend.ifPresent(this::locate);
+ }
+ }
+
+  // Sanity-checks the locations fetched for a locate request. Returns true when the caller may go
+  // on using them. Returns false when they are unusable; in that case this method has already
+  // reported the failure through onLocateComplete (which is why the LocateRequest is a
+  // parameter), so the caller must just return and must not call onLocateComplete again.
+  protected final boolean validateRegionLocations(RegionLocations locs, LocateRequest req) {
+    // drop every HRegionLocation with a null location, i.e., whose getServerName returns null.
+    RegionLocations usable = locs == null ? null : locs.removeElementsWithNullLocation();
+
+    // the default region location must always be present when fetching from meta, otherwise
+    // fail the request.
+    HRegionLocation defaultLoc = usable == null ? null : usable.getDefaultRegionLocation();
+    if (defaultLoc == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    if (defaultLoc.getRegion() == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    return true;
+  }
+
+ /**
+ * Starts the actual lookup for {@code req}. This class calls it, outside its lock, once the
+ * request has been added to the in-flight set. The in-flight entry is only released by
+ * {@code onLocateComplete}, so implementations are expected to report every outcome through
+ * that method (directly, or via {@code validateRegionLocations} on failure).
+ */
+ protected abstract void locate(LocateRequest req);
+
+ /**
+ * Returns a future holding all region locations, as defined by the subclass.
+ * NOTE(review): the exact effect of {@code excludeOfflinedSplitParents} is not visible in this
+ * class; presumably {@code true} leaves out offlined split parents (confirm in the subclasses).
+ */
+ abstract CompletableFuture<List<HRegionLocation>>
+ getAllRegionLocations(boolean excludeOfflinedSplitParents);
+
+  /**
+   * Returns the locations of the region selected by {@code row} and {@code locateType}, from the
+   * cache when possible and otherwise by registering (and, quota permitting, sending) a locate
+   * request. Concurrent callers asking for an equal request share one future.
+   * @param row the row to locate
+   * @param replicaId the replica whose location must be good for a cached entry to be used
+   * @param locateType how {@code row} relates to the wanted region; {@code AFTER} is rewritten
+   *          to a {@code CURRENT} lookup of the closest row after {@code row}
+   * @param reload if true, skip the cache and always go through a locate request
+   */
+  CompletableFuture<RegionLocations> getRegionLocations(byte[] row, int replicaId,
+    RegionLocateType locateType, boolean reload) {
+    byte[] lookupRow = row;
+    RegionLocateType lookupType = locateType;
+    if (lookupType.equals(RegionLocateType.AFTER)) {
+      lookupRow = createClosestRowAfter(lookupRow);
+      lookupType = RegionLocateType.CURRENT;
+    }
+    if (!reload) {
+      RegionLocations cached = cache.locate(tableName, lookupRow, replicaId, lookupType);
+      if (isGood(cached, replicaId)) {
+        return CompletableFuture.completedFuture(cached);
+      }
+    }
+    LocateRequest req;
+    CompletableFuture<RegionLocations> future;
+    synchronized (this) {
+      // check the cache again now that we hold the lock
+      if (!reload) {
+        RegionLocations cached = cache.locate(tableName, lookupRow, replicaId, lookupType);
+        if (isGood(cached, replicaId)) {
+          return CompletableFuture.completedFuture(cached);
+        }
+      }
+      req = new LocateRequest(lookupRow, lookupType);
+      CompletableFuture<RegionLocations> registered = allRequests.get(req);
+      if (registered != null) {
+        // an equal request is already registered, share its future
+        return registered;
+      }
+      future = new CompletableFuture<>();
+      allRequests.put(req, future);
+      if (!hasQuota() || pendingRequests.contains(req)) {
+        // stays queued; it is picked up when a request completes
+        return future;
+      }
+      pendingRequests.add(req);
+    }
+    // send outside the lock
+    locate(req);
+    return future;
+  }
+
+ /** Adds {@code locs} to this table's location cache; the value returned by the cache is ignored. */
+ void addToCache(RegionLocations locs) {
+ cache.add(locs);
+ }
+
+  /**
+   * Returns the size reported by the location cache. Notice that this is not a constant time
+   * operation, do not call it on critical path.
+   */
+  int getCacheSize() {
+    int cachedEntries = cache.size();
+    return cachedEntries;
+  }
+
+  /**
+   * Fails every registered request future with an {@link IOException} ("Cache cleared").
+   * <p/>
+   * NOTE(review): the futures stay registered in {@code allRequests} and the in-flight set is
+   * left untouched here; entries are only dropped by a later sweep in {@code onLocateComplete}.
+   * Presumably the locator is discarded right after this call -- confirm against the caller.
+   */
+  void clearPendingRequests() {
+    synchronized (this) {
+      if (allRequests.isEmpty()) {
+        return;
+      }
+      IOException error = new IOException("Cache cleared");
+      for (CompletableFuture<?> registered : allRequests.values()) {
+        registered.completeExceptionally(error);
+      }
+    }
+  }
+
+ /**
+ * Delegates to the location cache's {@code clearCache(serverName)}; presumably this drops the
+ * cached locations hosted on that server (confirm in TableRegionLocationCache).
+ */
+ void clearCache(ServerName serverName) {
+ cache.clearCache(serverName);
+ }
+
+ /** Delegates to the location cache's {@code removeLocationFromCache(loc)}. */
+ void removeLocationFromCache(HRegionLocation loc) {
+ cache.removeLocationFromCache(loc);
+ }
+
+ /**
+ * Returns whatever the location cache holds for {@code key}. NOTE(review): what the key
+ * represents is defined by TableRegionLocationCache (presumably a region start key -- confirm).
+ */
+ RegionLocations getInCache(byte[] key) {
+ return cache.get(key);
+ }
+
+  // test-only: looks up the cached locations covering the given row, using the default replica
+  // id and a CURRENT locate type, to check whether the location for a region has been cached.
+  @VisibleForTesting
+  RegionLocations locateInCache(byte[] row) {
+    int defaultReplica = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+    return cache.locate(tableName, row, defaultReplica, RegionLocateType.CURRENT);
+  }
+
+  // test-only: reports the cache's count of cached region locations, to check whether the
+  // locations for a table have been cached.
+  @VisibleForTesting
+  int getNumberOfCachedRegionLocations() {
+    int cachedLocations = cache.getNumberOfCachedRegionLocations();
+    return cachedLocations;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 0f12e90..9b80025 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -38,7 +38,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
@@ -59,6 +58,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -84,11 +84,11 @@ class AsyncConnectionImpl implements AsyncConnection {
final AsyncConnectionConfiguration connConf;
- private final User user;
+ final User user;
final ConnectionRegistry registry;
- private final int rpcTimeout;
+ final int rpcTimeout;
protected final RpcClient rpcClient;
@@ -124,7 +124,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile ConnectionOverAsyncConnection conn;
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
- SocketAddress localAddress, User user) {
+ SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
if (user.isLoginFromKeytab()) {
@@ -137,8 +137,8 @@ class AsyncConnectionImpl implements AsyncConnection {
} else {
this.metrics = Optional.empty();
}
- this.rpcClient = RpcClientFactory.createClient(
- conf, clusterId, localAddress, metrics.orElse(null));
+ this.rpcClient =
+ RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
@@ -162,14 +162,13 @@ class AsyncConnectionImpl implements AsyncConnection {
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
} else {
try {
- listener = new ClusterStatusListener(
- new ClusterStatusListener.DeadServerHandler() {
- @Override
- public void newDead(ServerName sn) {
- locator.clearCache(sn);
- rpcClient.cancelConnections(sn);
- }
- }, conf, listenerClass);
+ listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
+ @Override
+ public void newDead(ServerName sn) {
+ locator.clearCache(sn);
+ rpcClient.cancelConnections(sn);
+ }
+ }, conf, listenerClass);
} catch (IOException e) {
LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
}
@@ -194,13 +193,13 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
- public void close() {
+ public void close() throws IOException {
if (!closed.compareAndSet(false, true)) {
return;
}
- IOUtils.closeQuietly(clusterStatusListener);
- IOUtils.closeQuietly(rpcClient);
- IOUtils.closeQuietly(registry);
+ Closeables.close(clusterStatusListener, true);
+ Closeables.close(rpcClient, true);
+ Closeables.close(registry, true);
if (authService != null) {
authService.shutdown();
}
@@ -312,7 +311,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
- ExecutorService pool) {
+ ExecutorService pool) {
return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
@Override
@@ -353,7 +352,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
- ExecutorService pool) {
+ ExecutorService pool) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
deleted file mode 100644
index 3571f960..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * The asynchronous locator for meta region.
- */
-@InterfaceAudience.Private
-class AsyncMetaRegionLocator {
-
- private final ConnectionRegistry registry;
-
- private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
-
- private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
- new AtomicReference<>();
-
- AsyncMetaRegionLocator(ConnectionRegistry registry) {
- this.registry = registry;
- }
-
- /**
- * Get the region locations for meta region. If the location for the given replica is not
- * available in the cached locations, then fetch from the HBase cluster.
- * <p/>
- * The <code>replicaId</code> parameter is important. If the region replication config for meta
- * region is changed, then the cached region locations may not have the locations for new
- * replicas. If we do not check the location for the given replica, we will always return the
- * cached region locations and cause an infinite loop.
- */
- CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
- return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
- registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
- }
-
- private HRegionLocation getCacheLocation(HRegionLocation loc) {
- RegionLocations locs = metaRegionLocations.get();
- return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
- }
-
- private void addLocationToCache(HRegionLocation loc) {
- for (;;) {
- int replicaId = loc.getRegion().getReplicaId();
- RegionLocations oldLocs = metaRegionLocations.get();
- if (oldLocs == null) {
- RegionLocations newLocs = createRegionLocations(loc);
- if (metaRegionLocations.compareAndSet(null, newLocs)) {
- return;
- }
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
- if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
- oldLoc.getServerName().equals(loc.getServerName()))) {
- return;
- }
- RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
- if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
- return;
- }
- }
- }
-
- private void removeLocationFromCache(HRegionLocation loc) {
- for (;;) {
- RegionLocations oldLocs = metaRegionLocations.get();
- if (oldLocs == null) {
- return;
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
- if (!canUpdateOnError(loc, oldLoc)) {
- return;
- }
- RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
- if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
- return;
- }
- }
- }
-
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
- this::addLocationToCache, this::removeLocationFromCache, null);
- }
-
- void clearCache() {
- metaRegionLocations.set(null);
- }
-
- void clearCache(ServerName serverName) {
- for (;;) {
- RegionLocations locs = metaRegionLocations.get();
- if (locs == null) {
- return;
- }
- RegionLocations newLocs = locs.removeByServer(serverName);
- if (locs == newLocs) {
- return;
- }
- if (newLocs.isEmpty()) {
- newLocs = null;
- }
- if (metaRegionLocations.compareAndSet(locs, newLocs)) {
- return;
- }
- }
- }
-
- // only used for testing whether we have cached the location for a region.
- @VisibleForTesting
- RegionLocations getRegionLocationInCache() {
- return metaRegionLocations.get();
- }
-
- // only used for testing whether we have cached the location for a table.
- @VisibleForTesting
- int getNumberOfCachedRegionLocations() {
- RegionLocations locs = metaRegionLocations.get();
- return locs != null ? locs.numNonNullElements() : 0;
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
new file mode 100644
index 0000000..16a1b8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+
+/**
+ * The class for locating region for meta table.
+ * <p/>
+ * Every lookup is sent as a {@link ClientMetaService} rpc to the active master, whose address is
+ * obtained from {@code conn.registry}. The created stub is kept in {@code stub}, and is reset to
+ * null when an rpc fails with a connection exception or a {@link ServerNotRunningYetException}.
+ */
+@InterfaceAudience.Private
+class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);
+
+  // the stub for talking to the active master, see tryClearMasterStubCache for when it is reset
+  private final AtomicReference<Interface> stub = new AtomicReference<>();
+
+  // passed to ConnectionUtils.getOrFetch together with stub, presumably the pending stub creation
+  private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
+    new AtomicReference<>();
+
+  AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
+    super(conn, tableName, maxConcurrent);
+  }
+
+  /**
+   * Creates a {@link ClientMetaService} stub on a rpc channel to the given server, using the
+   * connection's user and its read rpc timeout converted to milliseconds.
+   */
+  private Interface createStub(ServerName serverName) throws IOException {
+    int rpcTimeoutMs = (int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs());
+    return ClientMetaService.newStub(
+      conn.rpcClient.createRpcChannel(serverName, conn.user, rpcTimeoutMs));
+  }
+
+  /**
+   * Gets the stub through {@code ConnectionUtils.getOrFetch}. The supplier given to it asks
+   * {@code conn.registry} for the active master address and creates a stub on that address, and
+   * fails with {@link MasterNotRunningException} if the registry returns a null address.
+   */
+  CompletableFuture<Interface> getStub() {
+    return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
+      CompletableFuture<Interface> future = new CompletableFuture<>();
+      addListener(conn.registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(createStub(addr));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+      });
+      return future;
+    }, s -> true, "ClientLocateMetaStub");
+  }
+
+  /**
+   * Resets the cached stub if the error is a connection exception or a
+   * {@link ServerNotRunningYetException}. compareAndSet is used so that only the stub which was
+   * passed in, i.e. the one used for the failed call, gets cleared.
+   */
+  private void tryClearMasterStubCache(IOException error, Interface currentStub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  /**
+   * Sends a LocateMetaRegion rpc to the active master for the given request. A failure of getting
+   * the stub, of the rpc call, or of converting the response is reported via
+   * {@code onLocateComplete}, and so are the locations which pass validateRegionLocations.
+   * <p/>
+   * NOTE(review): when {@code validateRegionLocations} returns false nothing is reported here, it
+   * is presumably expected to complete the request by itself, confirm against the parent class.
+   */
+  @Override
+  protected void locate(LocateRequest req) {
+    addListener(getStub(), (masterStub, error) -> {
+      if (error != null) {
+        onLocateComplete(req, null, error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      masterStub.locateMetaRegion(controller,
+        LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
+          .setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
+        resp -> {
+          if (controller.failed()) {
+            IOException ex = controller.getFailed();
+            tryClearMasterStubCache(ex, masterStub);
+            onLocateComplete(req, null, ex);
+            return;
+          }
+          RegionLocations locs;
+          try {
+            locs = new RegionLocations(resp.getMetaLocationsList().stream()
+              .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+          } catch (RuntimeException e) {
+            // fail the request instead of leaving it pending forever
+            onLocateComplete(req, null, e);
+            return;
+          }
+          if (validateRegionLocations(locs, req)) {
+            onLocateComplete(req, locs, null);
+          }
+        });
+    });
+  }
+
+  /**
+   * Fetches the meta region locations with a GetAllMetaRegionLocations rpc to the active master.
+   * @param excludeOfflinedSplitParents passed to the master in the request
+   * @return a future with the locations converted from the response, or failed with the error of
+   *         getting the stub, of the rpc call, or of converting the response
+   */
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+    addListener(getStub(), (masterStub, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      GetAllMetaRegionLocationsRequest request = GetAllMetaRegionLocationsRequest.newBuilder()
+        .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build();
+      masterStub.getAllMetaRegionLocations(controller, request, resp -> {
+        if (controller.failed()) {
+          IOException ex = controller.getFailed();
+          tryClearMasterStubCache(ex, masterStub);
+          future.completeExceptionally(ex);
+          return;
+        }
+        List<HRegionLocation> locs;
+        try {
+          locs = resp.getMetaLocationsList().stream().map(ProtobufUtil::toRegionLocation)
+            .collect(Collectors.toList());
+        } catch (RuntimeException e) {
+          // do not leave the future incomplete if the response can not be converted
+          future.completeExceptionally(e);
+          return;
+        }
+        future.complete(locs);
+      });
+    });
+    return future;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
deleted file mode 100644
index b202168..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
-import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
-import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
-import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.CatalogFamilyFormat;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Objects;
-
-/**
- * The asynchronous locator for regions other than meta.
- */
-@InterfaceAudience.Private
-class AsyncNonMetaRegionLocator {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
-
- @VisibleForTesting
- static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
- "hbase.client.meta.max.concurrent.locate.per.table";
-
- private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
-
- @VisibleForTesting
- static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
-
- private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
-
- private final AsyncConnectionImpl conn;
-
- private final int maxConcurrentLocateRequestPerTable;
-
- private final int locatePrefetchLimit;
-
- private final boolean useMetaReplicas;
-
- private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
-
- private static final class LocateRequest {
-
- private final byte[] row;
-
- private final RegionLocateType locateType;
-
- public LocateRequest(byte[] row, RegionLocateType locateType) {
- this.row = row;
- this.locateType = locateType;
- }
-
- @Override
- public int hashCode() {
- return Bytes.hashCode(row) ^ locateType.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || obj.getClass() != LocateRequest.class) {
- return false;
- }
- LocateRequest that = (LocateRequest) obj;
- return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
- }
- }
-
- private static final class TableCache {
-
- private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
- new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
-
- private final Set<LocateRequest> pendingRequests = new HashSet<>();
-
- private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
- new LinkedHashMap<>();
-
- public boolean hasQuota(int max) {
- return pendingRequests.size() < max;
- }
-
- public boolean isPending(LocateRequest req) {
- return pendingRequests.contains(req);
- }
-
- public void send(LocateRequest req) {
- pendingRequests.add(req);
- }
-
- public Optional<LocateRequest> getCandidate() {
- return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
- }
-
- public void clearCompletedRequests(RegionLocations locations) {
- for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
- allRequests.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
- iter.remove();
- }
- }
- }
-
- private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
- RegionLocations locations) {
- if (future.isDone()) {
- return true;
- }
- if (locations == null) {
- return false;
- }
- HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
- // we should at least have one location available, otherwise the request should fail and
- // should not arrive here
- assert loc != null;
- boolean completed;
- if (req.locateType.equals(RegionLocateType.BEFORE)) {
- // for locating the row before current row, the common case is to find the previous region
- // in reverse scan, so we check the endKey first. In general, the condition should be
- // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
- // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
- // endKey.
- byte[] endKey = loc.getRegion().getEndKey();
- int c = Bytes.compareTo(endKey, req.row);
- completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
- Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
- } else {
- completed = loc.getRegion().containsRow(req.row);
- }
- if (completed) {
- future.complete(locations);
- return true;
- } else {
- return false;
- }
- }
- }
-
- AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
- this.conn = conn;
- this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
- MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
- this.locatePrefetchLimit =
- conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
- this.useMetaReplicas =
- conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
- }
-
- private TableCache getTableCache(TableName tableName) {
- return computeIfAbsent(cache, tableName, TableCache::new);
- }
-
- private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
- HRegionLocation[] locArr1 = locs1.getRegionLocations();
- HRegionLocation[] locArr2 = locs2.getRegionLocations();
- if (locArr1.length != locArr2.length) {
- return false;
- }
- for (int i = 0; i < locArr1.length; i++) {
- // do not need to compare region info
- HRegionLocation loc1 = locArr1[i];
- HRegionLocation loc2 = locArr2[i];
- if (loc1 == null) {
- if (loc2 != null) {
- return false;
- }
- } else {
- if (loc2 == null) {
- return false;
- }
- if (loc1.getSeqNum() != loc2.getSeqNum()) {
- return false;
- }
- if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
- return false;
- }
- }
- }
- return true;
- }
-
- // if we successfully add the locations to cache, return the locations, otherwise return the one
- // which prevents us being added. The upper layer can use this value to complete pending requests.
- private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
- LOG.trace("Try adding {} to cache", locs);
- byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
- for (;;) {
- RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
- if (oldLocs == null) {
- return locs;
- }
- // check whether the regions are the same, this usually happens when table is split/merged, or
- // deleted and recreated again.
- RegionInfo region = locs.getRegionLocation().getRegion();
- RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
- if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
- RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
- if (isEqual(mergedLocs, oldLocs)) {
- // the merged one is the same with the old one, give up
- LOG.trace("Will not add {} to cache because the old value {} " +
- " is newer than us or has the same server name." +
- " Maybe it is updated before we replace it", locs, oldLocs);
- return oldLocs;
- }
- if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
- return mergedLocs;
- }
- } else {
- // the region is different, here we trust the one we fetched. This maybe wrong but finally
- // the upper layer can detect this and trigger removal of the wrong locations
- if (LOG.isDebugEnabled()) {
- LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
- " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
- }
- if (tableCache.cache.replace(startKey, oldLocs, locs)) {
- return locs;
- }
- }
- }
- }
-
- private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
- Throwable error) {
- if (error != null) {
- LOG.warn("Failed to locate region in '" + tableName + "', row='" +
- Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
- }
- Optional<LocateRequest> toSend = Optional.empty();
- TableCache tableCache = getTableCache(tableName);
- if (locs != null) {
- RegionLocations addedLocs = addToCache(tableCache, locs);
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- tableCache.clearCompletedRequests(addedLocs);
- // Remove a complete locate request in a synchronized block, so the table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
- }
- toSend.ifPresent(r -> locateInMeta(tableName, r));
- } else {
- // we meet an error
- assert error != null;
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
- // already retried several times
- CompletableFuture<?> future = tableCache.allRequests.remove(req);
- if (future != null) {
- future.completeExceptionally(error);
- }
- tableCache.clearCompletedRequests(null);
- // Remove a complete locate request in a synchronized block, so the table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
- }
- toSend.ifPresent(r -> locateInMeta(tableName, r));
- }
- }
-
- // return whether we should stop the scan
- private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
- RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
- Bytes.toStringBinary(req.row), req.locateType, locs);
- }
- // remove HRegionLocation with null location, i.e, getServerName returns null.
- if (locs != null) {
- locs = locs.removeElementsWithNullLocation();
- }
-
- // the default region location should always be presented when fetching from meta, otherwise
- // let's fail the request.
- if (locs == null || locs.getDefaultRegionLocation() == null) {
- complete(tableName, req, null,
- new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
- tableName, Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- RegionInfo info = loc.getRegion();
- if (info == null) {
- complete(tableName, req, null,
- new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
- tableName, Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- if (info.isSplitParent()) {
- return false;
- }
- complete(tableName, req, locs, null);
- return true;
- }
-
- private void recordCacheHit() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
- }
-
- private void recordCacheMiss() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
- }
-
- private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
- int replicaId) {
- Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- byte[] endKey = loc.getRegion().getEndKey();
- if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
- }
- recordCacheHit();
- return locs;
- } else {
- recordCacheMiss();
- return null;
- }
- }
-
- private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
- byte[] row, int replicaId) {
- boolean isEmptyStopRow = isEmptyStopRow(row);
- Map.Entry<byte[], RegionLocations> entry =
- isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
- if (entry == null) {
- recordCacheMiss();
- return null;
- }
- RegionLocations locs = entry.getValue();
- HRegionLocation loc = locs.getRegionLocation(replicaId);
- if (loc == null) {
- recordCacheMiss();
- return null;
- }
- if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
- (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
- Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
- }
- recordCacheHit();
- return locs;
- } else {
- recordCacheMiss();
- return null;
- }
- }
-
- private void locateInMeta(TableName tableName, LocateRequest req) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
- "', locateType=" + req.locateType + " in meta");
- }
- byte[] metaStartKey;
- if (req.locateType.equals(RegionLocateType.BEFORE)) {
- if (isEmptyStopRow(req.row)) {
- byte[] binaryTableName = tableName.getName();
- metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
- } else {
- metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
- }
- } else {
- metaStartKey = createRegionName(tableName, req.row, NINES, false);
- }
- byte[] metaStopKey =
- RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
- Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
- .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
- .setReadType(ReadType.PREAD);
- if (useMetaReplicas) {
- scan.setConsistency(Consistency.TIMELINE);
- }
- conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
-
- private boolean completeNormally = false;
-
- private boolean tableNotFound = true;
-
- @Override
- public void onError(Throwable error) {
- complete(tableName, req, null, error);
- }
-
- @Override
- public void onComplete() {
- if (tableNotFound) {
- complete(tableName, req, null, new TableNotFoundException(tableName));
- } else if (!completeNormally) {
- complete(tableName, req, null, new IOException(
- "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
- }
- }
-
- @Override
- public void onNext(Result[] results, ScanController controller) {
- if (results.length == 0) {
- return;
- }
- tableNotFound = false;
- int i = 0;
- for (; i < results.length; i++) {
- if (onScanNext(tableName, req, results[i])) {
- completeNormally = true;
- controller.terminate();
- i++;
- break;
- }
- }
- // Add the remaining results into cache
- if (i < results.length) {
- TableCache tableCache = getTableCache(tableName);
- for (; i < results.length; i++) {
- RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
- if (locs == null) {
- continue;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- if (loc == null) {
- continue;
- }
- RegionInfo info = loc.getRegion();
- if (info == null || info.isOffline() || info.isSplitParent()) {
- continue;
- }
- RegionLocations addedLocs = addToCache(tableCache, locs);
- synchronized (tableCache) {
- tableCache.clearCompletedRequests(addedLocs);
- }
- }
- }
- }
- });
- }
-
- private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
- int replicaId, RegionLocateType locateType) {
- return locateType.equals(RegionLocateType.BEFORE)
- ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
- : locateRowInCache(tableCache, tableName, row, replicaId);
- }
-
- // locateToPrevious is true means we will use the start key of a region to locate the region
- // placed before it. Used for reverse scan. See the comment of
- // AsyncRegionLocator.getPreviousRegionLocation.
- private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
- byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
- // AFTER should be convert to CURRENT before calling this method
- assert !locateType.equals(RegionLocateType.AFTER);
- TableCache tableCache = getTableCache(tableName);
- if (!reload) {
- RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
- if (isGood(locs, replicaId)) {
- return CompletableFuture.completedFuture(locs);
- }
- }
- CompletableFuture<RegionLocations> future;
- LocateRequest req;
- boolean sendRequest = false;
- synchronized (tableCache) {
- // check again
- if (!reload) {
- RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
- if (isGood(locs, replicaId)) {
- return CompletableFuture.completedFuture(locs);
- }
- }
- req = new LocateRequest(row, locateType);
- future = tableCache.allRequests.get(req);
- if (future == null) {
- future = new CompletableFuture<>();
- tableCache.allRequests.put(req, future);
- if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
- tableCache.send(req);
- sendRequest = true;
- }
- }
- }
- if (sendRequest) {
- locateInMeta(tableName, req);
- }
- return future;
- }
-
- CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
- int replicaId, RegionLocateType locateType, boolean reload) {
- // as we know the exact row after us, so we can just create the new row, and use the same
- // algorithm to locate it.
- if (locateType.equals(RegionLocateType.AFTER)) {
- row = createClosestRowAfter(row);
- locateType = RegionLocateType.CURRENT;
- }
- return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
- }
-
- private void recordClearRegionCache() {
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
- }
-
- private void removeLocationFromCache(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return;
- }
- byte[] startKey = loc.getRegion().getStartKey();
- for (;;) {
- RegionLocations oldLocs = tableCache.cache.get(startKey);
- if (oldLocs == null) {
- return;
- }
- HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
- if (!canUpdateOnError(loc, oldLoc)) {
- return;
- }
- RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
- if (newLocs == null) {
- if (tableCache.cache.remove(startKey, oldLocs)) {
- recordClearRegionCache();
- return;
- }
- } else {
- if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
- recordClearRegionCache();
- return;
- }
- }
- }
- }
-
- private void addLocationToCache(HRegionLocation loc) {
- addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
- }
-
- private HRegionLocation getCachedLocation(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return null;
- }
- RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
- return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
- }
-
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
- AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
- this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
- }
-
- void clearCache(TableName tableName) {
- TableCache tableCache = cache.remove(tableName);
- if (tableCache == null) {
- return;
- }
- synchronized (tableCache) {
- if (!tableCache.allRequests.isEmpty()) {
- IOException error = new IOException("Cache cleared");
- tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
- }
- }
- conn.getConnectionMetrics()
- .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
- }
-
- void clearCache() {
- cache.clear();
- }
-
- void clearCache(ServerName serverName) {
- for (TableCache tableCache : cache.values()) {
- for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
- byte[] regionName = entry.getKey();
- RegionLocations locs = entry.getValue();
- RegionLocations newLocs = locs.removeByServer(serverName);
- if (locs == newLocs) {
- continue;
- }
- if (newLocs.isEmpty()) {
- tableCache.cache.remove(regionName, locs);
- } else {
- tableCache.cache.replace(regionName, locs, newLocs);
- }
- }
- }
- }
-
- // only used for testing whether we have cached the location for a region.
- @VisibleForTesting
- RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
- TableCache tableCache = cache.get(tableName);
- if (tableCache == null) {
- return null;
- }
- return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
- }
-
- // only used for testing whether we have cached the location for a table.
- @VisibleForTesting
- int getNumberOfCachedRegionLocations(TableName tableName) {
- TableCache tableCache = cache.get(tableName);
- if (tableCache == null) {
- return 0;
- }
- return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
new file mode 100644
index 0000000..09b3133
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Locates the regions of a single table other than meta by scanning the meta table.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaTableRegionLocator.class);
+  // Passed to CatalogFamilyFormat.createRegionLocateScan when building each locate scan.
+  private final int prefetchLimit;
+  // When true, locate scans are issued with Consistency.TIMELINE.
+  private final boolean useMetaReplicas;
+
+  AsyncNonMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent,
+    int prefetchLimit, boolean useMetaReplicas) {
+    super(conn, tableName, maxConcurrent);
+    this.prefetchLimit = prefetchLimit;
+    this.useMetaReplicas = useMetaReplicas;
+  }
+
+  // Handles one row of the meta scan; returns true to stop the scan, false to keep scanning.
+  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
+    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+        Bytes.toStringBinary(req.row), req.locateType, locs);
+    }
+    if (!validateRegionLocations(locs, req)) {
+      return true; // presumably the validator has already failed req - TODO confirm
+    }
+    if (locs.getDefaultRegionLocation().getRegion().isSplitParent()) {
+      return false; // split parent: keep scanning for a usable region
+    }
+    onLocateComplete(req, locs, null);
+    return true;
+  }
+  // Scans the meta table for req and reports the outcome through onLocateComplete.
+  @Override
+  protected void locate(LocateRequest req) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '{}', row='{}', locateType={} in meta", tableName,
+        Bytes.toStringBinary(req.row), req.locateType);
+    }
+    Scan scan =
+      CatalogFamilyFormat.createRegionLocateScan(tableName, req.row, req.locateType, prefetchLimit);
+    if (useMetaReplicas) {
+      scan.setConsistency(Consistency.TIMELINE);
+    }
+    conn.getTable(TableName.META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
+      // Set once onScanNext reports that scanning should stop for this request.
+      private boolean completeNormally = false;
+      // Cleared as soon as the scan delivers a non-empty batch of results.
+      private boolean tableNotFound = true;
+
+      @Override
+      public void onError(Throwable error) {
+        onLocateComplete(req, null, error);
+      }
+
+      @Override
+      public void onComplete() {
+        if (tableNotFound) {
+          onLocateComplete(req, null, new TableNotFoundException(tableName));
+        } else if (!completeNormally) {
+          onLocateComplete(req, null, new IOException(
+            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
+        }
+      }
+
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        if (results.length == 0) {
+          return;
+        }
+        tableNotFound = false;
+        int i = 0;
+        for (; i < results.length; i++) {
+          if (onScanNext(tableName, req, results[i])) {
+            completeNormally = true;
+            controller.terminate();
+            i++;
+            break;
+          }
+        }
+        // Add the remaining results of this batch into the cache.
+        for (; i < results.length; i++) {
+          RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
+          if (locs == null) {
+            continue;
+          }
+          HRegionLocation loc = locs.getDefaultRegionLocation();
+          if (loc == null) {
+            continue;
+          }
+          RegionInfo info = loc.getRegion();
+          if (info == null || info.isOffline() || info.isSplitParent()) {
+            continue;
+          }
+          RegionLocations addedLocs = cache.add(locs);
+          // 'this' would be the anonymous consumer here; take the locator's monitor instead,
+          // which is presumably what guards the pending request state - TODO confirm.
+          synchronized (AsyncNonMetaTableRegionLocator.this) {
+            clearCompletedRequests(addedLocs);
+          }
+        }
+      }
+    });
+  }
+  // Delegates to ClientMetaTableAccessor, reading the locations from the meta table.
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    return ClientMetaTableAccessor.getTableHRegionLocations(
+      conn.getTable(TableName.META_TABLE_NAME), tableName, excludeOfflinedSplitParents);
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 09eabfc..fce59a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,10 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -31,8 +40,6 @@ import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
@@ -44,25 +51,51 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@InterfaceAudience.Private
class AsyncRegionLocator {
- private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);
+  @VisibleForTesting
+  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+    "hbase.client.meta.max.concurrent.locate.per.table";
+  // Used when MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE is not configured.
+  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+  // Limit for the meta table's own locator; defaults to the configured per-table limit.
+  @VisibleForTesting
+  static final String MAX_CONCURRENT_LOCATE_META_REQUEST =
+    "hbase.client.meta.max.concurrent.locate";
+  // Final like the other keys: a configuration key must not be reassignable at runtime.
+  @VisibleForTesting
+  static final String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+  // Used when LOCATE_PREFETCH_LIMIT is not configured.
+  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
- private final AsyncMetaRegionLocator metaRegionLocator;
+ private final int maxConcurrentLocateRequestPerTable;
+
+ private final int maxConcurrentLocateMetaRequest;
+
+ private final int locatePrefetchLimit;
- private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
+ private final boolean useMetaReplicas;
- AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+ private final ConcurrentMap<TableName, AbstractAsyncTableRegionLocator> table2Locator =
+ new ConcurrentHashMap<>();
+
+ public AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn;
- this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
- this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
this.retryTimer = retryTimer;
+ this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+ MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+ this.maxConcurrentLocateMetaRequest = conn.getConfiguration()
+ .getInt(MAX_CONCURRENT_LOCATE_META_REQUEST, maxConcurrentLocateRequestPerTable);
+ this.locatePrefetchLimit =
+ conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
+ this.useMetaReplicas =
+ conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
}
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
- Supplier<String> timeoutMsg) {
+ Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
@@ -85,12 +118,28 @@ class AsyncRegionLocator {
return TableName.isMetaTableName(tableName);
}
+  // Returns the locator registered for the table, creating it on first use.
+  // The meta table gets a dedicated locator type with its own concurrency limit, every other
+  // table gets a non-meta locator configured with the prefetch and meta-replica settings.
+  private AbstractAsyncTableRegionLocator getOrCreateTableRegionLocator(TableName tableName) {
+    return computeIfAbsent(table2Locator, tableName,
+      () -> isMeta(tableName)
+        ? new AsyncMetaTableRegionLocator(conn, tableName, maxConcurrentLocateMetaRequest)
+        : new AsyncNonMetaTableRegionLocator(conn, tableName, maxConcurrentLocateRequestPerTable,
+          locatePrefetchLimit, useMetaReplicas));
+  }
+
+ @VisibleForTesting
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
- RegionLocateType type, boolean reload, long timeoutNs) {
- CompletableFuture<RegionLocations> future = isMeta(tableName)
- ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
- : nonMetaRegionLocator.getRegionLocations(tableName, row,
- RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+ int replicaId, RegionLocateType locateType, boolean reload) {
+ return getOrCreateTableRegionLocator(tableName).getRegionLocations(row, replicaId, locateType,
+ reload);
+ }
+
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ RegionLocateType type, boolean reload, long timeoutNs) {
+ CompletableFuture<RegionLocations> future =
+ getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
@@ -98,13 +147,12 @@ class AsyncRegionLocator {
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
+ int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
CompletableFuture<RegionLocations> locsFuture =
- isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
- : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+ getRegionLocations(tableName, row, replicaId, type, reload);
addListener(locsFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
@@ -131,72 +179,115 @@ class AsyncRegionLocator {
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- int replicaId, RegionLocateType type, long timeoutNs) {
+ int replicaId, RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, boolean reload, long timeoutNs) {
+ RegionLocateType type, boolean reload, long timeoutNs) {
return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, long timeoutNs) {
+ RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, type, false, timeoutNs);
}
- void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
- if (loc.getRegion().isMetaRegion()) {
- metaRegionLocator.updateCachedLocationOnError(loc, exception);
- } else {
- nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
+  /**
+   * Fetch every region location of the given table.
+   * <p/>
+   * The lookup itself does not read from the cache, but a successful result is added to it.
+   */
+  CompletableFuture<List<HRegionLocation>> getAllRegionLocations(TableName tableName,
+    boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future =
+      getOrCreateTableRegionLocator(tableName).getAllRegionLocations(excludeOfflinedSplitParents);
+    addListener(future, (locs, error) -> {
+      if (error != null) {
+        return;
+      }
+      // look the locator up again once the result arrives, then feed its cache
+      AbstractAsyncTableRegionLocator locator = getOrCreateTableRegionLocator(tableName);
+      Map<RegionInfo, List<HRegionLocation>> byDefaultReplica = new HashMap<>();
+      for (HRegionLocation location : locs) {
+        RegionInfo region = location.getRegion();
+        if (region == null || region.isSplitParent()) {
+          // do not cache split parent
+          continue;
+        }
+        RegionInfo key = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
+        byDefaultReplica.computeIfAbsent(key, k -> new ArrayList<>()).add(location);
+      }
+      byDefaultReplica.values().forEach(group -> locator.addToCache(new RegionLocations(group)));
+    });
+    return future;
+  }
+
+ private void removeLocationFromCache(HRegionLocation loc) {
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+ if (locator == null) {
+ return;
}
+ locator.removeLocationFromCache(loc);
}
- void clearCache(TableName tableName) {
- LOG.debug("Clear meta cache for {}", tableName);
- if (tableName.equals(META_TABLE_NAME)) {
- metaRegionLocator.clearCache();
- } else {
- nonMetaRegionLocator.clearCache(tableName);
+ private void addLocationToCache(HRegionLocation loc) {
+ getOrCreateTableRegionLocator(loc.getRegion().getTable())
+ .addToCache(createRegionLocations(loc));
+ }
+
+ private HRegionLocation getCachedLocation(HRegionLocation loc) {
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+ if (locator == null) {
+ return null;
}
+ RegionLocations locs = locator.getInCache(loc.getRegion().getStartKey());
+ return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
}
- void clearCache(ServerName serverName) {
- LOG.debug("Clear meta cache for {}", serverName);
- metaRegionLocator.clearCache(serverName);
- nonMetaRegionLocator.clearCache(serverName);
- conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+ this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
+ }
+
+ void clearCache(TableName tableName) {
+ AbstractAsyncTableRegionLocator locator = table2Locator.remove(tableName);
+ if (locator == null) {
+ return;
+ }
+ locator.clearPendingRequests();
+ conn.getConnectionMetrics()
+ .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(locator.getCacheSize()));
}
void clearCache() {
- metaRegionLocator.clearCache();
- nonMetaRegionLocator.clearCache();
+ table2Locator.clear();
}
- @VisibleForTesting
- AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
- return nonMetaRegionLocator;
+ void clearCache(ServerName serverName) {
+ for (AbstractAsyncTableRegionLocator locator : table2Locator.values()) {
+ locator.clearCache(serverName);
+ }
}
// only used for testing whether we have cached the location for a region.
@VisibleForTesting
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
- if (TableName.isMetaTableName(tableName)) {
- return metaRegionLocator.getRegionLocationInCache();
- } else {
- return nonMetaRegionLocator.getRegionLocationInCache(tableName, row);
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+ if (locator == null) {
+ return null;
}
+ return locator.locateInCache(row);
}
// only used for testing whether we have cached the location for a table.
@VisibleForTesting
int getNumberOfCachedRegionLocations(TableName tableName) {
- if (TableName.isMetaTableName(tableName)) {
- return metaRegionLocator.getNumberOfCachedRegionLocations();
- } else {
- return nonMetaRegionLocator.getNumberOfCachedRegionLocations(tableName);
+ AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+ if (locator == null) {
+ return 0;
}
+ return locator.getNumberOfCachedRegionLocations();
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4c6cd5a..c34cc5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import java.util.Arrays;
+import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
@@ -30,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+
/**
* Helper class for asynchronous region locator.
*/
@@ -55,9 +58,9 @@ final class AsyncRegionLocatorHelper {
}
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
- Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
- Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
- MetricsConnection metrics) {
+ Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+ Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
+ Optional<MetricsConnection> metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -85,9 +88,7 @@ final class AsyncRegionLocatorHelper {
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
- if (metrics != null) {
- metrics.incrCacheDroppingExceptions(exception);
- }
+ metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
removeFromCache.accept(loc);
}
}
@@ -146,4 +147,33 @@ final class AsyncRegionLocatorHelper {
HRegionLocation loc = locs.getRegionLocation(replicaId);
return loc != null && loc.getServerName() != null;
}
+
+  /**
+   * Compares two {@link RegionLocations} position by position, looking only at the sequence
+   * number and the server name of each entry; the region info is not compared.
+   * @return true if both hold the same number of entries and every pair of entries matches
+   */
+  static boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] left = locs1.getRegionLocations();
+    HRegionLocation[] right = locs2.getRegionLocations();
+    if (left.length != right.length) {
+      return false;
+    }
+    for (int idx = 0; idx < left.length; idx++) {
+      HRegionLocation a = left[idx];
+      HRegionLocation b = right[idx];
+      if (a == null || b == null) {
+        // a missing entry only matches another missing entry
+        if (a != b) {
+          return false;
+        }
+        continue;
+      }
+      if (a.getSeqNum() != b.getSeqNum() ||
+        !Objects.equal(a.getServerName(), b.getServerName())) {
+        return false;
+      }
+    }
+    return true;
+  }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 321f44e..49bade3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -107,8 +107,8 @@ public interface AsyncTableRegionLocator {
/**
* Retrieves all of the regions associated with this table.
* <p/>
- * Usually we will go to meta table directly in this method so there is no {@code reload}
- * parameter.
+ * This method always goes to the meta table directly, so there is no {@code reload} parameter.
+ * Use it with caution, as it can put significant load on the cluster.
* <p/>
* Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index ad6a051..83b601e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -54,12 +53,7 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
- if (TableName.isMetaTableName(tableName)) {
- return conn.registry.getMetaRegionLocations()
- .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
- }
- return ClientMetaTableAccessor
- .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
+ return conn.getLocator().getAllRegionLocations(tableName, true);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index cd22d78..569d728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,11 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
interface ConnectionRegistry extends Closeable {
/**
- * Get the location of meta region(s).
- */
- CompletableFuture<RegionLocations> getMetaRegionLocations();
-
- /**
* Should only be called once.
* <p>
* The upper layer should store this value somewhere as it will not be change any more.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 1228c7e..94ab7fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -186,24 +186,6 @@ public final class ConnectionUtils {
return Arrays.copyOf(row, row.length + 1);
}
- /**
- * Create a row before the specified row and very close to the specified row.
- */
- static byte[] createCloseRowBefore(byte[] row) {
- if (row.length == 0) {
- return MAX_BYTE_ARRAY;
- }
- if (row[row.length - 1] == 0) {
- return Arrays.copyOf(row, row.length - 1);
- } else {
- byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
- System.arraycopy(row, 0, nextRow, 0, row.length - 1);
- nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
- System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
- return nextRow;
- }
- }
-
static boolean isEmptyStartRow(byte[] row) {
return Bytes.equals(row, EMPTY_START_ROW);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 4d0a591..e70aa97 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -243,7 +243,7 @@ public class MasterRegistry implements ConnectionRegistry {
return new RegionLocations(regionLocations);
}
- @Override
+ // keep the method here just for testing compatibility
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 16483ef..cbdc462 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -746,8 +746,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
if (TableName.isMetaTableName(tableName)) {
- return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
- .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
+ return getTableHRegionLocations(tableName).thenApply(
+ locs -> locs.stream().allMatch(loc -> loc != null && loc.getServerName() != null));
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
addListener(isTableEnabled(tableName), (enabled, error) -> {
@@ -763,7 +763,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.complete(false);
} else {
addListener(
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true),
(locations, error1) -> {
if (error1 != null) {
future.completeExceptionally(error1);
@@ -883,15 +883,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
- if (tableName.equals(META_TABLE_NAME)) {
- return connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
- .collect(Collectors.toList()));
- } else {
- return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
- .thenApply(
- locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
- }
+ return getTableHRegionLocations(tableName).thenApply(
+ locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
}
@Override
@@ -1109,23 +1102,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
* List all region locations for the specific table.
*/
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty() ||
- metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- });
- return future;
- } else {
- // For non-meta table, we fetch all locations by scanning hbase:meta table
- return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
- }
+ return connection.getRegionLocator(tableName).getAllRegionLocations();
}
/**
@@ -2375,9 +2352,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
// old format encodedName, should be meta region
- future = connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations())
- .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
+ future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+ .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
} else {
future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
regionNameOrEncodedRegionName);
@@ -2386,10 +2362,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
RegionInfo regionInfo =
CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
if (regionInfo.isMetaRegion()) {
- future = connection.registry.getMetaRegionLocations()
- .thenApply(locs -> Stream.of(locs.getRegionLocations())
- .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
- .findFirst());
+ future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+ .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
+ .findFirst());
} else {
future =
ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
index 950123c..2380fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -27,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* <li>{@link #AFTER} locate the region which contains the row after the given row.</li>
* </ul>
*/
-@InterfaceAudience.Private
-enum RegionLocateType {
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum RegionLocateType {
BEFORE, CURRENT, AFTER
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
new file mode 100644
index 0000000..86f79ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isEqual;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The location cache for regions of a table.
+ */
+@InterfaceAudience.Private
+class TableRegionLocationCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableRegionLocationCache.class);
+
+ private final Optional<MetricsConnection> metrics;
+
+ private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
+ new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+
+ TableRegionLocationCache(Optional<MetricsConnection> metrics) {
+ this.metrics = metrics;
+ }
+
+ private void recordCacheHit() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheHit);
+ }
+
+ private void recordCacheMiss() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheMiss);
+ }
+
+ private void recordClearRegionCache() {
+ metrics.ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+ }
+
+ private RegionLocations locateRow(TableName tableName, byte[] row, int replicaId) {
+ Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+ if (entry == null) {
+ recordCacheMiss();
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ recordCacheMiss();
+ return null;
+ }
+ byte[] endKey = loc.getRegion().getEndKey();
+ if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+ }
+ recordCacheHit();
+ return locs;
+ } else {
+ recordCacheMiss();
+ return null;
+ }
+ }
+
+ private RegionLocations locateRowBefore(TableName tableName, byte[] row, int replicaId) {
+ boolean isEmptyStopRow = isEmptyStopRow(row);
+ Map.Entry<byte[], RegionLocations> entry =
+ isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+ if (entry == null) {
+ recordCacheMiss();
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ recordCacheMiss();
+ return null;
+ }
+ if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+ (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+ }
+ recordCacheHit();
+ return locs;
+ } else {
+ recordCacheMiss();
+ return null;
+ }
+ }
+
+ RegionLocations locate(TableName tableName, byte[] row, int replicaId,
+ RegionLocateType locateType) {
+ return locateType.equals(RegionLocateType.BEFORE) ? locateRowBefore(tableName, row, replicaId) :
+ locateRow(tableName, row, replicaId);
+ }
+
+ // if we successfully add the locations to cache, return the locations, otherwise return the one
+ // which prevents us being added. The upper layer can use this value to complete pending requests.
+ RegionLocations add(RegionLocations locs) {
+ LOG.trace("Try adding {} to cache", locs);
+ byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
+ if (oldLocs == null) {
+ return locs;
+ }
+ // check whether the regions are the same, this usually happens when table is split/merged, or
+ // deleted and recreated again.
+ RegionInfo region = locs.getRegionLocation().getRegion();
+ RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+ if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+ RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+ if (isEqual(mergedLocs, oldLocs)) {
+ // the merged one is the same with the old one, give up
+ LOG.trace("Will not add {} to cache because the old value {} " +
+ " is newer than us or has the same server name." +
+ " Maybe it is updated before we replace it", locs, oldLocs);
+ return oldLocs;
+ }
+ if (cache.replace(startKey, oldLocs, mergedLocs)) {
+ return mergedLocs;
+ }
+ } else {
+ // the region is different, here we trust the one we fetched. This maybe wrong but finally
+ // the upper layer can detect this and trigger removal of the wrong locations
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
+ " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
+ }
+ if (cache.replace(startKey, oldLocs, locs)) {
+ return locs;
+ }
+ }
+ }
+ }
+
+ // notice that this is not a constant time operation, do not call it on critical path.
+ int size() {
+ return cache.size();
+ }
+
+ void clearCache(ServerName serverName) {
+ for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
+ byte[] regionName = entry.getKey();
+ RegionLocations locs = entry.getValue();
+ RegionLocations newLocs = locs.removeByServer(serverName);
+ if (locs == newLocs) {
+ continue;
+ }
+ if (newLocs.isEmpty()) {
+ cache.remove(regionName, locs);
+ } else {
+ cache.replace(regionName, locs, newLocs);
+ }
+ }
+ }
+
+ void removeLocationFromCache(HRegionLocation loc) {
+ byte[] startKey = loc.getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = cache.get(startKey);
+ if (oldLocs == null) {
+ return;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
+ }
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (newLocs == null) {
+ if (cache.remove(startKey, oldLocs)) {
+ recordClearRegionCache();
+ return;
+ }
+ } else {
+ if (cache.replace(startKey, oldLocs, newLocs)) {
+ recordClearRegionCache();
+ return;
+ }
+ }
+ }
+ }
+
+ RegionLocations get(byte[] key) {
+ return cache.get(key);
+ }
+
+ // only used for testing whether we have cached the location for a table.
+ @VisibleForTesting
+ int getNumberOfCachedRegionLocations() {
+ return cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index 42a4188..04ea535 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForR
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@@ -112,7 +115,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
- CompletableFuture<RegionLocations> future) {
+ CompletableFuture<RegionLocations> future) {
remaining.decrement();
if (remaining.intValue() > 0) {
return;
@@ -120,8 +123,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
future.complete(new RegionLocations(locs));
}
- private Pair<RegionState.State, ServerName> getStateAndServerName(
- ZooKeeperProtos.MetaRegionServer proto) {
+ private Pair<RegionState.State, ServerName>
+ getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
RegionState.State state;
if (proto.hasState()) {
state = RegionState.State.convert(proto.getState());
@@ -134,7 +137,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
- List<String> metaReplicaZNodes) {
+ List<String> metaReplicaZNodes) {
if (metaReplicaZNodes.isEmpty()) {
future.completeExceptionally(new IOException("No meta znode available"));
}
@@ -190,13 +193,12 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
}
- @Override
+ // keep the method here just for testing compatibility
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
- zk.list(znodePaths.baseZNode)
- .thenApply(children -> children.stream()
- .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
+ zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
+ .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
@@ -219,14 +221,13 @@ class ZKConnectionRegistry implements ConnectionRegistry {
@Override
public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
- .thenApply(proto -> {
- if (proto == null) {
- return null;
- }
- HBaseProtos.ServerName snProto = proto.getMaster();
- return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
- snProto.getStartCode());
- });
+ .thenApply(proto -> {
+ if (proto == null) {
+ return null;
+ }
+ HBaseProtos.ServerName snProto = proto.getMaster();
+ return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode());
+ });
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 4cdbfbc..1f6e057 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLoadStats;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
@@ -3543,4 +3544,30 @@ public final class ProtobufUtil {
return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()).addAllServers(hostports)
.addAllTables(tables).addAllConfiguration(configuration).build();
}
+
+  /**
+   * Convert a client side {@link RegionLocateType} to its protobuf counterpart.
+   * @param pojo the client side locate type
+   * @return the matching {@code MasterProtos.RegionLocateType}
+   * @throws IllegalArgumentException if there is no protobuf value for {@code pojo}
+   */
+  public static MasterProtos.RegionLocateType toProtoRegionLocateType(RegionLocateType pojo) {
+    final MasterProtos.RegionLocateType converted;
+    switch (pojo) {
+      case BEFORE:
+        converted = MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_BEFORE;
+        break;
+      case CURRENT:
+        converted = MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_CURRENT;
+        break;
+      case AFTER:
+        converted = MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_AFTER;
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown RegionLocateType: " + pojo);
+    }
+    return converted;
+  }
+
+  /**
+   * Convert a protobuf {@code MasterProtos.RegionLocateType} to the client side
+   * {@link RegionLocateType}.
+   * @param proto the protobuf locate type
+   * @return the matching client side locate type
+   * @throws IllegalArgumentException if there is no client side value for {@code proto}
+   */
+  public static RegionLocateType toRegionLocateType(MasterProtos.RegionLocateType proto) {
+    final RegionLocateType converted;
+    switch (proto) {
+      case REGION_LOCATE_TYPE_BEFORE:
+        converted = RegionLocateType.BEFORE;
+        break;
+      case REGION_LOCATE_TYPE_CURRENT:
+        converted = RegionLocateType.CURRENT;
+        break;
+      case REGION_LOCATE_TYPE_AFTER:
+        converted = RegionLocateType.AFTER;
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown proto RegionLocateType: " + proto);
+    }
+    return converted;
+  }
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 4bd6687..64ded7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,11 +32,6 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
}
@Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
public CompletableFuture<String> getClusterId() {
return CompletableFuture.completedFuture(null);
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
deleted file mode 100644
index b306500..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ClientTests.class, SmallTests.class })
-public class TestAsyncMetaRegionLocatorFailFast {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncMetaRegionLocatorFailFast.class);
-
- private static Configuration CONF = HBaseConfiguration.create();
-
- private static AsyncMetaRegionLocator LOCATOR;
-
- private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {
-
- public FaultyConnectionRegistry(Configuration conf) {
- super(conf);
- }
-
- @Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
- }
- }
-
- @BeforeClass
- public static void setUp() {
- LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
- }
-
- @Test(expected = DoNotRetryIOException.class)
- public void test() throws IOException {
- FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false));
- }
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 286c96f..44d5125 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1269,6 +1269,29 @@ message GetMetaRegionLocationsResponse {
repeated RegionLocation meta_locations = 1;
}
+enum RegionLocateType { // wire form of the client side RegionLocateType, converted in ProtobufUtil
+ REGION_LOCATE_TYPE_BEFORE =1; // converted to/from RegionLocateType.BEFORE
+ REGION_LOCATE_TYPE_CURRENT = 2; // converted to/from RegionLocateType.CURRENT
+ REGION_LOCATE_TYPE_AFTER = 3; // converted to/from RegionLocateType.AFTER
+}
+
+message LocateMetaRegionRequest { // request of the LocateMetaRegion rpc
+ required bytes row = 1; // the row key to locate
+ required RegionLocateType locateType = 2; // the direction of the locate operation
+}
+
+message LocateMetaRegionResponse { // response of the LocateMetaRegion rpc
+ repeated RegionLocation meta_locations = 1; // meta region locations for the requested row
+}
+
+message GetAllMetaRegionLocationsRequest { // request of the GetAllMetaRegionLocations rpc
+ required bool exclude_offlined_split_parents = 1; // when true, split parents are not returned
+}
+
+message GetAllMetaRegionLocationsResponse { // response of the GetAllMetaRegionLocations rpc
+ repeated RegionLocation meta_locations = 1; // locations of all the meta regions
+}
+
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
*/
@@ -1287,4 +1310,16 @@ service ClientMetaService {
* Get current meta replicas' region locations.
*/
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+ /**
+ * Get meta region locations for a given row
+ */
+ rpc LocateMetaRegion(LocateMetaRegionRequest)
+ returns(LocateMetaRegionResponse);
+
+ /**
+ * Get all meta regions locations
+ */
+ rpc GetAllMetaRegionLocations(GetAllMetaRegionLocationsRequest)
+ returns(GetAllMetaRegionLocationsResponse);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ffe4f53..92b3f6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.RegionPlan;
@@ -1765,7 +1767,7 @@ public interface MasterObserver {
throws IOException {
}
- /*
+ /**
* Called before checking if user has permissions.
* @param ctx the coprocessor instance's environment
* @param userName the user name
@@ -1784,4 +1786,44 @@ public interface MasterObserver {
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}
+
+ /**
+ * Called before locating meta region.
+ * @param ctx the coprocessor instance's environment
+ * @param row the row key to locate
+ * @param locateType the direction of the locate operation (BEFORE, CURRENT or AFTER)
+ */
+ default void preLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+ RegionLocateType locateType) throws IOException {
+ }
+
+ /**
+ * Called after locating meta region.
+ * @param ctx the coprocessor instance's environment
+ * @param row the row key to locate
+ * @param locateType the direction of the locate operation (BEFORE, CURRENT or AFTER)
+ * @param locs the locations of the given meta region, including meta replicas if any.
+ */
+ default void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+ RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
+ }
+
+  /**
+   * Called before getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents don't return split parents
+   * @throws IOException may be thrown by implementations, like the other hooks of this observer
+   */
+  default void preGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents) throws IOException {
+  }
+
+  /**
+   * Called after getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents don't return split parents
+   * @param locs the locations of all meta regions, including meta replicas if any.
+   * @throws IOException may be thrown by implementations, like the other hooks of this observer
+   */
+  default void postGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents, List<HRegionLocation> locs) throws IOException {
+  }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2e2136e..2e59f0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -72,11 +72,13 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -89,7 +91,9 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -219,7 +223,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -882,6 +885,27 @@ public class HMaster extends HRegionServer implements MasterServices {
return new AssignmentManager(master, masterRegion);
}
+ /**
+ * Load the meta region state from the meta region server ZNode.
+ * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
+ * @param replicaId the ID of the replica
+ * @return the region state parsed from the data of the replica's znode, or {@code null} if the
+ * calling thread is interrupted while reading the znode (the interrupt flag is restored).
+ * NOTE(review): the caller in tryMigrateRootTableFromZooKeeper dereferences the result
+ * without a null check, confirm whether the interrupted case needs explicit handling.
+ * @throws KeeperException if a ZooKeeper operation fails, or the znode data can not be
+ * deserialized
+ */
+ private static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+ throws KeeperException {
+ RegionState regionState = null;
+ try {
+ byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
+ regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag, regionState stays null
+ }
+ return regionState;
+ }
+
private void tryMigrateRootTableFromZooKeeper() throws IOException, KeeperException {
// try migrate data from zookeeper
try (RegionScanner scanner =
@@ -903,7 +927,7 @@ public class HMaster extends HRegionServer implements MasterServices {
StringBuilder info = new StringBuilder("Migrating meta location:");
for (String metaReplicaNode : metaReplicaNodes) {
int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZnode(metaReplicaNode);
- RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
+ RegionState state = getMetaRegionState(zooKeeper, replicaId);
info.append(" ").append(state);
put.setTimestamp(state.getStamp());
MetaTableAccessor.addRegionInfo(put, state.getRegion());
@@ -3993,4 +4017,85 @@ public class HMaster extends HRegionServer implements MasterServices {
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;
}
+
+  /**
+   * Locate the meta region for the given row by scanning {@code masterRegion}.
+   * <p>
+   * A locate with {@link RegionLocateType#AFTER} is turned into a
+   * {@link RegionLocateType#CURRENT} locate for the row which has one more trailing zero byte. Rows
+   * of split parents are skipped.
+   * @param row the row key to locate
+   * @param locateType the direction of the locate operation
+   * @return the locations of the first matching region, or {@code null} if the scan is exhausted
+   *         or a scanned row has no default location or no region info
+   * @throws IOException if creating or reading the scanner fails
+   */
+  public RegionLocations locateMeta(byte[] row, RegionLocateType locateType) throws IOException {
+    if (locateType == RegionLocateType.AFTER) {
+      // the row right after us is known exactly: append one byte and locate it as CURRENT
+      row = Arrays.copyOf(row, row.length + 1);
+      locateType = RegionLocateType.CURRENT;
+    }
+    Scan locateScan =
+      CatalogFamilyFormat.createRegionLocateScan(TableName.META_TABLE_NAME, row, locateType, 1);
+    try (RegionScanner scanner = masterRegion.getScanner(locateScan)) {
+      List<Cell> rowCells = new ArrayList<>();
+      for (boolean hasMore = true; hasMore;) {
+        hasMore = scanner.next(rowCells);
+        if (rowCells.isEmpty()) {
+          continue;
+        }
+        Result rowResult = Result.create(rowCells);
+        rowCells.clear();
+        RegionLocations locs = CatalogFamilyFormat.getRegionLocations(rowResult);
+        HRegionLocation defaultLoc = locs != null ? locs.getDefaultRegionLocation() : null;
+        if (defaultLoc == null) {
+          LOG.warn("No location found when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        RegionInfo regionInfo = defaultLoc.getRegion();
+        if (regionInfo == null) {
+          LOG.warn("HRegionInfo is null when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        if (!regionInfo.isSplitParent()) {
+          return locs;
+        }
+        // split parent, keep scanning
+      }
+    }
+    LOG.warn("No location available when locating meta region with row='{}', locateType={}",
+      Bytes.toStringBinary(row), locateType);
+    return null;
+  }
+
+ public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException {
+ Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY);
+ List<RegionLocations> list = new ArrayList<>();
+ try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+ boolean moreRows;
+ List<Cell> cells = new ArrayList<>();
+ do {
+ moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Result result = Result.create(cells);
+ cells.clear();
+ RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+ if (locs == null) {
+ LOG.warn("No locations in {}", result);
+ continue;
+ }
+ HRegionLocation loc = locs.getRegionLocation();
+ if (loc == null) {
+ LOG.warn("No non null location in {}", result);
+ continue;
+ }
+ RegionInfo info = loc.getRegion();
+ if (info == null) {
+ LOG.warn("No serialized RegionInfo in {}", result);
+ continue;
+ }
+ if (excludeOfflinedSplitParents && info.isSplitParent()) {
+ continue;
+ }
+ list.add(locs);
+ } while (moreRows);
+ }
+ return list;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 28c9c30..ae29375 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -2039,4 +2041,42 @@ public class MasterCoprocessorHost
}
});
}
+
+ public void preLocateMetaRegion(byte[] row, RegionLocateType locateType) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preLocateMetaRegion(this, row, locateType);
+ }
+ });
+ }
+
+  /**
+   * Run {@link MasterObserver#postLocateMetaRegion} through {@code execOperation}; a {@code null}
+   * operation is passed when no coprocessor environment is loaded.
+   */
+  public void postLocateMetaRegion(byte[] row, RegionLocateType locateType,
+    List<HRegionLocation> locs) throws IOException {
+    MasterObserverOperation operation = null;
+    if (!coprocEnvironments.isEmpty()) {
+      operation = new MasterObserverOperation() {
+        @Override
+        public void call(MasterObserver observer) throws IOException {
+          observer.postLocateMetaRegion(this, row, locateType, locs);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+ public void preGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetAllMetaRegionLocations(this, excludeOfflinedSplitParents);
+ }
+ });
+ }
+
+  /**
+   * Run {@link MasterObserver#postGetAllMetaRegionLocations} through {@code execOperation}; a
+   * {@code null} operation is passed when no coprocessor environment is loaded.
+   */
+  public void postGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents,
+    List<HRegionLocation> locs) throws IOException {
+    MasterObserverOperation operation = null;
+    if (!coprocEnvironments.isEmpty()) {
+      operation = new MasterObserverOperation() {
+        @Override
+        public void call(MasterObserver observer) throws IOException {
+          observer.postGetAllMetaRegionLocations(this, excludeOfflinedSplitParents, locs);
+        }
+      };
+    }
+    execOperation(operation);
+  }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index c3fc33a..914b7ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -30,7 +30,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -81,22 +83,27 @@ class MasterMetaBootstrap {
}
for (int i = 1; i < numReplicas; i++) {
RegionInfo secondaryRegionInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, i);
- RegionState secondaryRegionState = regionStates.getRegionState(secondaryRegionInfo);
- ServerName sn = null;
- if (secondaryRegionState != null) {
- sn = secondaryRegionState.getServerName();
- if (sn != null && !metaServerNames.add(sn)) {
- LOG.info("{} old location {} is same with other hbase:meta replica location;" +
- " setting location as null...", secondaryRegionInfo.getRegionNameAsString(), sn);
- sn = null;
- }
- }
+ RegionStateNode secondaryRegionState =
+ regionStates.getOrCreateRegionStateNode(secondaryRegionInfo);
// These assigns run inline. All is blocked till they complete. Only interrupt is shutting
// down hosting server which calls AM#stop.
- if (sn != null) {
- am.assignAsync(secondaryRegionInfo, sn);
- } else {
+ if (secondaryRegionState.getState() == State.OFFLINE) {
+ LOG.info("Assign new meta region replica {}", secondaryRegionInfo);
am.assignAsync(secondaryRegionInfo);
+ } else if (secondaryRegionState.getProcedure() == null) {
+ ServerName sn = secondaryRegionState.getRegionLocation();
+ if (sn != null) {
+ if (!metaServerNames.add(sn)) {
+ LOG.info(
+ "{} old location {} is same with other hbase:meta replica location;" +
+ " setting location as null...",
+ secondaryRegionInfo.getRegionNameAsString(),
+ secondaryRegionState.getRegionLocation());
+ am.moveAsync(new RegionPlan(secondaryRegionInfo, sn, null));
+ } else {
+ regionStates.addRegionToServer(secondaryRegionState);
+ }
+ }
}
}
}
@@ -119,8 +126,10 @@ class MasterMetaBootstrap {
}
RegionState regionState = regionStates.getRegionState(regionInfo);
try {
- ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
- regionState.getServerName(), regionInfo, 30000);
+ if (regionState.getServerName() != null) {
+ ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
+ regionState.getServerName(), regionInfo, 30000);
+ }
if (regionInfo.isFirst()) {
// for compatibility, also try to remove the replicas on zk.
ZKUtil.deleteNode(zooKeeper,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index fc29a38..8882aa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
@@ -50,7 +51,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
@@ -113,7 +114,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -197,6 +197,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@@ -252,6 +254,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
@@ -391,10 +395,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
-public class MasterRpcServices extends RSRpcServices implements
- MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
- LockService.BlockingInterface, HbckService.BlockingInterface,
- ClientMetaService.BlockingInterface {
+public class MasterRpcServices extends RSRpcServices implements MasterService.BlockingInterface,
+ RegionServerStatusService.BlockingInterface, LockService.BlockingInterface,
+ HbckService.BlockingInterface, ClientMetaService.BlockingInterface {
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
@@ -529,18 +532,17 @@ public class MasterRpcServices extends RSRpcServices implements
@Override
protected List<BlockingServiceAndInterface> getServices() {
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
- bssi.add(new BlockingServiceAndInterface(
- MasterService.newReflectiveBlockingService(this),
- MasterService.BlockingInterface.class));
- bssi.add(new BlockingServiceAndInterface(
- RegionServerStatusService.newReflectiveBlockingService(this),
+ bssi.add(new BlockingServiceAndInterface(MasterService.newReflectiveBlockingService(this),
+ MasterService.BlockingInterface.class));
+ bssi.add(
+ new BlockingServiceAndInterface(RegionServerStatusService.newReflectiveBlockingService(this),
RegionServerStatusService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
- LockService.BlockingInterface.class));
+ LockService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
- HbckService.BlockingInterface.class));
+ HbckService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
- ClientMetaService.BlockingInterface.class));
+ ClientMetaService.BlockingInterface.class));
bssi.addAll(super.getServices());
return bssi;
}
@@ -1691,40 +1693,33 @@ public class MasterRpcServices extends RSRpcServices implements
}
@Override
- public UnassignRegionResponse unassignRegion(RpcController controller,
- UnassignRegionRequest req) throws ServiceException {
+ public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
+ throws ServiceException {
try {
- final byte [] regionName = req.getRegion().getValue().toByteArray();
+ final byte[] regionName = req.getRegion().getValue().toByteArray();
RegionSpecifierType type = req.getRegion().getType();
final boolean force = req.getForce();
UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
master.checkInitialized();
if (type != RegionSpecifierType.REGION_NAME) {
- LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
- + " actual: " + type);
+ LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME +
+ " actual: " + type);
}
- Pair<RegionInfo, ServerName> pair =
- MetaTableAccessor.getRegion(master.getConnection(), regionName);
- if (Bytes.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName(), regionName)) {
- pair = new Pair<>(RegionInfoBuilder.FIRST_META_REGIONINFO,
- MetaTableLocator.getMetaRegionLocation(master.getZooKeeper()));
- }
- if (pair == null) {
- throw new UnknownRegionException(Bytes.toString(regionName));
+ final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
+ if (regionInfo == null) {
+ throw new UnknownRegionException(Bytes.toStringBinary(regionName));
}
-
- RegionInfo hri = pair.getFirst();
if (master.cpHost != null) {
- master.cpHost.preUnassign(hri, force);
+ master.cpHost.preUnassign(regionInfo, force);
}
- LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
- + " in current location if it is online and reassign.force=" + force);
- master.getAssignmentManager().unassign(hri);
+ LOG
+ .debug(master.getClientIdAuditPrefix() + " unassign " + regionInfo.getRegionNameAsString() +
+ " in current location if it is online and reassign.force=" + force);
+ master.getAssignmentManager().unassign(regionInfo);
if (master.cpHost != null) {
- master.cpHost.postUnassign(hri, force);
+ master.cpHost.postUnassign(regionInfo, force);
}
-
return urr;
} catch (IOException ioe) {
throw new ServiceException(ioe);
@@ -3274,4 +3269,67 @@ public class MasterRpcServices extends RSRpcServices implements
}
return builder.build();
}
+
+ @Override
+ public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
+ LocateMetaRegionRequest request) throws ServiceException {
+ byte[] row = request.getRow().toByteArray();
+ RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
+ try {
+ master.checkServiceStarted();
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preLocateMetaRegion(row, locateType);
+ }
+ RegionLocations locs = master.locateMeta(row, locateType);
+ List<HRegionLocation> list = new ArrayList<>();
+ LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+ if (locs != null) {
+ for (HRegionLocation loc : locs) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ list.add(loc);
+ }
+ }
+ }
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postLocateMetaRegion(row, locateType, list);
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController controller,
+ GetAllMetaRegionLocationsRequest request) throws ServiceException {
+ boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
+ try {
+ master.checkServiceStarted();
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
+ }
+ List<RegionLocations> locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+ List<HRegionLocation> list = new ArrayList<>();
+ GetAllMetaRegionLocationsResponse.Builder builder =
+ GetAllMetaRegionLocationsResponse.newBuilder();
+ if (locs != null) {
+ for (RegionLocations ls : locs) {
+ for (HRegionLocation loc : ls) {
+ if (loc != null) {
+ builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+ list.add(loc);
+ }
+ }
+ }
+ }
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postGetAllMetaRegionLocations(excludeOfflinedSplitParents,
+ list);
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 02e9dce..b46acfc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -552,4 +553,12 @@ public interface MasterServices extends Server {
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();
+
+ /**
+ * Get locations for all meta regions.
+ * @param excludeOfflinedSplitParents if true, offlined split parents are left out of the result
+ * @return The locations of all the meta regions
+ */
+ List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
index 58e57c4..198208f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
@@ -27,9 +27,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.tmpl.master.MasterStatusTmpl;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -80,7 +80,8 @@ public class MasterStatusServlet extends HttpServlet {
}
private ServerName getMetaLocationOrNull(HMaster master) {
- return MetaTableLocator.getMetaRegionLocation(master.getZooKeeper());
+ return master.getAssignmentManager().getRegionStates()
+ .getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO).getServerName();
}
private Map<String, Integer> getFragmentationInfo(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
index f4e91b5..fef1a84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
@@ -39,11 +39,14 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
- * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
- * meta table znodes. Clients are expected to retry if the meta information is stale. This class
- * is thread-safe (a single instance of this class can be shared by multiple threads without race
+ * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
+ * table znodes. Clients are expected to retry if the meta information is stale. This class is
+ * thread-safe (a single instance of this class can be shared by multiple threads without race
* conditions).
+ * @deprecated The meta location is now kept in the master's local store, so callers should get
+ * it from the active master instead of ZK. This class is kept only for compatibility.
*/
+@Deprecated
@InterfaceAudience.Private
public class MetaRegionLocationCache extends ZKListener {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 2723185..5a73f9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -166,8 +166,8 @@ public class RegionStateStore {
final Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
MetaTableAccessor.addRegionInfo(put, regionInfo);
final StringBuilder info =
- new StringBuilder("pid=").append(pid).append(" updating hbase:meta row=")
- .append(regionInfo.getEncodedName()).append(", regionState=").append(state);
+ new StringBuilder("pid=").append(pid).append(" updating catalog row=")
+ .append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server");
@@ -210,7 +210,7 @@ public class RegionStateStore {
}
public void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
- throws IOException {
+ throws IOException {
try {
MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
state);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 0cd9972..1c2981c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -365,8 +365,6 @@ public class CreateTableProcedure
final List<RegionInfo> regions) throws IOException {
assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
- ProcedureSyncWait.waitMetaRegions(env);
-
// Add replicas if needed
// we need to create regions with replicaIds starting from 1
List<RegionInfo> newRegions = RegionReplicaUtil.addReplicas(tableDescriptor, regions, 1,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 46621da..b28c95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -222,18 +220,6 @@ public final class ProcedureSyncWait {
throw new TimeoutIOException("Timed out while waiting on " + purpose);
}
- protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
- int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
- try {
- if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
- timeout) == null) {
- throw new NotAllMetaRegionsOnlineException();
- }
- } catch (InterruptedException e) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(e);
- }
- }
-
protected static void waitRegionInTransition(final MasterProcedureEnv env,
final List<RegionInfo> regions) throws IOException {
final RegionStates states = env.getAssignmentManager().getRegionStates();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index 23d0263..507058b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -21,8 +21,11 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -159,7 +161,9 @@ public final class MasterSnapshotVerifier {
private void verifyRegions(final SnapshotManifest manifest) throws IOException {
List<RegionInfo> regions;
if (TableName.META_TABLE_NAME.equals(tableName)) {
- regions = MetaTableLocator.getMetaRegions(services.getZooKeeper());
+ regions = services.getAllMetaRegionLocations(false).stream()
+ .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+ .map(HRegionLocation::getRegion).collect(Collectors.toList());
} else {
regions = MetaTableAccessor.getTableRegions(services.getConnection(), tableName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index e491467..f4b0f3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -52,7 +54,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -193,12 +194,15 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
monitor.rethrowException();
List<Pair<RegionInfo, ServerName>> regionsAndLocations;
+
if (TableName.META_TABLE_NAME.equals(snapshotTable)) {
- regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
- server.getZooKeeper());
+ regionsAndLocations = master.getAllMetaRegionLocations(false).stream()
+ .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+ .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+ .collect(Collectors.toList());
} else {
- regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
- server.getConnection(), snapshotTable, false);
+ regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(server.getConnection(),
+ snapshotTable, false);
}
// run the snapshot
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 1f7a5e2..595d15b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -24,12 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -129,20 +129,16 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
// Each region server will get its own online regions for the table.
// We may still miss regions that need to be flushed.
List<Pair<RegionInfo, ServerName>> regionsAndLocations;
-
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
- master.getZooKeeper());
- } else {
- regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
- master.getConnection(), tableName, false);
+ try (RegionLocator locator =
+ master.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ regionsAndLocations = locator.getAllRegionLocations().stream()
+ .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+ .collect(Collectors.toList());
}
Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
if (region != null && region.getFirst() != null && region.getSecond() != null) {
- RegionInfo hri = region.getFirst();
- if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
regionServers.add(region.getSecond().toString());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 40a009c..3c78b30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -793,8 +793,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
+ MAX_FLUSH_PER_CHANGES);
}
- this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
- DEFAULT_ROWLOCK_WAIT_DURATION);
+ this.rowLockWaitDuration =
+ conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION);
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 43fd908..104c9b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
-import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -177,7 +176,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -2309,8 +2307,8 @@ public class HRegionServer extends Thread implements
}
/**
- * Helper method for use in tests. Skip the region transition report when there's no master
- * around to receive it.
+ * Helper method for use in tests. Skip the region transition report when there's no master around
+ * to receive it.
*/
private boolean skipReportingTransition(final RegionStateTransitionContext context) {
final TransitionCode code = context.getCode();
@@ -2321,17 +2319,13 @@ public class HRegionServer extends Thread implements
if (code == TransitionCode.OPENED) {
Preconditions.checkArgument(hris != null && hris.length == 1);
if (hris[0].isMetaRegion()) {
- try {
- MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
- hris[0].getReplicaId(), RegionState.State.OPEN);
- } catch (KeeperException e) {
- LOG.info("Failed to update meta location", e);
- return false;
- }
+ LOG.warn(
+ "meta table location is stored in master local store, so we can not skip reporting");
+ return false;
} else {
try {
MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
- serverName, openSeqNum, masterSystemTime);
+ serverName, openSeqNum, masterSystemTime);
} catch (IOException e) {
LOG.info("Failed to update meta", e);
return false;
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
index 1a34b7f..292f001 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
@@ -62,7 +62,6 @@
<%@ page import="org.apache.hadoop.hbase.quotas.ThrottleSettings" %>
<%@ page import="org.apache.hadoop.hbase.util.Bytes" %>
<%@ page import="org.apache.hadoop.hbase.util.FSUtils" %>
-<%@ page import="org.apache.hadoop.hbase.zookeeper.MetaTableLocator" %>
<%@ page import="org.apache.hadoop.util.StringUtils" %>
<%@ page import="org.apache.hbase.thirdparty.com.google.protobuf.ByteString" %>
<%@ page import="org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos" %>
@@ -308,7 +307,8 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
String hostAndPort = "";
String readReq = "N/A";
@@ -373,7 +373,8 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
String hostAndPort = "";
float locality = 0.0f;
@@ -421,7 +422,8 @@
for (int j = 0; j < numMetaReplicas; j++) {
RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
- ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+ RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+ ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
for (int i = 0; i < 1; i++) {
String hostAndPort = "";
long compactingCells = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 112fa01..a3d78aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -317,13 +316,6 @@ public class TestMetaTableAccessor {
}
@Test
- public void testGetRegionsFromMetaTable() throws IOException, InterruptedException {
- List<RegionInfo> regions = MetaTableLocator.getMetaRegions(UTIL.getZooKeeperWatcher());
- assertTrue(regions.size() >= 1);
- assertTrue(MetaTableLocator.getMetaRegionsAndLocations(UTIL.getZooKeeperWatcher()).size() >= 1);
- }
-
- @Test
public void testTableExists() throws IOException {
final TableName tableName = TableName.valueOf(name.getMethodName());
assertFalse(MetaTableAccessor.tableExists(connection, tableName));
@@ -600,7 +592,7 @@ public class TestMetaTableAccessor {
UTIL.createTable(tableName, FAMILY, SPLIT_KEYS);
Table table = connection.getTable(tableName);
// Make sure all the regions are deployed
- UTIL.countRows(table);
+ HBaseTestingUtility.countRows(table);
ClientMetaTableAccessor.Visitor visitor = mock(ClientMetaTableAccessor.Visitor.class);
doReturn(true).when(visitor).visit((Result) anyObject());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
deleted file mode 100644
index 9274fa0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
-
-/**
- * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
- */
-@Category({ MiscTests.class, MediumTests.class })
-public class TestMetaTableLocator {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaTableLocator.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableLocator.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static final ServerName SN =
- ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
- private ZKWatcher watcher;
- private Abortable abortable;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // Set this down so tests run quicker
- UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
- UTIL.startMiniZKCluster();
- }
-
- @AfterClass
- public static void afterClass() throws IOException {
- UTIL.getZkCluster().shutdown();
- }
-
- @Before
- public void before() throws IOException {
- this.abortable = new Abortable() {
- @Override
- public void abort(String why, Throwable e) {
- LOG.info(why, e);
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- };
- this.watcher =
- new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
- }
-
- @After
- public void after() {
- try {
- // Clean out meta location or later tests will be confused... they presume
- // start fresh in zk.
- MetaTableLocator.deleteMetaLocation(this.watcher);
- } catch (KeeperException e) {
- LOG.warn("Unable to delete hbase:meta location", e);
- }
-
- this.watcher.close();
- }
-
- /**
- * Test normal operations
- */
- @Test
- public void testMetaLookup()
- throws IOException, InterruptedException, ServiceException, KeeperException {
- final ClientProtos.ClientService.BlockingInterface client =
- Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-
- Mockito.when(client.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
- .thenReturn(GetResponse.newBuilder().build());
-
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- for (RegionState.State state : RegionState.State.values()) {
- if (state.equals(RegionState.State.OPEN)) {
- continue;
- }
- MetaTableLocator.setMetaLocation(this.watcher, SN, state);
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- assertEquals(state, MetaTableLocator.getMetaRegionState(this.watcher).getState());
- }
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
- assertEquals(SN, MetaTableLocator.getMetaRegionLocation(this.watcher));
- assertEquals(RegionState.State.OPEN,
- MetaTableLocator.getMetaRegionState(this.watcher).getState());
-
- MetaTableLocator.deleteMetaLocation(this.watcher);
- assertNull(MetaTableLocator.getMetaRegionState(this.watcher).getServerName());
- assertEquals(RegionState.State.OFFLINE,
- MetaTableLocator.getMetaRegionState(this.watcher).getState());
- assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
- }
-
- @Test(expected = NotAllMetaRegionsOnlineException.class)
- public void testTimeoutWaitForMeta() throws IOException, InterruptedException {
- MetaTableLocator.waitMetaRegionLocation(watcher, 100);
- }
-
- /**
- * Test waiting on meat w/ no timeout specified.
- */
- @Test
- public void testNoTimeoutWaitForMeta() throws IOException, InterruptedException, KeeperException {
- ServerName hsa = MetaTableLocator.getMetaRegionLocation(watcher);
- assertNull(hsa);
-
- // Now test waiting on meta location getting set.
- Thread t = new WaitOnMetaThread();
- startWaitAliveThenWaitItLives(t, 1);
- // Set a meta location.
- MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
- hsa = SN;
- // Join the thread... should exit shortly.
- t.join();
- // Now meta is available.
- assertTrue(MetaTableLocator.getMetaRegionLocation(watcher).equals(hsa));
- }
-
- private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
- t.start();
- UTIL.waitFor(2000, t::isAlive);
- // Wait one second.
- Threads.sleep(ms);
- assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
- }
-
- /**
- * Wait on META.
- */
- class WaitOnMetaThread extends Thread {
-
- WaitOnMetaThread() {
- super("WaitOnMeta");
- }
-
- @Override
- public void run() {
- try {
- doWaiting();
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed wait", e);
- }
- LOG.info("Exiting " + getName());
- }
-
- void doWaiting() throws InterruptedException {
- try {
- for (;;) {
- if (MetaTableLocator.waitMetaRegionLocation(watcher, 10000) != null) {
- break;
- }
- }
- } catch (NotAllMetaRegionsOnlineException e) {
- // Ignore
- }
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index 89f287b..cb10e38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -58,10 +58,7 @@ public abstract class AbstractTestRegionLocator {
}
UTIL.getAdmin().createTable(td, SPLIT_KEYS);
UTIL.waitTableAvailable(TABLE_NAME);
- try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
UTIL.getAdmin().balancerSwitch(false, true);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index c9d67f4..ea1122c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
/**
@@ -32,11 +31,6 @@ public class DummyConnectionRegistry implements ConnectionRegistry {
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return null;
- }
-
- @Override
public CompletableFuture<String> getClusterId() {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
index 78e3e54..977a274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
@@ -29,12 +29,12 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.AfterClass;
import org.junit.Rule;
import org.slf4j.Logger;
@@ -64,8 +64,11 @@ public class MetaWithReplicasTestBase {
TEST_UTIL.startMiniCluster(option);
AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
Set<ServerName> sns = new HashSet<ServerName>();
- ServerName hbaseMetaServerName =
- MetaTableLocator.getMetaRegionLocation(TEST_UTIL.getZooKeeperWatcher());
+ ServerName hbaseMetaServerName;
+ try (RegionLocator locator =
+ TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ hbaseMetaServerName = locator.getRegionLocation(HConstants.EMPTY_START_ROW).getServerName();
+ }
LOG.info("HBASE:META DEPLOY: on " + hbaseMetaServerName);
sns.add(hbaseMetaServerName);
for (int replicaId = 1; replicaId < 3; replicaId++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index 8e562bd..8e220eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -43,11 +44,10 @@ public final class RegionReplicaTestHelper {
}
// waits for all replicas to have region location
- static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util,
- ConnectionRegistry registry) {
+ static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util) {
Configuration conf = util.getConfiguration();
int regionReplicaCount = util.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
- HConstants.DEFAULT_META_REPLICA_NUM);
+ HConstants.DEFAULT_META_REPLICA_NUM);
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() {
@Override
@@ -58,16 +58,20 @@ public final class RegionReplicaTestHelper {
@Override
public boolean evaluate() {
try {
- RegionLocations locs = registry.getMetaRegionLocations().get();
+ List<HRegionLocation> locs;
+ try (RegionLocator locator =
+ util.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locs = locator.getAllRegionLocations();
+ }
if (locs.size() < regionReplicaCount) {
return false;
}
for (int i = 0; i < regionReplicaCount; i++) {
- HRegionLocation loc = locs.getRegionLocation(i);
+ HRegionLocation loc = locs.get(i);
// Wait until the replica is served by a region server. There could be delay between
// the replica being available to the connection and region server opening it.
Optional<ServerName> rsCarryingReplica =
- getRSCarryingReplica(util, loc.getRegion().getTable(), i);
+ getRSCarryingReplica(util, loc.getRegion().getTable(), i);
if (!rsCarryingReplica.isPresent()) {
return false;
}
@@ -82,7 +86,7 @@ public final class RegionReplicaTestHelper {
}
static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
- int replicaId) {
+ int replicaId) {
return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.filter(rs -> rs.getRegions(tableName).stream()
.anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
@@ -93,7 +97,7 @@ public final class RegionReplicaTestHelper {
* Return the new location.
*/
static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
- throws Exception {
+ throws Exception {
ServerName serverName = currentLoc.getServerName();
RegionInfo regionInfo = currentLoc.getRegion();
TableName tableName = regionInfo.getTable();
@@ -120,13 +124,13 @@ public final class RegionReplicaTestHelper {
interface Locator {
RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception;
+ throws Exception;
void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
}
static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
- throws Exception {
+ throws Exception {
RegionLocations locs =
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
assertEquals(3, locs.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index 50111f7..79371a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -18,9 +18,9 @@
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.List;
@@ -53,11 +53,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TestAsyncAdminBase.setUpBeforeClass();
- try (ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
- RegionReplicaTestHelper
- .waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
}
private void testMoveNonDefaultReplica(TableName tableName)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 003bef3..1382c02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -35,6 +35,8 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncMetaRegionLocator {
@@ -44,24 +46,25 @@ public class TestAsyncMetaRegionLocator {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static ConnectionRegistry REGISTRY;
+ private static AsyncConnectionImpl CONN;
- private static AsyncMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilNoRegionsInTransition();
- REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ CONN = (AsyncConnectionImpl) ConnectionFactory
+ .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
- LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
}
@AfterClass
public static void tearDown() throws Exception {
- IOUtils.closeQuietly(REGISTRY);
+ Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -71,14 +74,15 @@ public class TestAsyncMetaRegionLocator {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
- throws Exception {
+ throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception {
- return LOCATOR.getRegionLocations(replicaId, reload).get();
+ throws Exception {
+ return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+ RegionLocateType.CURRENT, reload).get();
}
});
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index d8388de..b0643ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -22,10 +22,10 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.Arrays;
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -56,6 +55,8 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncNonMetaRegionLocator {
@@ -71,7 +72,7 @@ public class TestAsyncNonMetaRegionLocator {
private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
private static byte[][] SPLIT_KEYS;
@@ -80,10 +81,10 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+ ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
- LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -92,7 +93,7 @@ public class TestAsyncNonMetaRegionLocator {
@AfterClass
public static void tearDown() throws Exception {
- IOUtils.closeQuietly(CONN);
+ Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -114,7 +115,7 @@ public class TestAsyncNonMetaRegionLocator {
}
private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
- byte[] row, RegionLocateType locateType, boolean reload) {
+ byte[] row, RegionLocateType locateType, boolean reload) {
return LOCATOR
.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
.thenApply(RegionLocations::getDefaultRegionLocation);
@@ -145,7 +146,7 @@ public class TestAsyncNonMetaRegionLocator {
}
private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
- HRegionLocation loc) {
+ HRegionLocation loc) {
RegionInfo info = loc.getRegion();
assertEquals(TABLE_NAME, info.getTable());
assertArrayEquals(startKey, info.getStartKey());
@@ -359,7 +360,7 @@ public class TestAsyncNonMetaRegionLocator {
// Testcase for HBASE-20822
@Test
public void testLocateBeforeLastRegion()
- throws IOException, InterruptedException, ExecutionException {
+ throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
@@ -377,13 +378,13 @@ public class TestAsyncNonMetaRegionLocator {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
- throws Exception {
+ throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
- throws Exception {
+ throws Exception {
return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
RegionLocateType.CURRENT, reload).get();
}
@@ -405,9 +406,8 @@ public class TestAsyncNonMetaRegionLocator {
public void testConcurrentUpdateCachedLocationOnError() throws Exception {
createSingleRegionTable();
HRegionLocation loc =
- getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false)
- .get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
IntStream.range(0, 100).parallel()
- .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
+ .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
index 1cbde2f..2ef60f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
@@ -51,8 +51,8 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous region admin operations.
- * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncRegionAdminApi This test used to be joined with it, but together they exceeded our
+ * ten minute timeout, so they were split.
*/
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
@@ -60,7 +60,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
+ HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
@Test
public void testGetRegionLocation() throws Exception {
@@ -79,13 +79,13 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@Test
public void testSplitSwitch() throws Exception {
createTableWithDefaultConf(tableName);
- byte[][] families = {FAMILY};
+ byte[][] families = { FAMILY };
final int rows = 10000;
TestAsyncRegionAdminApi.loadData(tableName, families, rows);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations =
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
int originalCount = regionLocations.size();
initSplitMergeSwitch();
@@ -93,7 +93,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
try {
admin.split(tableName, Bytes.toBytes(rows / 2)).join();
} catch (Exception e) {
- //Expected
+ // Expected
}
int count = admin.getRegions(tableName).get().size();
assertTrue(originalCount == count);
@@ -111,12 +111,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
// It was ignored in TestSplitOrMergeStatus, too
public void testMergeSwitch() throws Exception {
createTableWithDefaultConf(tableName);
- byte[][] families = {FAMILY};
+ byte[][] families = { FAMILY };
TestAsyncRegionAdminApi.loadData(tableName, families, 1000);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations =
- ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
int originalCount = regionLocations.size();
initSplitMergeSwitch();
@@ -126,7 +126,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
Threads.sleep(100);
}
assertTrue("originalCount=" + originalCount + ", postSplitCount=" + postSplitCount,
- originalCount != postSplitCount);
+ originalCount != postSplitCount);
// Merge switch is off so merge should NOT succeed.
assertTrue(admin.mergeSwitch(false).get());
@@ -156,12 +156,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
@Test
public void testMergeRegions() throws Exception {
- byte[][] splitRows = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("6")};
+ byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
createTableWithDefaultConf(tableName, splitRows);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
RegionInfo regionA;
RegionInfo regionB;
RegionInfo regionC;
@@ -175,7 +175,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(2, regionLocations.size());
for (HRegionLocation rl : regionLocations) {
@@ -195,11 +195,10 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
Thread.sleep(200);
}
// merge with encoded name
- admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(),
- false).get();
+ admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(), false).get();
- regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(1, regionLocations.size());
}
@@ -233,18 +232,18 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
- splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+ splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true,
+ Bytes.toBytes("3"));
}
- private void
- splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
- throws Exception {
+ private void splitTest(TableName tableName, int rowCount, boolean isSplitRegion,
+ byte[] splitPoint) throws Exception {
// create table
createTableWithDefaultConf(tableName);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(1, regionLocations.size());
AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
@@ -273,8 +272,8 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
int count = 0;
for (int i = 0; i < 45; i++) {
try {
- regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
count = regionLocations.size();
if (count >= 2) {
break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
similarity index 88%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
index 88ab3ad..3f66d39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -55,12 +54,14 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+public class TestAsyncRegionLocatorConcurrenyLimit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+ HBaseClassTestRule.forClass(TestAsyncRegionLocatorConcurrenyLimit.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -70,7 +71,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
private static byte[][] SPLIT_KEYS;
@@ -89,7 +90,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Override
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
- InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+ InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
int concurrency = CONCURRENCY.incrementAndGet();
for (;;) {
@@ -108,7 +109,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
- InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+ InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
CONCURRENCY.decrementAndGet();
}
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
ConnectionRegistry registry =
- ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+ ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
- LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
@@ -136,12 +137,12 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@AfterClass
public static void tearDown() throws Exception {
- IOUtils.closeQuietly(CONN);
+ Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
- throws InterruptedException, ExecutionException {
+ throws InterruptedException, ExecutionException {
assertEquals(256, futures.size());
for (int i = 0; i < futures.size(); i++) {
HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 52ebc16..c8a5cec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -49,8 +50,8 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined, but together they were taking
+ * longer than our ten minute timeout, so they were split.
* @see TestAsyncTableAdminApi3 Another split out from this class so each runs under ten minutes.
*/
@RunWith(Parameterized.class)
@@ -59,7 +60,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
+ HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
@Test
public void testCreateTable() throws Exception {
@@ -69,13 +70,13 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
tables = admin.listTableDescriptors().get();
assertEquals(numTables + 1, tables.size());
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
}
static TableState.State getStateFromMeta(TableName table) throws Exception {
Optional<TableState> state = ClientMetaTableAccessor
- .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
+ .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
assertTrue(state.isPresent());
return state.get().getState();
}
@@ -86,19 +87,21 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
createTableWithDefaultConf(tableName);
List<HRegionLocation> regionLocations = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals("Table should have only 1 region", 1, regionLocations.size());
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
createTableWithDefaultConf(tableName2, new byte[][] { new byte[] { 42 } });
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
assertEquals("Table should have only 2 region", 2, regionLocations.size());
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), Bytes.toBytes("a"), Bytes.toBytes("z"), 3).join();
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
assertEquals("Table should have only 3 region", 3, regionLocations.size());
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
@@ -115,7 +118,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder = TableDescriptorBuilder.newBuilder(tableName5);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
- regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5).get();
+ regionLocations =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5, true).get();
assertEquals("Table should have 16 region", 16, regionLocations.size());
}
@@ -133,7 +137,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
List<HRegionLocation> regions = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ .getTableHRegionLocations(metaTable, tableName, true).get();
Iterator<HRegionLocation> hris = regions.iterator();
assertEquals(
@@ -191,7 +195,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
- regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+ regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -243,8 +247,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
- regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3)
- .get();
+ regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -333,7 +336,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
}
private void testTruncateTable(final TableName tableName, boolean preserveSplits)
- throws Exception {
+ throws Exception {
byte[][] splitKeys = new byte[2][];
splitKeys[0] = Bytes.toBytes(4);
splitKeys[1] = Bytes.toBytes(8);
@@ -374,8 +377,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
testCloneTableSchema(tableName, newTableName, true);
}
- private void testCloneTableSchema(final TableName tableName,
- final TableName newTableName, boolean preserveSplits) throws Exception {
+ private void testCloneTableSchema(final TableName tableName, final TableName newTableName,
+ boolean preserveSplits) throws Exception {
byte[][] splitKeys = new byte[2][];
splitKeys[0] = Bytes.toBytes(4);
splitKeys[1] = Bytes.toBytes(8);
@@ -386,20 +389,16 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
boolean BLOCK_CACHE = false;
// Create the table
- TableDescriptor tableDesc = TableDescriptorBuilder
- .newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
- .setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(FAMILY_1)
- .setBlocksize(BLOCK_SIZE)
- .setBlockCacheEnabled(BLOCK_CACHE)
- .setTimeToLive(TTL)
- .build()).build();
+ TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(BLOCK_SIZE)
+ .setBlockCacheEnabled(BLOCK_CACHE).setTimeToLive(TTL).build())
+ .build();
admin.createTable(tableDesc, splitKeys).join();
assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
assertTrue("Table should be created with splitKyes + 1 rows in META",
- admin.isTableAvailable(tableName).get());
+ admin.isTableAvailable(tableName).get());
// Clone & Verify
admin.cloneTableSchema(tableName, newTableName, preserveSplits).join();
@@ -414,7 +413,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
if (preserveSplits) {
assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
assertTrue("New table should be created with splitKyes + 1 rows in META",
- admin.isTableAvailable(newTableName).get());
+ admin.isTableAvailable(newTableName).get());
} else {
assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
index 6ba960b..b84dfb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
@@ -47,15 +47,15 @@ import org.junit.runners.Parameterized;
/**
* Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined, but together they were taking
+ * longer than our ten minute timeout, so they were split.
*/
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
+ HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
@Test
public void testTableExist() throws Exception {
@@ -123,7 +123,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertEquals(tables.length + 1, size);
for (int i = 0, j = 0; i < tables.length && j < size; i++, j++) {
assertTrue("tableName should be equal in order",
- tableDescs.get(j).getTableName().equals(tables[i]));
+ tableDescs.get(j).getTableName().equals(tables[i]));
}
assertTrue(tableDescs.get(size - 1).getTableName().equals(TableName.META_TABLE_NAME));
@@ -168,7 +168,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
this.admin.disableTable(tableName).join();
assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
// Test that table is disabled
@@ -190,7 +190,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertTrue(ok);
this.admin.enableTable(tableName).join();
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
// Test that table is enabled
@@ -232,7 +232,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
table2.get(get).get();
admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
- .forEach(t -> admin.disableTable(t).join());
+ .forEach(t -> admin.disableTable(t).join());
// Test that tables are disabled
get = new Get(row);
@@ -256,7 +256,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName2));
admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
- .forEach(t -> admin.enableTable(t).join());
+ .forEach(t -> admin.enableTable(t).join());
// Test that tables are enabled
try {
@@ -283,8 +283,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
createTableWithDefaultConf(tableName, splitKeys);
AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
- List<HRegionLocation> regions = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ List<HRegionLocation> regions =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
@@ -294,8 +294,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
// Enable table, use retain assignment to assign regions.
admin.enableTable(tableName).join();
- List<HRegionLocation> regions2 = ClientMetaTableAccessor
- .getTableHRegionLocations(metaTable, tableName).get();
+ List<HRegionLocation> regions2 =
+ ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
// Check the assignment.
assertEquals(regions.size(), regions2.size());
assertTrue(regions2.containsAll(regions));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 6c6bb98..058da39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
@@ -47,18 +48,19 @@ public class TestAsyncTableLocatePrefetch {
private static byte[] FAMILY = Bytes.toBytes("cf");
- private static AsyncConnection CONN;
+ private static AsyncConnectionImpl CONN;
- private static AsyncNonMetaRegionLocator LOCATOR;
+ private static AsyncRegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
- TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+ TEST_UTIL.getConfiguration().setInt(AsyncRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
- CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
- LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+ CONN = (AsyncConnectionImpl) ConnectionFactory
+ .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
}
@AfterClass
@@ -70,7 +72,7 @@ public class TestAsyncTableLocatePrefetch {
@Test
public void test() throws InterruptedException, ExecutionException {
assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
- RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
+ RegionLocateType.CURRENT, false, TimeUnit.MINUTES.toNanos(1)).get());
// we finish the request before we adding the remaining results to cache so sleep a bit here
Thread.sleep(1000);
// confirm that the locations of all the regions have been cached.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
index 461bf1b..3eeee50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -84,8 +84,7 @@ public class TestAsyncTableRSCrashPublish {
public void test() throws IOException, ExecutionException, InterruptedException {
Configuration conf = UTIL.getHBaseCluster().getMaster().getConfiguration();
try (AsyncConnection connection = ConnectionFactory.createAsyncConnection(conf).get()) {
- AsyncNonMetaRegionLocator locator =
- ((AsyncConnectionImpl) connection).getLocator().getNonMetaRegionLocator();
+ AsyncRegionLocator locator = ((AsyncConnectionImpl) connection).getLocator();
connection.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
ServerName serverName =
locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 3485955..c6a30d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -91,9 +91,7 @@ public class TestAsyncTableUseMetaReplicas {
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3);
- try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
- }
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 0e0060c..fa9370b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -113,7 +113,7 @@ public class TestMasterRegistry {
try (MasterRegistry registry = new MasterRegistry(conf)) {
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index abaf092..71abaae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -46,11 +46,13 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category({SmallTests.class, MasterTests.class })
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ SmallTests.class, MasterTests.class })
public class TestMetaRegionLocationCache {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
+ HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ConnectionRegistry REGISTRY;
@@ -60,19 +62,19 @@ public class TestMetaRegionLocationCache {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
}
@AfterClass
public static void cleanUp() throws Exception {
- IOUtils.closeQuietly(REGISTRY);
+ Closeables.close(REGISTRY, true);
TEST_UTIL.shutdownMiniCluster();
}
private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Exception {
List<HRegionLocation> result = new ArrayList<>();
- for (String znode: zk.getMetaReplicaNodes()) {
+ for (String znode : zk.getMetaReplicaNodes()) {
String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
@@ -92,7 +94,7 @@ public class TestMetaRegionLocationCache {
}
}
List<HRegionLocation> metaHRLs =
- master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+ master.getMetaRegionLocationCache().getMetaRegionLocations().get();
assertFalse(metaHRLs.isEmpty());
ZKWatcher zk = master.getZooKeeper();
List<String> metaZnodes = zk.getMetaReplicaNodes();
@@ -103,11 +105,13 @@ public class TestMetaRegionLocationCache {
assertEquals(actualHRLs, metaHRLs);
}
- @Test public void testInitialMetaLocations() throws Exception {
+ @Test
+ public void testInitialMetaLocations() throws Exception {
verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
}
- @Test public void testStandByMetaLocations() throws Exception {
+ @Test
+ public void testStandByMetaLocations() throws Exception {
HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
verifyCachedMetaLocations(standBy);
}
@@ -115,16 +119,17 @@ public class TestMetaRegionLocationCache {
/*
* Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
*/
- @Test public void testMetaLocationsChange() throws Exception {
+ @Test
+ public void testMetaLocationsChange() throws Exception {
List<HRegionLocation> currentMetaLocs =
- getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
+ getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
// Move these replicas to random servers.
- for (HRegionLocation location: currentMetaLocs) {
+ for (HRegionLocation location : currentMetaLocs) {
RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
}
- RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
- for (JVMClusterUtil.MasterThread masterThread:
- TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
+ for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+ .getMasterThreads()) {
verifyCachedMetaLocations(masterThread.getMaster());
}
}
@@ -133,7 +138,8 @@ public class TestMetaRegionLocationCache {
* Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
* znode for notifications.
*/
- @Test public void testMetaRegionLocationCache() throws Exception {
+ @Test
+ public void testMetaRegionLocationCache() throws Exception {
final String parentZnodeName = "/randomznodename";
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
@@ -143,7 +149,8 @@ public class TestMetaRegionLocationCache {
// some ZK activity in the background.
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
- @Override public void doAnAction() throws Exception {
+ @Override
+ public void doAnAction() throws Exception {
final String testZnode = parentZnodeName + "/child";
ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
ZKUtil.deleteNode(zkWatcher, testZnode);
@@ -163,8 +170,8 @@ public class TestMetaRegionLocationCache {
// Wait until the meta cache is populated.
int iters = 0;
while (iters++ < 10) {
- if (metaCache.getMetaRegionLocations().isPresent()
- && metaCache.getMetaRegionLocations().get().size() == 3) {
+ if (metaCache.getMetaRegionLocations().isPresent() &&
+ metaCache.getMetaRegionLocations().get().size() == 3) {
break;
}
Thread.sleep(1000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
index db08165..0100ad7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
@@ -118,6 +118,10 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
Thread.sleep(
conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 3);
}
+ // cache the location for all the meta regions.
+ try (RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// Ensure all metas are not on same hbase:meta replica=0 server!
master = util.getHBaseClusterInterface().getClusterMetrics().getMasterName();
@@ -131,7 +135,6 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
util.getHBaseClusterInterface().killRegionServer(primary);
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
}
- c.clearRegionLocationCache();
}
LOG.info("Running GETs");
try (Table htable = c.getTable(TABLE)) {
@@ -150,15 +153,15 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
LOG.info("Master active!");
- c.clearRegionLocationCache();
}
}
conf.setBoolean(HConstants.USE_META_REPLICAS, false);
LOG.info("Running GETs no replicas");
- try (Connection c = ConnectionFactory.createConnection(conf);
- Table htable = c.getTable(TABLE)) {
- Result r = htable.get(new Get(row));
- assertArrayEquals(row, r.getRow());
+ try (Connection c = ConnectionFactory.createConnection(conf)) {
+ try (Table htable = c.getTable(TABLE)) {
+ Result r = htable.get(new Get(row));
+ assertArrayEquals(r.getRow(), row);
+ }
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index d7fc8e0..47a47ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -211,7 +211,7 @@ public class TestReplicasClient {
// No master
LOG.info("Master is going to be stopped");
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
Configuration c = new Configuration(HTU.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
LOG.info("Master has stopped");
@@ -225,7 +225,9 @@ public class TestReplicasClient {
@Before
public void before() throws IOException {
- HTU.getConnection().clearRegionLocationCache();
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+ locator.clearRegionLocationCache();
+ }
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
@@ -247,7 +249,6 @@ public class TestReplicasClient {
closeRegion(hriPrimary);
} catch (Exception ignored) {
}
- HTU.getConnection().clearRegionLocationCache();
}
private HRegionServer getRS() {
@@ -326,16 +327,15 @@ public class TestReplicasClient {
byte[] b1 = Bytes.toBytes("testLocations");
openRegion(hriSecondary);
- try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
- RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
- conn.clearRegionLocationCache();
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+ locator.clearRegionLocationCache();
List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
Assert.assertEquals(2, rl.size());
rl = locator.getRegionLocations(b1, false);
Assert.assertEquals(2, rl.size());
- conn.clearRegionLocationCache();
+ locator.clearRegionLocationCache();
rl = locator.getRegionLocations(b1, false);
Assert.assertEquals(2, rl.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index 137cb28..af4533b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -19,16 +19,16 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,6 +48,8 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestZKConnectionRegistry {
@@ -73,7 +75,7 @@ public class TestZKConnectionRegistry {
@AfterClass
public static void tearDown() throws Exception {
- IOUtils.closeQuietly(REGISTRY);
+ Closeables.close(REGISTRY, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -86,8 +88,7 @@ public class TestZKConnectionRegistry {
clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getActiveMaster().get());
- RegionReplicaTestHelper
- .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
index 41a008a..8a760dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
@@ -62,10 +62,10 @@ public class AlwaysStandByHMaster extends HMaster {
if (MasterAddressTracker.getMasterAddress(watcher) != null) {
clusterHasActiveMaster.set(true);
}
- Threads.sleepWithoutInterrupt(100);
} catch (IOException e) {
// pass, we will get notified when some other active master creates the znode.
}
+ Threads.sleepWithoutInterrupt(1000);
} catch (KeeperException e) {
master.abort("Received an unexpected KeeperException, aborting", e);
return false;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 50851c1..4a68f7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
@@ -506,4 +507,10 @@ public class MockNoopMasterServices implements MasterServices {
public boolean isBalancerOn() {
return false;
}
+
+ @Override
+ public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+ throws IOException {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index a27936d..45fac6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -28,12 +28,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -161,54 +158,5 @@ public class TestMasterFailover {
TEST_UTIL.shutdownMiniCluster();
}
}
-
- /**
- * Test meta in transition when master failover.
- * This test used to manipulate region state up in zk. That is not allowed any more in hbase2
- * so I removed that messing. That makes this test anemic.
- */
- @Test
- public void testMetaInTransitionWhenMasterFailover() throws Exception {
- // Start the cluster
- HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- TEST_UTIL.startMiniCluster();
- try {
- MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
- LOG.info("Cluster started");
-
- HMaster activeMaster = cluster.getMaster();
- ServerName metaServerName = cluster.getServerHoldingMeta();
- HRegionServer hrs = cluster.getRegionServer(metaServerName);
-
- // Now kill master, meta should remain on rs, where we placed it before.
- LOG.info("Aborting master");
- activeMaster.abort("test-kill");
- cluster.waitForMasterToStop(activeMaster.getServerName(), 30000);
- LOG.info("Master has aborted");
-
- // meta should remain where it was
- RegionState metaState = MetaTableLocator.getMetaRegionState(hrs.getZooKeeper());
- assertEquals("hbase:meta should be online on RS",
- metaState.getServerName(), metaServerName);
- assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
- // Start up a new master
- LOG.info("Starting up a new master");
- activeMaster = cluster.startMaster().getMaster();
- LOG.info("Waiting for master to be ready");
- cluster.waitForActiveAndReadyMaster();
- LOG.info("Master is ready");
-
- // ensure meta is still deployed on RS
- metaState = MetaTableLocator.getMetaRegionState(activeMaster.getZooKeeper());
- assertEquals("hbase:meta should be online on RS",
- metaState.getServerName(), metaServerName);
- assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
- // Done, shutdown the cluster
- } finally {
- TEST_UTIL.shutdownMiniCluster();
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
index 425d08b..f2b9a56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
@@ -89,7 +90,20 @@ public class TestMetaAssignmentWithStopMaster {
}
}
- ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+ ServerName newMetaServer;
+ startTime = System.currentTimeMillis();
+ for (;;) {
+ try {
+ newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+ break;
+ } catch (IOException e) {
+ LOG.warn("failed to get all locations, retry...", e);
+ }
+ Thread.sleep(3000);
+ if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
+ fail("Wait too long for getting the new meta location");
+ }
+ }
assertTrue("The new meta server " + newMetaServer + " should be same with" +
" the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
index 742734e..efb784e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
@@ -99,8 +97,6 @@ public class TestMetaShutdownHandler {
metaServerName =
regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
}
- RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
- assertEquals("Wrong state for meta!", RegionState.State.OPEN, metaState.getState());
assertNotEquals("Meta is on master!", metaServerName, master.getServerName());
// Delete the ephemeral node of the meta-carrying region server.
@@ -126,11 +122,9 @@ public class TestMetaShutdownHandler {
assertTrue("Meta should be assigned",
regionStates.isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO));
// Now, make sure meta is registered in zk
- metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
- assertEquals("Meta should not be in transition", RegionState.State.OPEN, metaState.getState());
- assertEquals("Meta should be assigned", metaState.getServerName(),
- regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO));
- assertNotEquals("Meta should be assigned on a different server", metaState.getServerName(),
+ ServerName newMetaServerName =
+ regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ assertNotEquals("Meta should be assigned on a different server", newMetaServerName,
metaServerName);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
index 0d3790f..70c5b1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
@@ -19,10 +19,10 @@
package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertNotEquals;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -47,15 +47,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionReplicaSplit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaSplit.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class);
private static final int NB_SERVERS = 4;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index 6cd91a7..e706595 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -165,7 +165,7 @@ public class TestCompactionLifeCycleTracker {
.setValue(Bytes.toBytes(i))
.build()));
}
- UTIL.getAdmin().flush(NAME);
+ UTIL.flush(NAME);
for (int i = 100; i < 200; i++) {
byte[] row = Bytes.toBytes(i);
table.put(new Put(row)
@@ -178,7 +178,7 @@ public class TestCompactionLifeCycleTracker {
.setValue(Bytes.toBytes(i))
.build()));
}
- UTIL.getAdmin().flush(NAME);
+ UTIL.flush(NAME);
}
region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
assertEquals(2, region.getStore(CF1).getStorefilesCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 504a140..a23aa32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -110,7 +110,7 @@ public class TestRegionReplicas {
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
// No master
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
}
@AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 81a8559..241f425 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -35,10 +34,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -90,15 +88,24 @@ public class TestRegionServerNoMaster {
}
regionName = hri.getRegionName();
- stopMasterAndAssignMeta(HTU);
+ stopMasterAndCacheMetaLocation(HTU);
}
- public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU)
- throws IOException, InterruptedException {
+ public static void stopMasterAndCacheMetaLocation(HBaseTestingUtility HTU)
+ throws IOException, InterruptedException {
+ // cache meta location, so we will not go to master to look up meta region location
+ for (JVMClusterUtil.RegionServerThread t : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+ try (RegionLocator locator =
+ t.getRegionServer().getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
+ }
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// Stop master
HMaster master = HTU.getHBaseCluster().getMaster();
Thread masterThread = HTU.getHBaseCluster().getMasterThread();
- ServerName masterAddr = master.getServerName();
master.stopMaster();
LOG.info("Waiting until master thread exits");
@@ -107,27 +114,6 @@ public class TestRegionServerNoMaster {
}
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = true;
- // Master is down, so is the meta. We need to assign it somewhere
- // so that regions can be assigned during the mocking phase.
- HRegionServer hrs = HTU.getHBaseCluster()
- .getLiveRegionServerThreads().get(0).getRegionServer();
- ZKWatcher zkw = hrs.getZooKeeper();
- ServerName sn = MetaTableLocator.getMetaRegionLocation(zkw);
- if (sn != null && !masterAddr.equals(sn)) {
- return;
- }
-
- ProtobufUtil.openRegion(null, hrs.getRSRpcServices(),
- hrs.getServerName(), RegionInfoBuilder.FIRST_META_REGIONINFO);
- while (true) {
- sn = MetaTableLocator.getMetaRegionLocation(zkw);
- if (sn != null && sn.equals(hrs.getServerName())
- && hrs.getOnlineRegions().containsKey(
- RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
- break;
- }
- Thread.sleep(100);
- }
}
/** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index e56083d..1ec6d36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -133,8 +131,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
// mock a secondary region info to open
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
+ // cache the location for meta regions
+ try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+ locator.getAllRegionLocations();
+ }
// No master
- TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+ TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
}
@@ -186,11 +188,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
- }
+ AsyncClusterConnection conn = HTU.getAsyncConnection();
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
@@ -216,36 +216,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- openRegion(HTU, rs0, hriSecondary);
- // load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
+ AsyncClusterConnection conn = HTU.getAsyncConnection();
+ openRegion(HTU, rs0, hriSecondary);
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
- Assert.assertEquals(1000, entries.size());
+ Assert.assertEquals(1000, entries.size());
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
+ Region region = rs0.getRegion(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
- HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+ HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
- // move the secondary region from RS0 to RS1
- closeRegion(HTU, rs0, hriSecondary);
- openRegion(HTU, rs1, hriSecondary);
+ // move the secondary region from RS0 to RS1
+ closeRegion(HTU, rs0, hriSecondary);
+ openRegion(HTU, rs1, hriSecondary);
- // replicate the new data
- replicateUsingCallable(conn, entries);
+ // replicate the new data
+ replicateUsingCallable(conn, entries);
- region = rs1.getRegion(hriSecondary.getEncodedName());
- // verify the new data. old data may or may not be there
- HTU.verifyNumericRows(region, f, 1000, 2000);
+ region = rs1.getRegion(hriSecondary.getEncodedName());
+ // verify the new data. old data may or may not be there
+ HTU.verifyNumericRows(region, f, 1000, 2000);
- HTU.deleteNumericRows(table, f, 0, 2000);
- closeRegion(HTU, rs1, hriSecondary);
- }
+ HTU.deleteNumericRows(table, f, 0, 2000);
+ closeRegion(HTU, rs1, hriSecondary);
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 8afc1f1..4cf1508 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -129,7 +129,7 @@ public class TestHBaseFsckEncryption {
table.close();
}
// Flush it
- TEST_UTIL.getAdmin().flush(tableDescriptor.getTableName());
+ TEST_UTIL.flush(tableDescriptor.getTableName());
// Verify we have encrypted store files on disk
final List<Path> paths = findStorefilePaths(tableDescriptor.getTableName());
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index c7b45fe..0fd3467 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -17,23 +17,16 @@
*/
package org.apache.hadoop.hbase.zookeeper;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
@@ -46,9 +39,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaReg
* <p/>
* Meta region location is set by <code>RegionServerServices</code>. This class doesn't use ZK
* watchers, rather accesses ZK directly.
- * <p/>
- * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. The meta location is now stored in the
+ * master's local store; the location on zk is only a mirror of the first meta region, kept
+ * for compatibility.
*/
+@Deprecated
@InterfaceAudience.Private
public final class MetaTableLocator {
private static final Logger LOG = LoggerFactory.getLogger(MetaTableLocator.class);
@@ -57,166 +52,16 @@ public final class MetaTableLocator {
}
/**
- * @param zkw ZooKeeper watcher to be used
- * @return meta table regions and their locations.
- */
- public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
- return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
- * Gets the meta regions and their locations for the given path and replica ID.
- *
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return meta table regions and their locations.
- */
- public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
- int replicaId) {
- ServerName serverName = getMetaRegionLocation(zkw, replicaId);
- List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
- list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
- RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
- return list;
- }
-
- /**
- * Gets the meta regions for the given path with the default replica ID.
- *
- * @param zkw ZooKeeper watcher to be used
- * @return List of meta regions
- */
- public static List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
- return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
- * Gets the meta regions for the given path and replica ID.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return List of meta regions
- */
- public static List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
- List<Pair<RegionInfo, ServerName>> result;
- result = getMetaRegionsAndLocations(zkw, replicaId);
- return getListOfRegionInfos(result);
- }
-
- private static List<RegionInfo> getListOfRegionInfos(
- final List<Pair<RegionInfo, ServerName>> pairs) {
- if (pairs == null || pairs.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<RegionInfo> result = new ArrayList<>(pairs.size());
- for (Pair<RegionInfo, ServerName> pair : pairs) {
- result.add(pair.getFirst());
- }
- return result;
- }
-
- /**
- * Gets the meta region location, if available. Does not block.
- * @param zkw zookeeper connection to use
- * @return server name or null if we failed to get the data.
- */
- public static ServerName getMetaRegionLocation(final ZKWatcher zkw) {
- try {
- RegionState state = getMetaRegionState(zkw);
- return state.isOpened() ? state.getServerName() : null;
- } catch (KeeperException ke) {
- return null;
- }
- }
-
- /**
- * Gets the meta region location, if available. Does not block.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @return server name
- */
- public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
- try {
- RegionState state = getMetaRegionState(zkw, replicaId);
- return state.isOpened() ? state.getServerName() : null;
- } catch (KeeperException ke) {
- return null;
- }
- }
-
- /**
- * Gets the meta region location, if available, and waits for up to the specified timeout if not
- * immediately available. Given the zookeeper notification could be delayed, we will try to get
- * the latest data.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout maximum time to wait, in millis
- * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
- * if none available
- * @throws InterruptedException if interrupted while waiting
- * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
- */
- public static ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
- throws InterruptedException, NotAllMetaRegionsOnlineException {
- return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
- }
-
- /**
- * Gets the meta region location, if available, and waits for up to the specified timeout if not
- * immediately available. Given the zookeeper notification could be delayed, we will try to get
- * the latest data.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param replicaId the ID of the replica
- * @param timeout maximum time to wait, in millis
- * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
- * if none available
- * @throws InterruptedException if waiting for the socket operation fails
- * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
- */
- public static ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
- throws InterruptedException, NotAllMetaRegionsOnlineException {
- try {
- if (ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) == -1) {
- String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. " +
- "There could be a mismatch with the one configured in the master.";
- LOG.error(errorMsg);
- throw new IllegalArgumentException(errorMsg);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
- }
- ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
-
- if (sn == null) {
- throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
- }
-
- return sn;
- }
-
- /**
- * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
- * specified server address.
- * @param zookeeper zookeeper reference
- * @param serverName The server hosting <code>hbase:meta</code>
- * @param state The region transition state
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void setMetaLocation(ZKWatcher zookeeper,
- ServerName serverName, RegionState.State state) throws KeeperException {
- setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
- }
-
- /**
* Sets the location of <code>hbase:meta</code> in ZooKeeper to the specified server address.
* @param zookeeper reference to the {@link ZKWatcher} which also contains configuration and
- * operation
+ * operation
* @param serverName the name of the server
* @param replicaId the ID of the replica
* @param state the state of the region
* @throws KeeperException if a ZooKeeper operation fails
*/
public static void setMetaLocation(ZKWatcher zookeeper, ServerName serverName, int replicaId,
- RegionState.State state) throws KeeperException {
+ RegionState.State state) throws KeeperException {
if (serverName == null) {
LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
return;
@@ -225,43 +70,32 @@ public final class MetaTableLocator {
serverName);
// Make the MetaRegionServer pb and then get its bytes and save this as
// the znode content.
- MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
- .setServer(ProtobufUtil.toServerName(serverName))
- .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
- .setState(state.convert()).build();
+ MetaRegionServer pbrsr =
+ MetaRegionServer.newBuilder().setServer(ProtobufUtil.toServerName(serverName))
+ .setRpcVersion(HConstants.RPC_CURRENT_VERSION).setState(state.convert()).build();
byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
try {
- ZKUtil.setData(zookeeper,
- zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
- } catch(KeeperException.NoNodeException nne) {
+ ZKUtil.setData(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
+ } catch (KeeperException.NoNodeException nne) {
if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
LOG.debug("META region location doesn't exist, create it");
} else {
- LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
- ", create it");
+ LOG.debug("META region location doesn't exist for replicaId=" + replicaId + ", create it");
}
ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),
- data);
+ data);
}
}
/**
- * Load the meta region state from the meta server ZNode.
- */
- public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
- return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- /**
* Load the meta region state from the meta region server ZNode.
- *
* @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
* @param replicaId the ID of the replica
* @return regionstate
* @throws KeeperException if a ZooKeeper operation fails
*/
public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
- throws KeeperException {
+ throws KeeperException {
RegionState regionState = null;
try {
byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
@@ -273,109 +107,4 @@ public final class MetaTableLocator {
}
return regionState;
}
-
- /**
- * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
- * @param zookeeper zookeeper reference
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void deleteMetaLocation(ZKWatcher zookeeper)
- throws KeeperException {
- deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
- }
-
- public static void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
- throws KeeperException {
- if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
- LOG.info("Deleting hbase:meta region location in ZooKeeper");
- } else {
- LOG.info("Deleting hbase:meta for {} region location in ZooKeeper", replicaId);
- }
- try {
- // Just delete the node. Don't need any watches.
- ZKUtil.deleteNode(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId));
- } catch(KeeperException.NoNodeException nne) {
- // Has already been deleted
- }
- }
- /**
- * Wait until the primary meta region is available. Get the secondary locations as well but don't
- * block for those.
- *
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
- * @param timeout maximum time to wait in millis
- * @param conf the {@link Configuration} to use
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static List<ServerName> blockUntilAvailable(final ZKWatcher zkw, final long timeout,
- Configuration conf) throws InterruptedException {
- int numReplicasConfigured = 1;
-
- List<ServerName> servers = new ArrayList<>();
- // Make the blocking call first so that we do the wait to know
- // the znodes are all in place or timeout.
- ServerName server = blockUntilAvailable(zkw, timeout);
-
- if (server == null) {
- return null;
- }
-
- servers.add(server);
-
- try {
- List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
- numReplicasConfigured = metaReplicaNodes.size();
- } catch (KeeperException e) {
- LOG.warn("Got ZK exception {}", e);
- }
- for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
- // return all replica locations for the meta
- servers.add(getMetaRegionLocation(zkw, replicaId));
- }
- return servers;
- }
-
- /**
- * Wait until the meta region is available and is not in transition.
- * @param zkw zookeeper connection to use
- * @param timeout maximum time to wait, in millis
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static ServerName blockUntilAvailable(final ZKWatcher zkw, final long timeout)
- throws InterruptedException {
- return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
- }
-
- /**
- * Wait until the meta region is available and is not in transition.
- * @param zkw reference to the {@link ZKWatcher} which also contains configuration and constants
- * @param replicaId the ID of the replica
- * @param timeout maximum time to wait in millis
- * @return ServerName or null if we timed out.
- * @throws InterruptedException if waiting for the socket operation fails
- */
- public static ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
- final long timeout) throws InterruptedException {
- if (timeout < 0) {
- throw new IllegalArgumentException();
- }
-
- if (zkw == null) {
- throw new IllegalArgumentException();
- }
-
- long startTime = System.currentTimeMillis();
- ServerName sn = null;
- while (true) {
- sn = getMetaRegionLocation(zkw, replicaId);
- if (sn != null ||
- (System.currentTimeMillis() - startTime) > timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
- break;
- }
- Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
- }
- return sn;
- }
}
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 19d11d0..d1a4dfe 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -37,14 +37,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.security.Superusers;
@@ -78,6 +75,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@@ -1859,15 +1857,6 @@ public final class ZKUtil {
sb.append("\n ").append(child);
}
}
- sb.append("\nRegion server holding hbase:meta: "
- + MetaTableLocator.getMetaRegionLocation(zkw));
- Configuration conf = HBaseConfiguration.create();
- int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
- HConstants.DEFAULT_META_REPLICA_NUM);
- for (int i = 1; i < numMetaReplicas; i++) {
- sb.append("\nRegion server holding hbase:meta, replicaId " + i + " "
- + MetaTableLocator.getMetaRegionLocation(zkw, i));
- }
sb.append("\nRegion servers:");
final List<String> rsChildrenNoWatchList =
listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);