You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/10 18:08:59 UTC
[3/3] hbase git commit: HBASE-19200 Make hbase-client only depend on ZKAsyncRegistry and ZNodePaths
HBASE-19200 Make hbase-client only depend on ZKAsyncRegistry and ZNodePaths
- Removes zookeeper connection from ClusterConnection
- Deletes class ZooKeeperKeepAliveConnection
- Removes Registry, ZooKeeperRegistry, and RegistryFactory
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72270866
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72270866
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72270866
Branch: refs/heads/master
Commit: 72270866cbacbcfd10f2ad2cce33362577eeef4d
Parents: 31234eb
Author: zhangduo <zh...@apache.org>
Authored: Fri Nov 10 22:16:45 2017 +0800
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Fri Nov 10 10:05:25 2017 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ClusterConnection.java | 4 +
.../hbase/client/ConnectionImplementation.java | 236 +++++---------
.../apache/hadoop/hbase/client/HBaseAdmin.java | 326 +++++++------------
.../apache/hadoop/hbase/client/MetaCache.java | 27 +-
.../apache/hadoop/hbase/client/Registry.java | 53 ---
.../hadoop/hbase/client/RegistryFactory.java | 50 ---
.../hadoop/hbase/client/ZKAsyncRegistry.java | 13 +-
.../client/ZooKeeperKeepAliveConnection.java | 56 ----
.../hadoop/hbase/client/ZooKeeperRegistry.java | 129 --------
.../hbase/zookeeper/MasterAddressTracker.java | 2 +-
.../hbase/zookeeper/RecoverableZooKeeper.java | 70 +---
.../hadoop/hbase/zookeeper/ZKMetadata.java | 80 +++++
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 35 +-
.../hadoop/hbase/zookeeper/ZNodePaths.java | 80 +++--
.../hadoop/hbase/zookeeper/ZkAclReset.java | 2 +-
.../hbase/zookeeper/ZooKeeperWatcher.java | 2 +-
.../hbase/client/DoNothingAsyncRegistry.java | 64 ++++
.../hadoop/hbase/client/TestAsyncProcess.java | 21 +-
.../hbase/client/TestBufferedMutator.java | 33 +-
.../hbase/client/TestClientNoCluster.java | 66 ++--
.../hbase/zookeeper/TestZooKeeperWatcher.java | 9 +-
.../hbase/IntegrationTestMetaReplicas.java | 3 +-
.../test/IntegrationTestZKAndFSPermissions.java | 3 +-
.../replication/ReplicationPeersZKImpl.java | 3 +-
.../ReplicationQueuesClientZKImpl.java | 9 +-
.../replication/ReplicationQueuesZKImpl.java | 54 +--
.../replication/ReplicationStateZKBase.java | 17 +-
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 11 +-
.../rsgroup/VerifyingRSGroupAdminClient.java | 5 +-
.../apache/hadoop/hbase/ZKNamespaceManager.java | 5 +-
.../org/apache/hadoop/hbase/ZNodeClearer.java | 12 +-
.../backup/example/HFileArchiveManager.java | 3 +-
.../backup/example/ZKTableArchiveClient.java | 4 +-
.../ZKSplitLogManagerCoordination.java | 10 +-
.../ZkSplitLogWorkerCoordination.java | 9 +-
.../hbase/master/ActiveMasterManager.java | 3 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 9 +-
.../cleaner/ReplicationZKNodeCleaner.java | 7 +-
.../hbase/procedure/ZKProcedureCoordinator.java | 5 +-
.../hbase/procedure/ZKProcedureMemberRpcs.java | 10 +-
.../hadoop/hbase/procedure/ZKProcedureUtil.java | 19 +-
.../hbase/regionserver/HRegionServer.java | 3 +-
.../security/access/ZKPermissionWatcher.java | 15 +-
.../token/AuthenticationTokenSecretManager.java | 4 +-
.../hbase/security/token/ZKSecretWatcher.java | 7 +-
.../visibility/ZKVisibilityLabelWatcher.java | 5 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 3 +-
.../hadoop/hbase/util/ZKDataMigrator.java | 3 +-
.../hbase/zookeeper/RegionServerTracker.java | 2 +-
.../hbase/zookeeper/SplitOrMergeTracker.java | 4 +-
.../hadoop/hbase/zookeeper/ZKSplitLog.java | 4 +-
.../apache/hadoop/hbase/GenericTestUtils.java | 1 +
.../hbase/client/TestMetaWithReplicas.java | 9 +-
.../hbase/master/TestAssignmentListener.java | 5 +-
.../hbase/master/TestMetaShutdownHandler.java | 3 +-
.../hbase/master/TestTableStateManager.java | 3 +-
.../TestCompactionInDeadRegionServer.java | 4 +-
.../hbase/regionserver/TestSplitLogWorker.java | 7 +-
.../replication/TestMasterReplication.java | 7 +-
.../TestReplicationStateHBaseImpl.java | 3 +-
.../replication/TestReplicationStateZKImpl.java | 9 +-
.../TestReplicationTrackerZKImpl.java | 18 +-
.../TestReplicationSourceManager.java | 8 +-
.../token/TestZKSecretWatcherRefreshKeys.java | 3 +-
.../hadoop/hbase/zookeeper/TestZKMulti.java | 40 +--
.../zookeeper/TestZooKeeperNodeTracker.java | 2 +-
66 files changed, 687 insertions(+), 1044 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index a931b1d..7294559 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -226,6 +226,10 @@ public interface ClusterConnection extends Connection {
*/
MasterService.BlockingInterface getMaster() throws IOException;
+ /**
+ * Get the admin service for master.
+ */
+ AdminService.BlockingInterface getAdminForMaster() throws IOException;
/**
* Establishes a connection to the region server at the specified address.
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 56a2e84..47e8dc8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -25,16 +25,23 @@ import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRI
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx;
+import com.google.common.base.Throwables;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -47,7 +54,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -72,9 +78,6 @@ import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -124,8 +127,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import edu.umd.cs.findbugs.annotations.Nullable;
-
/**
* Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
* Encapsulates connection to zookeeper and regionservers.
@@ -168,12 +169,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final Object metaRegionLock = new Object();
- // We have a single lock for master & zk to prevent deadlocks. Having
- // one lock for ZK and one lock for master is not possible:
- // When creating a connection to master, we need a connection to ZK to get
- // its address. But another thread could have taken the ZK lock, and could
- // be waiting for the master lock => deadlock.
- private final Object masterAndZKLock = new Object();
+ private final Object masterLock = new Object();
// thread executor shared by all Table instances created
// by this connection
@@ -206,7 +202,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
- Registry registry;
+ private final AsyncRegistry registry;
private final ClientBackoffPolicy backoffPolicy;
@@ -284,7 +280,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.conf.get(BufferedMutator.CLASSNAME_KEY);
try {
- this.registry = setupRegistry();
+ this.registry = AsyncRegistryFactory.getRegistry(conf);
retrieveClusterId();
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
@@ -495,13 +491,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
/**
- * @return The cluster registry implementation to use.
- */
- private Registry setupRegistry() throws IOException {
- return RegistryFactory.getRegistry(this);
- }
-
- /**
* For tests only.
*/
@VisibleForTesting
@@ -523,7 +512,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (clusterId != null) {
return;
}
- this.clusterId = this.registry.getClusterId();
+ try {
+ this.clusterId = this.registry.getClusterId().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Retrieve cluster id failed", e);
+ }
if (clusterId == null) {
clusterId = HConstants.CLUSTER_ID_DEFAULT;
LOG.debug("clusterid came back null, using default " + clusterId);
@@ -535,25 +528,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.conf;
}
- private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
- throws MasterNotRunningException {
- String errorMsg;
- try {
- if (ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) == -1) {
- errorMsg = "The node " + zkw.znodePaths.baseZNode+" is not in ZooKeeper. "
- + "It should have been written by the master. "
- + "Check the value configured in 'zookeeper.znode.parent'. "
- + "There could be a mismatch with the one configured in the master.";
- LOG.error(errorMsg);
- throw new MasterNotRunningException(errorMsg);
- }
- } catch (KeeperException e) {
- errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
- LOG.error(errorMsg);
- throw new MasterNotRunningException(errorMsg, e);
- }
- }
-
/**
* @return true if the master is running, throws an exception otherwise
* @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
@@ -652,8 +626,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
- RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
- HRegionInfo.getStartKey(regionName), false, true);
+ RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
+ RegionInfo.getStartKey(regionName), false, true);
return locations == null ? null : locations.getRegionLocation();
}
@@ -667,17 +641,20 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
- public List<HRegionLocation> locateRegions(final TableName tableName)
- throws IOException {
+ public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
return locateRegions(tableName, false, true);
}
@Override
- public List<HRegionLocation> locateRegions(final TableName tableName,
- final boolean useCache, final boolean offlined) throws IOException {
- List<RegionInfo> regions = MetaTableAccessor
- .getTableRegions(this, tableName, !offlined);
- final List<HRegionLocation> locations = new ArrayList<>();
+ public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
+ boolean offlined) throws IOException {
+ List<RegionInfo> regions;
+ if (TableName.isMetaTableName(tableName)) {
+ regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ } else {
+ regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
+ }
+ List<HRegionLocation> locations = new ArrayList<>();
for (RegionInfo regionInfo : regions) {
RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
if (list != null) {
@@ -772,7 +749,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
// Look up from zookeeper
- locations = this.registry.getMetaRegionLocation();
+ locations = get(this.registry.getMetaRegionLocation());
if (locations != null) {
cacheLocation(tableName, locations);
}
@@ -799,7 +776,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// build the key of the meta region we should be looking for.
// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
- byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
+ byte[] metaKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
Scan s = new Scan();
s.setReversed(true);
@@ -851,7 +828,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new IOException("HRegionInfo was null in " +
tableName + ", row=" + regionInfoRow);
}
- HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
+ RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
if (regionInfo == null) {
throw new IOException("HRegionInfo was null or empty in " +
TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
@@ -1124,37 +1101,25 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
*/
private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
throws IOException, KeeperException {
- ZooKeeperKeepAliveConnection zkw;
- try {
- zkw = getKeepAliveZooKeeperWatcher();
- } catch (IOException e) {
- ExceptionUtil.rethrowIfInterrupt(e);
- throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
- }
- try {
- checkIfBaseNodeAvailable(zkw);
- ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
- if (sn == null) {
- String msg = "ZooKeeper available but no active master location found";
- LOG.info(msg);
- throw new MasterNotRunningException(msg);
- }
- if (isDeadServer(sn)) {
- throw new MasterNotRunningException(sn + " is dead.");
- }
- // Use the security info interface name as our stub key
- String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn,
- hostnamesCanChange);
- MasterProtos.MasterService.BlockingInterface stub =
- (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
- BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
- return MasterProtos.MasterService.newBlockingStub(channel);
- });
- isMasterRunning(stub);
- return stub;
- } finally {
- zkw.close();
- }
+ ServerName sn = get(registry.getMasterAddress());
+ if (sn == null) {
+ String msg = "ZooKeeper available but no active master location found";
+ LOG.info(msg);
+ throw new MasterNotRunningException(msg);
+ }
+ if (isDeadServer(sn)) {
+ throw new MasterNotRunningException(sn + " is dead.");
+ }
+ // Use the security info interface name as our stub key
+ String key =
+ getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
+ MasterProtos.MasterService.BlockingInterface stub =
+ (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
+ BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
+ return MasterProtos.MasterService.newBlockingStub(channel);
+ });
+ isMasterRunning(stub);
+ return stub;
}
/**
@@ -1165,7 +1130,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
// The lock must be at the beginning to prevent multiple master creations
// (and leaks) in a multithread context
- synchronized (masterAndZKLock) {
+ synchronized (masterLock) {
Exception exceptionCaught = null;
if (!closed) {
try {
@@ -1184,6 +1149,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
+ public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
+ return getAdmin(get(registry.getMasterAddress()));
+ }
+
+ @Override
public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
throws IOException {
if (isDeadServer(serverName)) {
@@ -1212,48 +1182,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
});
}
- private ZooKeeperKeepAliveConnection keepAliveZookeeper;
- private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
-
- /**
- * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have finished with it.
- * @return The shared instance. Never returns null.
- */
- ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
- throws IOException {
- synchronized (masterAndZKLock) {
- if (keepAliveZookeeper == null) {
- if (this.closed) {
- throw new IOException(toString() + " closed");
- }
- // We don't check that our link to ZooKeeper is still valid
- // But there is a retry mechanism in the ZooKeeperWatcher itself
- keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
- }
- keepAliveZookeeperUserCount.addAndGet(1);
- return keepAliveZookeeper;
- }
- }
-
- void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
- if (zkw == null){
- return;
- }
- }
-
- private void closeZooKeeperWatcher() {
- synchronized (masterAndZKLock) {
- if (keepAliveZookeeper != null) {
- LOG.info("Closing zookeeper sessionid=0x" +
- Long.toHexString(
- keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
- keepAliveZookeeper.internalClose();
- keepAliveZookeeper = null;
- }
- keepAliveZookeeperUserCount.set(0);
- }
- }
-
final MasterServiceState masterServiceState = new MasterServiceState(this);
@Override
@@ -1268,7 +1196,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException {
- synchronized (masterAndZKLock) {
+ synchronized (masterLock) {
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
try {
@@ -1812,7 +1740,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (mss.getStub() == null) {
return;
}
- synchronized (masterAndZKLock) {
+ synchronized (masterLock) {
--mss.userCount;
}
}
@@ -1830,13 +1758,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* connection itself.
*/
private void closeMaster() {
- synchronized (masterAndZKLock) {
+ synchronized (masterLock) {
closeMasterService(masterServiceState);
}
}
- void updateCachedLocation(HRegionInfo hri, ServerName source,
- ServerName serverName, long seqNum) {
+ void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
cacheLocation(hri.getTable(), source, newHrl);
}
@@ -1889,7 +1816,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return;
}
- HRegionInfo regionInfo = oldLocation.getRegionInfo();
+ RegionInfo regionInfo = oldLocation.getRegion();
Throwable cause = ClientExceptionsUtil.findException(exception);
if (cause != null) {
if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
@@ -1902,7 +1829,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
rme.getHostname() + ":" + rme.getPort() +
- " according to " + source.getHostAndPort());
+ " according to " + source.getAddress());
}
// We know that the region is not anymore on this region server, but we know
// the new location.
@@ -1947,26 +1874,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public void abort(final String msg, Throwable t) {
- if (t instanceof KeeperException.SessionExpiredException
- && keepAliveZookeeper != null) {
- synchronized (masterAndZKLock) {
- if (keepAliveZookeeper != null) {
- LOG.warn("This client just lost it's session with ZooKeeper," +
- " closing it." +
- " It will be recreated next time someone needs it", t);
- closeZooKeeperWatcher();
- }
- }
+ if (t != null) {
+ LOG.fatal(msg, t);
} else {
- if (t != null) {
- LOG.fatal(msg, t);
- } else {
- LOG.fatal(msg);
- }
- this.aborted = true;
- close();
- this.closed = true;
+ LOG.fatal(msg);
}
+ this.aborted = true;
+ close();
+ this.closed = true;
}
@Override
@@ -1981,7 +1896,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public int getCurrentNrHRS() throws IOException {
- return this.registry.getCurrentNrHRS();
+ return get(this.registry.getCurrentNrHRS());
}
@Override
@@ -1995,7 +1910,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.metrics.shutdown();
}
this.closed = true;
- closeZooKeeperWatcher();
+ registry.close();
this.stubs.clear();
if (clusterStatusListener != null) {
clusterStatusListener.close();
@@ -2061,4 +1976,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
public RpcControllerFactory getRpcControllerFactory() {
return this.rpcControllerFactory;
}
+
+ private static <T> T get(CompletableFuture<T> future) throws IOException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 0c66faf..80f9d16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -97,14 +101,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -206,10 +206,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-
/**
* HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
* this is an HBase-internal class as defined in
@@ -399,18 +395,11 @@ public class HBaseAdmin implements Admin {
}
@Override
- public List<RegionInfo> getRegions(final TableName tableName) throws IOException {
- ZooKeeperWatcher zookeeper =
- new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
- new ThrowableAbortable());
- try {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- return new MetaTableLocator().getMetaRegions(zookeeper);
- } else {
- return MetaTableAccessor.getTableRegions(connection, tableName, true);
- }
- } finally {
- zookeeper.close();
+ public List<RegionInfo> getRegions(TableName tableName) throws IOException {
+ if (TableName.isMetaTableName(tableName)) {
+ return Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ } else {
+ return MetaTableAccessor.getTableRegions(connection, tableName, true);
}
}
@@ -1248,9 +1237,9 @@ public class HBaseAdmin implements Admin {
*/
@Override
public void compactRegionServer(final ServerName sn, boolean major)
- throws IOException, InterruptedException {
- for (HRegionInfo region : getOnlineRegions(sn)) {
- compact(sn, region, major, null);
+ throws IOException, InterruptedException {
+ for (RegionInfo region : getRegions(sn)) {
+ compact(this.connection.getAdmin(sn), region, major, null);
}
}
@@ -1295,41 +1284,28 @@ public class HBaseAdmin implements Admin {
CompactType compactType) throws IOException {
switch (compactType) {
case MOB:
- ServerName master = getMasterAddress();
- compact(master, getMobRegionInfo(tableName), major, columnFamily);
+ compact(this.connection.getAdminForMaster(), getMobRegionInfo(tableName), major,
+ columnFamily);
break;
case NORMAL:
- default:
- ZooKeeperWatcher zookeeper = null;
- try {
- checkTableExists(tableName);
- zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
- new ThrowableAbortable());
- List<Pair<RegionInfo, ServerName>> pairs;
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
- } else {
- pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
+ checkTableExists(tableName);
+ for (HRegionLocation loc :connection.locateRegions(tableName, false, false)) {
+ ServerName sn = loc.getServerName();
+ if (sn == null) {
+ continue;
}
- for (Pair<RegionInfo, ServerName> pair: pairs) {
- if (pair.getFirst().isOffline()) continue;
- if (pair.getSecond() == null) continue;
- try {
- compact(pair.getSecond(), pair.getFirst(), major, columnFamily);
- } catch (NotServingRegionException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to" + (major ? " major" : "") + " compact " +
- pair.getFirst() + ": " +
- StringUtils.stringifyException(e));
- }
+ try {
+ compact(this.connection.getAdmin(sn), loc.getRegion(), major, columnFamily);
+ } catch (NotServingRegionException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to" + (major ? " major" : "") + " compact " + loc.getRegion() +
+ ": " + StringUtils.stringifyException(e));
}
}
- } finally {
- if (zookeeper != null) {
- zookeeper.close();
- }
}
break;
+ default:
+ throw new IllegalArgumentException("Unknown compactType: " + compactType);
}
}
@@ -1343,8 +1319,8 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
- private void compactRegion(final byte[] regionName, final byte[] columnFamily,final boolean major)
- throws IOException {
+ private void compactRegion(final byte[] regionName, final byte[] columnFamily,
+ final boolean major) throws IOException {
Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
if (regionServerPair == null) {
throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
@@ -1352,13 +1328,12 @@ public class HBaseAdmin implements Admin {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
}
- compact(regionServerPair.getSecond(), regionServerPair.getFirst(), major, columnFamily);
+ compact(this.connection.getAdmin(regionServerPair.getSecond()), regionServerPair.getFirst(),
+ major, columnFamily);
}
- private void compact(final ServerName sn, final RegionInfo hri,
- final boolean major, final byte [] family)
- throws IOException {
- final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ private void compact(AdminService.BlockingInterface admin, RegionInfo hri, boolean major,
+ byte[] family) throws IOException {
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -1863,37 +1838,25 @@ public class HBaseAdmin implements Admin {
}
@Override
- public void split(final TableName tableName, final byte [] splitPoint) throws IOException {
- ZooKeeperWatcher zookeeper = null;
- try {
- checkTableExists(tableName);
- zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
- new ThrowableAbortable());
- List<Pair<RegionInfo, ServerName>> pairs;
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
- } else {
- pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
- }
- if (splitPoint == null) {
- LOG.info("SplitPoint is null, will find bestSplitPoint from Region");
+ public void split(final TableName tableName, final byte[] splitPoint) throws IOException {
+ checkTableExists(tableName);
+ for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
+ ServerName sn = loc.getServerName();
+ if (sn == null) {
+ continue;
}
- for (Pair<RegionInfo, ServerName> pair: pairs) {
- // May not be a server for a particular row
- if (pair.getSecond() == null) continue;
- RegionInfo r = pair.getFirst();
- // check for parents
- if (r.isSplitParent()) continue;
- // if a split point given, only split that particular region
- if (r.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
- (splitPoint != null && !r.containsRow(splitPoint))) continue;
- // call out to master to do split now
- splitRegionAsync(pair.getFirst(), splitPoint);
+ RegionInfo r = loc.getRegion();
+ // check for parents
+ if (r.isSplitParent()) {
+ continue;
}
- } finally {
- if (zookeeper != null) {
- zookeeper.close();
+ // if a split point given, only split that particular region
+ if (r.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID ||
+ (splitPoint != null && !r.containsRow(splitPoint))) {
+ continue;
}
+ // call out to master to do split now
+ splitRegionAsync(r, splitPoint);
}
}
@@ -2346,32 +2309,14 @@ public class HBaseAdmin implements Admin {
}
/**
- * Check to see if HBase is running. Throw an exception if not.
- * @param conf system configuration
- * @throws MasterNotRunningException if the master is not running
- * @throws ZooKeeperConnectionException if unable to connect to zookeeper
- * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have
- * protobuf as part of our public API. Use {@link #available(Configuration)}
- */
- // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
- // MOB uses it too.
- // NOTE: hbase-2.0.0 removes ServiceException from the throw.
- @Deprecated
- public static void checkHBaseAvailable(Configuration conf)
- throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
- com.google.protobuf.ServiceException {
- available(conf);
- }
-
- /**
* Is HBase available? Throw an exception if not.
* @param conf system configuration
* @throws MasterNotRunningException if the master is not running.
- * @throws ZooKeeperConnectionException if unable to connect to zookeeper.
- * // TODO do not expose ZKConnectionException.
+ * @throws ZooKeeperConnectionException if unable to connect to zookeeper. // TODO do not expose
+ * ZKConnectionException.
*/
public static void available(final Configuration conf)
- throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2381,26 +2326,6 @@ public class HBaseAdmin implements Admin {
// If the connection exists, we may have a connection to ZK that does not work anymore
try (ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) {
- // Check ZK first.
- // If the connection exists, we may have a connection to ZK that does not work anymore
- ZooKeeperKeepAliveConnection zkw = null;
- try {
- // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
- zkw = ((ConnectionImplementation) connection)
- .getKeepAliveZooKeeperWatcher();
- zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.znodePaths.baseZNode, false);
- } catch (IOException e) {
- throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
- } catch (InterruptedException e) {
- throw (InterruptedIOException)
- new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
- } catch (KeeperException e){
- throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
- } finally {
- if (zkw != null) {
- zkw.close();
- }
- }
// can throw MasterNotRunningException
connection.isMasterRunning();
}
@@ -3232,17 +3157,6 @@ public class HBaseAdmin implements Admin {
}
}
- private ServerName getMasterAddress() throws IOException {
- // TODO: Fix! Reaching into internal implementation!!!!
- ConnectionImplementation connection = (ConnectionImplementation)this.connection;
- ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
- try {
- return MasterAddressTracker.getMasterAddress(zkw);
- } catch (KeeperException e) {
- throw new IOException("Failed to get master server name from MasterAddressTracker", e);
- }
- }
-
@Override
public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
@@ -3311,102 +3225,88 @@ public class HBaseAdmin implements Admin {
* {@inheritDoc}
*/
@Override
- public CompactionState getCompactionState(final TableName tableName,
- CompactType compactType) throws IOException {
+ public CompactionState getCompactionState(final TableName tableName, CompactType compactType)
+ throws IOException {
AdminProtos.GetRegionInfoResponse.CompactionState state =
- AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
+ AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
checkTableExists(tableName);
// TODO: There is no timeout on this controller. Set one!
- final HBaseRpcController rpcController = rpcControllerFactory.newController();
+ HBaseRpcController rpcController = rpcControllerFactory.newController();
switch (compactType) {
case MOB:
final AdminProtos.AdminService.BlockingInterface masterAdmin =
- this.connection.getAdmin(getMasterAddress());
+ this.connection.getAdminForMaster();
Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
- new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
- @Override
- public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
- HRegionInfo info = getMobRegionInfo(tableName);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- info.getRegionName(), true);
- GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
- return response.getCompactionState();
- }
- };
+ new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
+ @Override
+ public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
+ RegionInfo info = getMobRegionInfo(tableName);
+ GetRegionInfoRequest request =
+ RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
+ GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
+ return response.getCompactionState();
+ }
+ };
state = ProtobufUtil.call(callable);
break;
case NORMAL:
- default:
- ZooKeeperWatcher zookeeper = null;
- try {
- List<Pair<RegionInfo, ServerName>> pairs;
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
- new ThrowableAbortable());
- pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
- } else {
- pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
+ for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
+ ServerName sn = loc.getServerName();
+ if (sn == null) {
+ continue;
}
- for (Pair<RegionInfo, ServerName> pair: pairs) {
- if (pair.getFirst().isOffline()) continue;
- if (pair.getSecond() == null) continue;
- final ServerName sn = pair.getSecond();
- final byte [] regionName = pair.getFirst().getRegionName();
- final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
- try {
- Callable<GetRegionInfoResponse> regionInfoCallable =
- new Callable<GetRegionInfoResponse>() {
+ byte[] regionName = loc.getRegion().getRegionName();
+ AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
+ try {
+ Callable<GetRegionInfoResponse> regionInfoCallable =
+ new Callable<GetRegionInfoResponse>() {
@Override
public GetRegionInfoResponse call() throws Exception {
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- regionName, true);
+ GetRegionInfoRequest request =
+ RequestConverter.buildGetRegionInfoRequest(regionName, true);
return snAdmin.getRegionInfo(rpcController, request);
}
};
- GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
- switch (response.getCompactionState()) {
- case MAJOR_AND_MINOR:
+ GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
+ switch (response.getCompactionState()) {
+ case MAJOR_AND_MINOR:
+ return CompactionState.MAJOR_AND_MINOR;
+ case MAJOR:
+ if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
return CompactionState.MAJOR_AND_MINOR;
- case MAJOR:
- if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
- return CompactionState.MAJOR_AND_MINOR;
- }
- state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
- break;
- case MINOR:
- if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
- return CompactionState.MAJOR_AND_MINOR;
- }
- state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
- break;
- case NONE:
- default: // nothing, continue
- }
- } catch (NotServingRegionException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to get compaction state of " +
- pair.getFirst() + ": " +
- StringUtils.stringifyException(e));
- }
- } catch (RemoteException e) {
- if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to get compaction state of " + pair.getFirst() + ": "
- + StringUtils.stringifyException(e));
}
- } else {
- throw e;
+ state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
+ break;
+ case MINOR:
+ if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
+ return CompactionState.MAJOR_AND_MINOR;
+ }
+ state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
+ break;
+ case NONE:
+ default: // nothing, continue
+ }
+ } catch (NotServingRegionException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
+ StringUtils.stringifyException(e));
+ }
+ } catch (RemoteException e) {
+ if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
+ StringUtils.stringifyException(e));
}
+ } else {
+ throw e;
}
}
- } finally {
- if (zookeeper != null) {
- zookeeper.close();
- }
}
break;
+ default:
+ throw new IllegalArgumentException("Unknowne compactType: " + compactType);
}
- if(state != null) {
+ if (state != null) {
return ProtobufUtil.createCompactionState(state);
}
return null;
@@ -3927,9 +3827,9 @@ public class HBaseAdmin implements Admin {
});
}
- private HRegionInfo getMobRegionInfo(TableName tableName) {
- return new HRegionInfo(tableName, Bytes.toBytes(".mob"),
- HConstants.EMPTY_END_ROW, false, 0);
+ private RegionInfo getMobRegionInfo(TableName tableName) {
+ return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0)
+ .build();
}
private RpcControllerFactory getRpcControllerFactory() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index defb3ec..6dc46d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -30,14 +30,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* A cache implementation for region locations from meta.
@@ -88,7 +87,7 @@ public class MetaCache {
// this one. the exception case is when the endkey is
// HConstants.EMPTY_END_ROW, signifying that the region we're
// checking is actually the last region in the table.
- byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey();
+ byte[] endKey = possibleRegion.getRegionLocation().getRegion().getEndKey();
// Here we do direct Bytes.compareTo and not doing CellComparator/MetaCellComparator path.
// MetaCellComparator is for comparing against data in META table which need special handling.
// Not doing that is ok for this case because
@@ -117,7 +116,7 @@ public class MetaCache {
public void cacheLocation(final TableName tableName, final ServerName source,
final HRegionLocation location) {
assert source != null;
- byte [] startKey = location.getRegionInfo().getStartKey();
+ byte [] startKey = location.getRegion().getStartKey();
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
RegionLocations locations = new RegionLocations(new HRegionLocation[] {location}) ;
RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
@@ -132,7 +131,7 @@ public class MetaCache {
// If the server in cache sends us a redirect, assume it's always valid.
HRegionLocation oldLocation = oldLocations.getRegionLocation(
- location.getRegionInfo().getReplicaId());
+ location.getRegion().getReplicaId());
boolean force = oldLocation != null && oldLocation.getServerName() != null
&& oldLocation.getServerName().equals(source);
@@ -157,7 +156,7 @@ public class MetaCache {
* @param locations the new locations
*/
public void cacheLocation(final TableName tableName, final RegionLocations locations) {
- byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
+ byte [] startKey = locations.getRegionLocation().getRegion().getStartKey();
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
boolean isNewCacheEntry = (oldLocation == null);
@@ -299,7 +298,7 @@ public class MetaCache {
RegionLocations regionLocations = getCachedLocation(tableName, row);
if (regionLocations != null) {
- byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+ byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
boolean removed = tableLocations.remove(startKey, regionLocations);
if (removed) {
if (metrics != null) {
@@ -326,7 +325,7 @@ public class MetaCache {
HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
if (toBeRemoved != null) {
RegionLocations updatedLocations = regionLocations.remove(replicaId);
- byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+ byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
boolean removed;
if (updatedLocations.isEmpty()) {
removed = tableLocations.remove(startKey, regionLocations);
@@ -356,7 +355,7 @@ public class MetaCache {
if (regionLocations != null) {
RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
if (updatedLocations != regionLocations) {
- byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+ byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
boolean removed = false;
if (updatedLocations.isEmpty()) {
removed = tableLocations.remove(startKey, regionLocations);
@@ -380,7 +379,7 @@ public class MetaCache {
* Deletes the cached location of the region if necessary, based on some error from source.
* @param hri The region in question.
*/
- public void clearCache(HRegionInfo hri) {
+ public void clearCache(RegionInfo hri) {
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
if (regionLocations != null) {
@@ -410,17 +409,17 @@ public class MetaCache {
if (location == null) {
return;
}
- TableName tableName = location.getRegionInfo().getTable();
+ TableName tableName = location.getRegion().getTable();
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
- RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
+ RegionLocations regionLocations = tableLocations.get(location.getRegion().getStartKey());
if (regionLocations != null) {
RegionLocations updatedLocations = regionLocations.remove(location);
boolean removed;
if (updatedLocations != regionLocations) {
if (updatedLocations.isEmpty()) {
- removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
+ removed = tableLocations.remove(location.getRegion().getStartKey(), regionLocations);
} else {
- removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations,
+ removed = tableLocations.replace(location.getRegion().getStartKey(), regionLocations,
updatedLocations);
}
if (removed) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
deleted file mode 100644
index 4d0527a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Cluster registry.
- * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.
- * Internal use only.
- */
-@InterfaceAudience.Private
-interface Registry {
- /**
- * @param connection
- */
- void init(Connection connection);
-
- /**
- * @return Meta region location
- * @throws IOException
- */
- RegionLocations getMetaRegionLocation() throws IOException;
-
- /**
- * @return Cluster id.
- */
- String getClusterId();
-
- /**
- * @return Count of 'running' regionservers
- * @throws IOException
- */
- int getCurrentNrHRS() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
deleted file mode 100644
index 7b2ac0b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Get instance of configured Registry.
- */
-@InterfaceAudience.Private
-final class RegistryFactory {
- static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
-
- private RegistryFactory() {}
-
- /**
- * @return The cluster registry implementation to use.
- * @throws IOException
- */
- static Registry getRegistry(final Connection connection)
- throws IOException {
- String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
- ZooKeeperRegistry.class.getName());
- Registry registry = null;
- try {
- registry = (Registry)Class.forName(registryClass).newInstance();
- } catch (Throwable t) {
- throw new IOException(t);
- }
- registry.init(connection);
- return registry;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index 9688ad3..fedd527 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
-import static org.apache.hadoop.hbase.HRegionInfo.DEFAULT_REPLICA_ID;
-import static org.apache.hadoop.hbase.HRegionInfo.FIRST_META_REGIONINFO;
+import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
+import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
-import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData;
+import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -42,17 +42,18 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.data.Stat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+
/**
* Fetch the registry data from zookeeper.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
deleted file mode 100644
index 34f7b23..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * We inherit the current ZooKeeperWatcher implementation to change the semantic
- * of the close: the new close won't immediately close the connection but
- * will have a keep alive. See {@link ConnectionImplementation}.
- * This allows to make it available with a consistent interface. The whole
- * ZooKeeperWatcher use in ConnectionImplementation will be then changed to remove the
- * watcher part.
- *
- * This class is intended to be used internally by HBase classes; but not by
- * final user code. Hence it's package protected.
- */
-class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
- ZooKeeperKeepAliveConnection(
- Configuration conf, String descriptor,
- ConnectionImplementation conn) throws IOException {
- super(conf, descriptor, conn);
- }
-
- @Override
- public void close() {
- if (this.abortable != null) {
- ((ConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
- }
- }
-
- void internalClose(){
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
deleted file mode 100644
index 746382f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * A cluster registry that stores to zookeeper.
- */
-@InterfaceAudience.Private
-class ZooKeeperRegistry implements Registry {
- private static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class);
- // Needs an instance of hci to function. Set after construct this instance.
- ConnectionImplementation hci;
-
- @Override
- public void init(Connection connection) {
- if (!(connection instanceof ConnectionImplementation)) {
- throw new RuntimeException("This registry depends on ConnectionImplementation");
- }
- this.hci = (ConnectionImplementation)connection;
- }
-
- @Override
- public RegionLocations getMetaRegionLocation() throws IOException {
- ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
- }
- List<ServerName> servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout,
- hci.getConfiguration());
- if (LOG.isTraceEnabled()) {
- if (servers == null) {
- LOG.trace("Looked up meta region location, connection=" + this +
- "; servers = null");
- } else {
- StringBuilder str = new StringBuilder();
- for (ServerName s : servers) {
- str.append(s.toString());
- str.append(" ");
- }
- LOG.trace("Looked up meta region location, connection=" + this +
- "; servers = " + str.toString());
- }
- }
- if (servers == null) return null;
- HRegionLocation[] locs = new HRegionLocation[servers.size()];
- int i = 0;
- for (ServerName server : servers) {
- RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
- RegionInfoBuilder.FIRST_META_REGIONINFO, i);
- if (server == null) locs[i++] = null;
- else locs[i++] = new HRegionLocation(h, server, 0);
- }
- return new RegionLocations(locs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- } finally {
- zkw.close();
- }
- }
-
- private String clusterId = null;
-
- @Override
- public String getClusterId() {
- if (this.clusterId != null) return this.clusterId;
- // No synchronized here, worse case we will retrieve it twice, that's
- // not an issue.
- ZooKeeperKeepAliveConnection zkw = null;
- try {
- zkw = hci.getKeepAliveZooKeeperWatcher();
- this.clusterId = ZKClusterId.readClusterIdZNode(zkw);
- if (this.clusterId == null) {
- LOG.info("ClusterId read in ZooKeeper is null");
- }
- } catch (KeeperException e) {
- LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
- } catch (IOException e) {
- LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
- } finally {
- if (zkw != null) zkw.close();
- }
- return this.clusterId;
- }
-
- @Override
- public int getCurrentNrHRS() throws IOException {
- ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
- try {
- // We go to zk rather than to master to get count of regions to avoid
- // HTable having a Master dependency. See HBase-2828
- return ZKUtil.getNumberOfChildren(zkw, zkw.znodePaths.rsZNode);
- } catch (KeeperException ke) {
- throw new IOException("Unexpected ZooKeeper exception", ke);
- } finally {
- zkw.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
index 04e2662..fba637f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
@@ -100,7 +100,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
* @return info port or 0 if timed out or exceptions
*/
public int getBackupMasterInfoPort(final ServerName sn) {
- String backupZNode = ZKUtil.joinZNode(watcher.znodePaths.backupMasterAddressesZNode,
+ String backupZNode = ZNodePaths.joinZNode(watcher.znodePaths.backupMasterAddressesZNode,
sn.toString());
try {
byte[] data = ZKUtil.getData(watcher, backupZNode);
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 327bafb..94377c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -18,20 +18,24 @@
*/
package org.apache.hadoop.hbase.zookeeper;
+import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.appendMetaData;
+import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -45,8 +49,6 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
/**
* A zookeeper that can handle 'recoverable' errors.
@@ -83,22 +85,8 @@ public class RecoverableZooKeeper {
private Watcher watcher;
private int sessionTimeout;
private String quorumServers;
- private final Random salter;
private final ZooKeeperMetricsListener metrics;
- // The metadata attached to each piece of data has the
- // format:
- // <magic> 1-byte constant
- // <id length> 4-byte big-endian integer (length of next field)
- // <id> identifier corresponding uniquely to this process
- // It is prepended to the data supplied by the user.
-
- // the magic number is to be backward compatible
- private static final byte MAGIC =(byte) 0XFF;
- private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
- private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
- private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
-
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
throws IOException {
@@ -129,7 +117,6 @@ public class RecoverableZooKeeper {
this.quorumServers = quorumServers;
this.metrics = new MetricsZooKeeper();
try {checkZk();} catch (Exception x) {/* ignore */}
- salter = new Random();
}
/**
@@ -472,7 +459,7 @@ public class RecoverableZooKeeper {
try {
traceScope = Trace.startSpan("RecoverableZookeeper.setData");
RetryCounter retryCounter = retryCounterFactory.create();
- byte[] newData = appendMetaData(data);
+ byte[] newData = appendMetaData(id, data);
boolean isRetry = false;
long startTime;
while (true) {
@@ -622,7 +609,7 @@ public class RecoverableZooKeeper {
TraceScope traceScope = null;
try {
traceScope = Trace.startSpan("RecoverableZookeeper.create");
- byte[] newData = appendMetaData(data);
+ byte[] newData = appendMetaData(id, data);
switch (createMode) {
case EPHEMERAL:
case PERSISTENT:
@@ -745,14 +732,14 @@ public class RecoverableZooKeeper {
for (Op op : ops) {
if (op.getType() == ZooDefs.OpCode.create) {
CreateRequest create = (CreateRequest)op.toRequestRecord();
- preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
+ preparedOps.add(Op.create(create.getPath(), appendMetaData(id, create.getData()),
create.getAcl(), create.getFlags()));
} else if (op.getType() == ZooDefs.OpCode.delete) {
// no need to appendMetaData for delete
preparedOps.add(op);
} else if (op.getType() == ZooDefs.OpCode.setData) {
SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
- preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
+ preparedOps.add(Op.setData(setData.getPath(), appendMetaData(id, setData.getData()),
setData.getVersion()));
} else {
throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
@@ -822,41 +809,6 @@ public class RecoverableZooKeeper {
return null;
}
- public static byte[] removeMetaData(byte[] data) {
- if(data == null || data.length == 0) {
- return data;
- }
- // check the magic data; to be backward compatible
- byte magic = data[0];
- if(magic != MAGIC) {
- return data;
- }
-
- int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
- int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
- int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
-
- byte[] newData = new byte[dataLength];
- System.arraycopy(data, dataOffset, newData, 0, dataLength);
- return newData;
- }
-
- private byte[] appendMetaData(byte[] data) {
- if(data == null || data.length == 0){
- return data;
- }
- byte[] salt = Bytes.toBytes(salter.nextLong());
- int idLength = id.length + salt.length;
- byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
- int pos = 0;
- pos = Bytes.putByte(newData, pos, MAGIC);
- pos = Bytes.putInt(newData, pos, idLength);
- pos = Bytes.putBytes(newData, pos, id, 0, id.length);
- pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
- pos = Bytes.putBytes(newData, pos, data, 0, data.length);
- return newData;
- }
-
public synchronized long getSessionId() {
return zk == null ? -1 : zk.getSessionId();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetadata.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetadata.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetadata.java
new file mode 100644
index 0000000..5072706
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetadata.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The metadata that is prepended to the start of data on zookeeper.
+ */
+@InterfaceAudience.Private
+public class ZKMetadata {
+
+ private ZKMetadata() {
+ }
+
+ // The metadata attached to each piece of data has the format:
+ // <magic> 1-byte constant
+ // <id length> 4-byte big-endian integer (length of next field)
+ // <id> identifier corresponding uniquely to this process, followed by a random salt (one long)
+ // It is prepended to the data supplied by the user.
+
+ // the magic number is to be backward compatible
+ private static final byte MAGIC = (byte) 0XFF;
+ private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
+ private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
+ private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
+
+ public static byte[] appendMetaData(byte[] id, byte[] data) {
+ if (data == null || data.length == 0) {
+ return data;
+ }
+ byte[] salt = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
+ int idLength = id.length + salt.length;
+ byte[] newData = new byte[MAGIC_SIZE + ID_LENGTH_SIZE + idLength + data.length];
+ int pos = 0;
+ pos = Bytes.putByte(newData, pos, MAGIC);
+ pos = Bytes.putInt(newData, pos, idLength);
+ pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+ pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
+ pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+ return newData;
+ }
+
+ public static byte[] removeMetaData(byte[] data) {
+ if (data == null || data.length == 0) {
+ return data;
+ }
+ // check the magic byte; for backward compatibility, data not starting with it is returned as-is
+ byte magic = data[0];
+ if (magic != MAGIC) {
+ return data;
+ }
+
+ int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
+ int dataLength = data.length - MAGIC_SIZE - ID_LENGTH_SIZE - idLength;
+ int dataOffset = MAGIC_SIZE + ID_LENGTH_SIZE + idLength;
+
+ byte[] newData = new byte[dataLength];
+ System.arraycopy(data, dataOffset, newData, 0, dataLength);
+ return newData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72270866/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 717d969..61a5a42 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -87,8 +87,6 @@ import org.apache.zookeeper.server.ZooKeeperSaslServer;
public class ZKUtil {
private static final Log LOG = LogFactory.getLog(ZKUtil.class);
- // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
- public static final char ZNODE_PATH_SEPARATOR = '/';
private static int zkDumpConnectionTimeOut;
/**
@@ -302,28 +300,13 @@ public class ZKUtil {
//
// Helper methods
//
-
- /**
- * Join the prefix znode name with the suffix znode name to generate a proper
- * full znode name.
- *
- * Assumes prefix does not end with slash and suffix does not begin with it.
- *
- * @param prefix beginning of znode name
- * @param suffix ending of znode name
- * @return result of properly joining prefix with suffix
- */
- public static String joinZNode(String prefix, String suffix) {
- return prefix + ZNODE_PATH_SEPARATOR + suffix;
- }
-
/**
* Returns the full path of the immediate parent of the specified node.
* @param node path to get parent of
* @return parent of path, null if passed the root node or an invalid node
*/
public static String getParent(String node) {
- int idx = node.lastIndexOf(ZNODE_PATH_SEPARATOR);
+ int idx = node.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR);
return idx <= 0 ? null : node.substring(0, idx);
}
@@ -477,7 +460,7 @@ public class ZKUtil {
return null;
}
for (String child : children) {
- watchAndCheckExists(zkw, joinZNode(znode, child));
+ watchAndCheckExists(zkw, ZNodePaths.joinZNode(znode, child));
}
return children;
}
@@ -744,7 +727,7 @@ public class ZKUtil {
if (nodes != null) {
List<NodeAndData> newNodes = new ArrayList<>();
for (String node : nodes) {
- String nodePath = ZKUtil.joinZNode(baseNode, node);
+ String nodePath = ZNodePaths.joinZNode(baseNode, node);
byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath);
newNodes.add(new NodeAndData(nodePath, data));
}
@@ -1774,7 +1757,7 @@ public class ZKUtil {
sb.append("\n").append(replicationZnode).append(": ");
List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
for (String child : children) {
- String znode = joinZNode(replicationZnode, child);
+ String znode = ZNodePaths.joinZNode(replicationZnode, child);
if (znode.equals(zkw.znodePaths.peersZNode)) {
appendPeersZnodes(zkw, znode, sb);
} else if (znode.equals(zkw.znodePaths.queuesZNode)) {
@@ -1789,7 +1772,7 @@ public class ZKUtil {
StringBuilder sb) throws KeeperException {
sb.append("\n").append(hfileRefsZnode).append(": ");
for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
- String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode);
+ String znodeToProcess = ZNodePaths.joinZNode(hfileRefsZnode, peerIdZnode);
sb.append("\n").append(znodeToProcess).append(": ");
List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
int size = peerHFileRefsZnodes.size();
@@ -1839,7 +1822,7 @@ public class ZKUtil {
}
}
for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
- stack.add(ZKUtil.joinZNode(znodeToProcess, zNodeChild));
+ stack.add(ZNodePaths.joinZNode(znodeToProcess, zNodeChild));
}
} while (stack.size() > 0);
}
@@ -1849,7 +1832,7 @@ public class ZKUtil {
int pblen = ProtobufUtil.lengthOfPBMagic();
sb.append("\n").append(peersZnode).append(": ");
for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) {
- String znodeToProcess = ZKUtil.joinZNode(peersZnode, peerIdZnode);
+ String znodeToProcess = ZNodePaths.joinZNode(peersZnode, peerIdZnode);
byte[] data;
try {
data = ZKUtil.getData(zkw, znodeToProcess);
@@ -1879,7 +1862,7 @@ public class ZKUtil {
int pblen = ProtobufUtil.lengthOfPBMagic();
for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
if (!child.equals(peerState)) continue;
- String peerStateZnode = ZKUtil.joinZNode(znodeToProcess, child);
+ String peerStateZnode = ZNodePaths.joinZNode(znodeToProcess, child);
sb.append("\n").append(peerStateZnode).append(": ");
byte[] peerStateData;
try {
@@ -2042,7 +2025,7 @@ public class ZKUtil {
if (children == null) return;
for (String child : children) {
LOG.debug(prefix + child);
- String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+ String node = ZNodePaths.joinZNode(root.equals("/") ? "" : root, child);
logZKTree(zkw, node, prefix + "---");
}
}