You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@ozone.apache.org by ri...@apache.org on 2022/07/14 04:30:04 UTC
[ozone] branch master updated: HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)
This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 843fac2fb6 HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)
843fac2fb6 is described below
commit 843fac2fb646eecfc33103fdb16eaf77a66ca062
Author: Neil Joshi <ne...@gmail.com>
AuthorDate: Wed Jul 13 22:29:59 2022 -0600
HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)
---
.../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 102 ++++---
.../om/ha/HadoopRpcOMFailoverProxyProvider.java | 240 ++++++++++++++++
...vider.java => OMFailoverProxyProviderBase.java} | 304 +++++----------------
.../ozone/om/protocolPB/GrpcOmTransport.java | 27 +-
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 20 +-
.../protocolPB/OMAdminProtocolClientSideImpl.java | 10 +-
.../OMInterServiceProtocolClientSideImpl.java | 12 +-
.../ozone/om/ha/TestOMFailoverProxyProvider.java | 12 +-
.../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 42 ++-
.../client/rpc/TestOzoneRpcClientAbstract.java | 4 +-
.../hadoop/ozone/om/OmFailoverProxyUtil.java | 4 +-
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 6 +-
.../ozone/om/TestOzoneManagerHAMetadataOnly.java | 36 +--
.../ozone/om/TestOzoneManagerHAWithData.java | 6 +-
.../ozone/om/TestOzoneManagerHAWithFailover.java | 6 +-
.../hadoop/ozone/om/failover/TestOMFailovers.java | 11 +-
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 17 +-
.../ozone/protocolPB/TestGrpcOmTransport.java | 28 +-
18 files changed, 536 insertions(+), 351 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index 498f935974..002c2012e7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -17,18 +17,21 @@
*/
package org.apache.hadoop.ozone.om.ha;
+import io.grpc.Status;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
-import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -36,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
+import io.grpc.StatusRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
@@ -48,39 +54,29 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
* connecting to another OM node from the list of proxies.
*/
public class GrpcOMFailoverProxyProvider<T> extends
- OMFailoverProxyProvider<T> {
-
- private Map<String, String> omAddresses;
+ OMFailoverProxyProviderBase<T> {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(GrpcOMFailoverProxyProvider.class);
public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
- UserGroupInformation ugi,
String omServiceId,
Class<T> protocol) throws IOException {
- super(configuration, ugi, omServiceId, protocol);
+ super(configuration, omServiceId, protocol);
}
@Override
protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
throws IOException {
- // to be used for base class omProxies,
- // ProxyInfo not applicable for gRPC, just need key set
- Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
- // to be used for base class omProxyInfos
- // OMProxyInfo not applicable for gRPC, just need key set
- Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
- List<String> omNodeIDList = new ArrayList<>();
- omAddresses = new HashMap<>();
Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+ Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+ List<String> omNodeIDList = new ArrayList<>();
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
omSvcId, nodeId);
-
Optional<String> hostaddr = getHostNameFromConfigKeys(config,
rpcAddrKey);
-
OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
omSvcId, nodeId),
@@ -88,15 +84,15 @@ public class GrpcOMFailoverProxyProvider<T> extends
if (nodeId == null) {
nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
- omProxiesNodeIdKeyset.put(nodeId, null);
- omProxyInfosNodeIdKeyset.put(nodeId, null);
if (hostaddr.isPresent()) {
- omAddresses.put(nodeId,
- hostaddr.get() + ":"
- + hostport.orElse(config
- .getObject(GrpcOmTransport
- .GrpcOmTransportConfig.class)
- .getPort()));
+ ProxyInfo<T> proxyInfo =
+ new ProxyInfo<>(createOMProxy(),
+ hostaddr.get() + ":"
+ + hostport.orElse(config
+ .getObject(GrpcOmTransport
+ .GrpcOmTransportConfig.class)
+ .getPort()));
+ omProxies.put(nodeId, proxyInfo);
} else {
LOG.error("expected host address not defined for: {}", rpcAddrKey);
throw new ConfigurationException(rpcAddrKey + "is not defined");
@@ -104,37 +100,63 @@ public class GrpcOMFailoverProxyProvider<T> extends
omNodeIDList.add(nodeId);
}
- if (omProxiesNodeIdKeyset.isEmpty()) {
+ if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
+ setOmProxies(omProxies);
+ setOmNodeIDList(omNodeIDList);
+ }
- // set base class omProxies, omProxyInfos, omNodeIDList
+ private T createOMProxy() throws IOException {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ Configuration hadoopConf =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(getConf());
+ return (T) RPC.getProxy(getInterface(), 0, addr, hadoopConf);
+ }
- // omProxies needed in base class
- // omProxies.size == number of om nodes
- // omProxies key needs to be valid nodeid
- // omProxyInfos keyset needed in base class
- setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs. RPC proxy object is intialized lazily.
+ * @return the OM proxy object to invoke methods upon
+ */
+ @Override
+ public synchronized ProxyInfo<T> getProxy() {
+ return getOMProxyMap().get(getCurrentProxyOMNodeId());
}
@Override
- protected Text computeDelegationTokenService() {
- return new Text();
+ protected synchronized boolean shouldFailover(Exception ex) {
+ if (ex instanceof StatusRuntimeException) {
+ StatusRuntimeException srexp = (StatusRuntimeException)ex;
+ Status status = srexp.getStatus();
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ LOG.debug("Grpc response has invalid length, {}", srexp.getMessage());
+ return false;
+ } else if (status.getCode() == Status.Code.DATA_LOSS) {
+ LOG.debug("Grpc unrecoverable data loss or corruption, {}",
+ srexp.getMessage());
+ return false;
+ }
+ }
+ return super.shouldFailover(ex);
}
+ @Override
+ public synchronized void close() throws IOException { }
+
// need to throw if nodeID not in omAddresses
public String getGrpcProxyAddress(String nodeId) throws IOException {
- if (omAddresses.containsKey(nodeId)) {
- return omAddresses.get(nodeId);
+ Map<String, ProxyInfo<T>> omProxies = getOMProxyMap();
+ if (omProxies.containsKey(nodeId)) {
+ return omProxies.get(nodeId).proxyInfo;
} else {
- LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+ LOG.error("expected nodeId not found in omProxies for proxyhost {}",
nodeId);
throw new IOException(
- "expected nodeId not found in omAddresses for proxyhost");
+ "expected nodeId not found in omProxies for proxyhost");
}
-
}
public List<String> getGrpcOmNodeIDList() {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000000..f27db4e04a
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class HadoopRpcOMFailoverProxyProvider<T> extends
+ OMFailoverProxyProviderBase<T> {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
+
+ private final long omVersion;
+ private final Text delegationTokenService;
+ private final UserGroupInformation ugi;
+ private Map<String, OMProxyInfo> omProxyInfos;
+ private List<String> retryExceptions = new ArrayList<>();
+
+ // HadoopRpcOMFailoverProxyProvider, on encountering certain exception,
+ // tries each OM once in a round robin fashion. After that it waits
+ // for configured time before attempting to contact all the OMs again.
+ // For other exceptions such as LeaderNotReadyException, the same OM
+ // is contacted again with a linearly increasing wait time.
+
+ public HadoopRpcOMFailoverProxyProvider(ConfigurationSource configuration,
+ UserGroupInformation ugi,
+ String omServiceId,
+ Class<T> protocol) throws IOException {
+ super(configuration, omServiceId, protocol);
+ this.ugi = ugi;
+ this.omVersion = RPC.getProtocolVersion(protocol);
+ this.delegationTokenService = computeDelegationTokenService();
+ }
+
+ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
+ throws IOException {
+ Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+ this.omProxyInfos = new HashMap<>();
+ List<String> omNodeIDList = new ArrayList<>();
+
+ Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+ omSvcId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omSvcId, nodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
+
+ OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
+ rpcAddrStr);
+
+ if (omProxyInfo.getAddress() != null) {
+ // For a non-HA OM setup, nodeId might be null. If so, we assign it
+ // the default value
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+ }
+ // ProxyInfo will be set during first time call to server.
+ omProxies.put(nodeId, null);
+ omProxyInfos.put(nodeId, omProxyInfo);
+ omNodeIDList.add(nodeId);
+ } else {
+ LOG.error("Failed to create OM proxy for {} at address {}",
+ nodeId, rpcAddrStr);
+ }
+ }
+
+ if (omProxies.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+ setOmProxies(omProxies);
+ setOmNodeIDList(omNodeIDList);
+ }
+
+ private T createOMProxy(InetSocketAddress omAddress) throws IOException {
+ Configuration hadoopConf =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(getConf());
+ RPC.setProtocolEngine(hadoopConf, getInterface(), ProtobufRpcEngine.class);
+
+ // FailoverOnNetworkException ensures that the IPC layer does not attempt
+ // retries on the same OM in case of connection exception. This retry
+ // policy essentially results in TRY_ONCE_THEN_FAIL.
+ RetryPolicy connectionRetryPolicy = RetryPolicies
+ .failoverOnNetworkException(0);
+
+ return (T) RPC.getProtocolProxy(getInterface(), omVersion,
+ omAddress, ugi, hadoopConf, NetUtils.getDefaultSocketFactory(
+ hadoopConf), (int) OmUtils.getOMClientRpcTimeOut(getConf()),
+ connectionRetryPolicy).getProxy();
+
+ }
+
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs. RPC proxy object is intialized lazily.
+ * @return the OM proxy object to invoke methods upon
+ */
+ @Override
+ public synchronized ProxyInfo<T> getProxy() {
+ ProxyInfo currentProxyInfo = getOMProxyMap().get(getCurrentProxyOMNodeId());
+ if (currentProxyInfo == null) {
+ currentProxyInfo = createOMProxy(getCurrentProxyOMNodeId());
+ }
+ return currentProxyInfo;
+ }
+
+ /**
+ * Creates proxy object.
+ */
+ protected ProxyInfo createOMProxy(String nodeId) {
+ OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
+ InetSocketAddress address = omProxyInfo.getAddress();
+ ProxyInfo proxyInfo;
+ try {
+ T proxy = createOMProxy(address);
+ // Create proxyInfo here, to make it work with all Hadoop versions.
+ proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
+ getOMProxyMap().put(nodeId, proxyInfo);
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to OM at {}",
+ this.getClass().getSimpleName(), address, ioe);
+ throw new RuntimeException(ioe);
+ }
+ return proxyInfo;
+ }
+
+ public Text getCurrentProxyDelegationToken() {
+ return delegationTokenService;
+ }
+
+ protected Text computeDelegationTokenService() {
+ // For HA, this will return "," separated address of all OM's.
+ List<String> addresses = new ArrayList<>();
+
+ for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
+ omProxyInfos.entrySet()) {
+ Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
+
+ // During client object creation when one of the OM configured address
+ // in unreachable, dtService can be null.
+ if (dtService != null) {
+ addresses.add(dtService.toString());
+ }
+ }
+
+ if (!addresses.isEmpty()) {
+ Collections.sort(addresses);
+ return new Text(String.join(",", addresses));
+ } else {
+ // If all OM addresses are unresolvable, set dt service to null. Let
+ // this fail in later step when during connection setup.
+ return null;
+ }
+ }
+
+ /**
+ * Close all the proxy objects which have been opened over the lifetime of
+ * the proxy provider.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ for (ProxyInfo<T> proxyInfo : getOMProxies()) {
+ if (proxyInfo != null) {
+ RPC.stopProxy(proxyInfo.proxy);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public List<OMProxyInfo> getOMProxyInfos() {
+ return new ArrayList<OMProxyInfo>(omProxyInfos.values());
+ }
+
+ @VisibleForTesting
+ protected void setProxiesForTesting(
+ Map<String, ProxyInfo<T>> setOMProxies,
+ Map<String, OMProxyInfo> setOMProxyInfos,
+ List<String> setOMNodeIDList) {
+ setOmProxies(setOMProxies);
+ this.omProxyInfos = setOMProxyInfos;
+ setOmNodeIDList(setOMNodeIDList);
+ }
+
+}
+
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
similarity index 65%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index 985b3e76ba..b281e22783 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -19,79 +19,57 @@
package org.apache.hadoop.ozone.om.ha;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
-
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
- * A failover proxy provider implementation which allows clients to configure
+ * A failover proxy provider base abstract class.
+ * Provides common methods for failover proxy provider
+ * implementations. Failover proxy provider allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
-public class OMFailoverProxyProvider<T> implements
+public abstract class OMFailoverProxyProviderBase<T> implements
FailoverProxyProvider<T>, Closeable {
public static final Logger LOG =
- LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+ LoggerFactory.getLogger(OMFailoverProxyProviderBase.class);
- private final String omServiceId;
private final ConfigurationSource conf;
+ private final String omServiceId;
private final Class<T> protocolClass;
- private final long omVersion;
- private final UserGroupInformation ugi;
- private final Text delegationTokenService;
// Map of OMNodeID to its proxy
private Map<String, ProxyInfo<T>> omProxies;
- private Map<String, OMProxyInfo> omProxyInfos;
private List<String> omNodeIDList;
- private String nextProxyOMNodeId;
private String currentProxyOMNodeId;
- private int nextProxyIndex;
private int currentProxyIndex;
-
- private List<String> retryExceptions = new ArrayList<>();
+ private String nextProxyOMNodeId;
+ private int nextProxyIndex;
// OMFailoverProxyProvider, on encountering certain exception, tries each OM
// once in a round robin fashion. After that it waits for configured time
@@ -105,70 +83,61 @@ public class OMFailoverProxyProvider<T> implements
private Set<String> accessControlExceptionOMs = new HashSet<>();
private boolean performFailoverDone;
- public OMFailoverProxyProvider(ConfigurationSource configuration,
- UserGroupInformation ugi, String omServiceId, Class<T> protocol)
- throws IOException {
+ public OMFailoverProxyProviderBase(ConfigurationSource configuration,
+ String omServiceId,
+ Class<T> protocol) throws IOException {
this.conf = configuration;
- this.omVersion = RPC.getProtocolVersion(protocol);
- this.ugi = ugi;
- this.omServiceId = omServiceId;
this.protocolClass = protocol;
this.performFailoverDone = true;
- loadOMClientConfigs(conf, this.omServiceId);
- this.delegationTokenService = computeDelegationTokenService();
-
- nextProxyIndex = 0;
- nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
- currentProxyIndex = 0;
- currentProxyOMNodeId = nextProxyOMNodeId;
+ this.omServiceId = omServiceId;
waitBetweenRetries = conf.getLong(
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
- }
-
- protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
- throws IOException {
- this.omProxies = new HashMap<>();
- this.omProxyInfos = new HashMap<>();
- this.omNodeIDList = new ArrayList<>();
- Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
- omSvcId);
+ loadOMClientConfigs(conf, omServiceId);
+ Preconditions.checkNotNull(omProxies);
+ Preconditions.checkNotNull(omNodeIDList);
- for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
- String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
- omSvcId, nodeId);
- String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
- if (rpcAddrStr == null) {
- continue;
- }
+ nextProxyIndex = 0;
+ nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+ currentProxyIndex = 0;
+ currentProxyOMNodeId = nextProxyOMNodeId;
+ }
- OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
- rpcAddrStr);
+ protected abstract void loadOMClientConfigs(ConfigurationSource config,
+ String omSvcId)
+ throws IOException;
- if (omProxyInfo.getAddress() != null) {
- // For a non-HA OM setup, nodeId might be null. If so, we assign it
- // the default value
- if (nodeId == null) {
- nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
- }
- // ProxyInfo will be set during first time call to server.
- omProxies.put(nodeId, null);
- omProxyInfos.put(nodeId, omProxyInfo);
- omNodeIDList.add(nodeId);
+ protected synchronized boolean shouldFailover(Exception ex) {
+ Throwable unwrappedException = HddsUtils.getUnwrappedException(ex);
+ if (unwrappedException instanceof AccessControlException ||
+ unwrappedException instanceof SecretManager.InvalidToken) {
+ // Retry all available OMs once before failing with
+ // AccessControlException.
+ if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
+ accessControlExceptionOMs.clear();
+ return false;
} else {
- LOG.error("Failed to create OM proxy for {} at address {}",
- nodeId, rpcAddrStr);
+ accessControlExceptionOMs.add(nextProxyOMNodeId);
+ if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
+ return false;
+ }
+ }
+ } else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
+ return false;
+ } else if (ex instanceof StateMachineException) {
+ StateMachineException smEx = (StateMachineException) ex;
+ Throwable cause = smEx.getCause();
+ if (cause instanceof OMException) {
+ OMException omEx = (OMException) cause;
+ // Do not failover if the operation was blocked because the OM was
+ // prepared.
+ return omEx.getResult() !=
+ OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED;
}
}
-
- if (omProxies.isEmpty()) {
- throw new IllegalArgumentException("Could not find any configured " +
- "addresses for OM. Please configure the system with "
- + OZONE_OM_ADDRESS_KEY);
- }
+ return true;
}
@VisibleForTesting
@@ -176,58 +145,6 @@ public class OMFailoverProxyProvider<T> implements
return currentProxyOMNodeId;
}
- private T createOMProxy(InetSocketAddress omAddress) throws IOException {
- Configuration hadoopConf =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
- RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class);
-
- // FailoverOnNetworkException ensures that the IPC layer does not attempt
- // retries on the same OM in case of connection exception. This retry
- // policy essentially results in TRY_ONCE_THEN_FAIL.
- RetryPolicy connectionRetryPolicy = RetryPolicies
- .failoverOnNetworkException(0);
-
- return (T) RPC.getProtocolProxy(protocolClass, omVersion,
- omAddress, ugi, hadoopConf, NetUtils.getDefaultSocketFactory(
- hadoopConf), (int) OmUtils.getOMClientRpcTimeOut(conf),
- connectionRetryPolicy).getProxy();
-
- }
-
- /**
- * Get the proxy object which should be used until the next failover event
- * occurs. RPC proxy object is intialized lazily.
- * @return the OM proxy object to invoke methods upon
- */
- @Override
- public synchronized ProxyInfo<T> getProxy() {
- ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId);
- if (currentProxyInfo == null) {
- currentProxyInfo = createOMProxy(currentProxyOMNodeId);
- }
- return currentProxyInfo;
- }
-
- /**
- * Creates proxy object.
- */
- protected ProxyInfo createOMProxy(String nodeId) {
- OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
- InetSocketAddress address = omProxyInfo.getAddress();
- ProxyInfo proxyInfo;
- try {
- T proxy = createOMProxy(address);
- // Create proxyInfo here, to make it work with all Hadoop versions.
- proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
- omProxies.put(nodeId, proxyInfo);
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to OM at {}",
- this.getClass().getSimpleName(), address, ioe);
- throw new RuntimeException(ioe);
- }
- return proxyInfo;
- }
-
@VisibleForTesting
public RetryPolicy getRetryPolicy(int maxFailovers) {
// Client will attempt upto maxFailovers number of failovers between
@@ -292,7 +209,7 @@ public class OMFailoverProxyProvider<T> implements
return new RetryAction(fallbackAction, getWaitTime());
} else {
LOG.error("Failed to connect to OMs: {}. Attempted {} failovers.",
- getOMProxyInfos(), maxFailovers);
+ omNodeIDList, maxFailovers);
return RetryAction.FAIL;
}
}
@@ -301,37 +218,8 @@ public class OMFailoverProxyProvider<T> implements
return retryPolicy;
}
- public Text getCurrentProxyDelegationToken() {
- return delegationTokenService;
- }
-
- protected Text computeDelegationTokenService() {
- // For HA, this will return "," separated address of all OM's.
- List<String> addresses = new ArrayList<>();
-
- for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
- omProxyInfos.entrySet()) {
- Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
-
- // During client object creation when one of the OM configured address
- // in unreachable, dtService can be null.
- if (dtService != null) {
- addresses.add(dtService.toString());
- }
- }
-
- if (!addresses.isEmpty()) {
- Collections.sort(addresses);
- return new Text(String.join(",", addresses));
- } else {
- // If all OM addresses are unresolvable, set dt service to null. Let
- // this fail in later step when during connection setup.
- return null;
- }
- }
-
@Override
- public Class<T> getInterface() {
+ public final Class<T> getInterface() {
return protocolClass;
}
@@ -441,7 +329,7 @@ public class OMFailoverProxyProvider<T> implements
// OMs are being contacted in Round Robin way. Check if all the OMs have
// been contacted in this attempt.
- for (String omNodeID : omProxyInfos.keySet()) {
+ for (String omNodeID : omProxies.keySet()) {
if (!attemptedOMs.contains(omNodeID)) {
return 0;
}
@@ -453,65 +341,16 @@ public class OMFailoverProxyProvider<T> implements
return waitBetweenRetries;
}
- private synchronized boolean shouldFailover(Exception ex) {
- Throwable unwrappedException = HddsUtils.getUnwrappedException(ex);
- if (unwrappedException instanceof AccessControlException ||
- unwrappedException instanceof SecretManager.InvalidToken) {
- // Retry all available OMs once before failing with
- // AccessControlException.
- if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
- accessControlExceptionOMs.clear();
- return false;
- } else {
- accessControlExceptionOMs.add(nextProxyOMNodeId);
- if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
- return false;
- }
- }
- } else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
- return false;
- } else if (ex instanceof StateMachineException) {
- StateMachineException smEx = (StateMachineException) ex;
- Throwable cause = smEx.getCause();
- if (cause instanceof OMException) {
- OMException omEx = (OMException) cause;
- // Do not failover if the operation was blocked because the OM was
- // prepared.
- return omEx.getResult() !=
- OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED;
- }
- }
- return true;
- }
-
- /**
- * Close all the proxy objects which have been opened over the lifetime of
- * the proxy provider.
- */
- @Override
- public synchronized void close() throws IOException {
- for (ProxyInfo<T> proxyInfo : omProxies.values()) {
- if (proxyInfo != null) {
- RPC.stopProxy(proxyInfo.proxy);
- }
- }
- }
- @VisibleForTesting
public List<ProxyInfo> getOMProxies() {
return new ArrayList<ProxyInfo>(omProxies.values());
}
- @VisibleForTesting
+
public Map<String, ProxyInfo<T>> getOMProxyMap() {
return omProxies;
}
- @VisibleForTesting
- public List<OMProxyInfo> getOMProxyInfos() {
- return new ArrayList<OMProxyInfo>(omProxyInfos.values());
- }
-
/**
* Check if exception is OMLeaderNotReadyException.
*
@@ -549,18 +388,21 @@ public class OMFailoverProxyProvider<T> implements
return null;
}
- protected void setProxies(
- Map<String, ProxyInfo<T>> setOMProxies,
- Map<String, OMProxyInfo> setOMProxyInfos,
- List<String> setOMNodeIDList) {
- this.omProxies = setOMProxies;
- this.omProxyInfos = setOMProxyInfos;
- this.omNodeIDList = setOMNodeIDList;
+ protected ConfigurationSource getConf() {
+ return conf;
+ }
+
+ protected synchronized void setOmProxies(Map<String,
+ ProxyInfo<T>> omProxies) {
+ this.omProxies = omProxies;
+ }
+
+ protected synchronized void setOmNodeIDList(List<String> omNodeIDList) {
+ this.omNodeIDList = omNodeIDList;
}
- protected List<String> getOmNodeIDList() {
+ protected synchronized List<String> getOmNodeIDList() {
return omNodeIDList;
}
}
-
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index d32926a88f..b899efcd9d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -23,6 +23,7 @@ import java.security.cert.X509Certificate;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.HashMap;
import java.util.Map;
@@ -83,6 +84,7 @@ public class GrpcOmTransport implements OmTransport {
private ConfigurationSource conf;
private AtomicReference<String> host;
+ private AtomicInteger syncFailoverCount;
private final int maxSize;
private SecurityConfig secConfig;
@@ -104,6 +106,9 @@ public class GrpcOmTransport implements OmTransport {
this.clients = new HashMap<>();
this.conf = conf;
this.host = new AtomicReference();
+ this.failoverCount = 0;
+ this.syncFailoverCount = new AtomicInteger();
+
secConfig = new SecurityConfig(conf);
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
@@ -111,7 +116,6 @@ public class GrpcOmTransport implements OmTransport {
omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
conf,
- ugi,
omServiceId,
OzoneManagerProtocolPB.class);
@@ -177,9 +181,11 @@ public class GrpcOmTransport implements OmTransport {
public OMResponse submitRequest(OMRequest payload) throws IOException {
OMResponse resp = null;
boolean tryOtherHost = true;
+ int expectedFailoverCount = 0;
ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
while (tryOtherHost) {
tryOtherHost = false;
+ expectedFailoverCount = syncFailoverCount.get();
try {
resp = clients.get(host.get()).submitRequest(payload);
} catch (StatusRuntimeException e) {
@@ -187,7 +193,8 @@ public class GrpcOmTransport implements OmTransport {
resultCode = ResultCodes.TIMEOUT;
}
Exception exp = new Exception(e);
- tryOtherHost = shouldRetry(unwrapException(exp));
+ tryOtherHost = shouldRetry(unwrapException(exp),
+ expectedFailoverCount);
if (!tryOtherHost) {
throw new OMException(resultCode);
}
@@ -223,6 +230,9 @@ public class GrpcOmTransport implements OmTransport {
} catch (Exception e) {
LOG.error("cannot get cause for remote exception");
}
+ } else if ((status.getCode() == Status.Code.RESOURCE_EXHAUSTED) ||
+ (status.getCode() == Status.Code.DATA_LOSS)) {
+ grpcException = srexp;
} else {
// exception generated by connection failure, gRPC
grpcException = ex;
@@ -234,7 +244,7 @@ public class GrpcOmTransport implements OmTransport {
return grpcException;
}
- private boolean shouldRetry(Exception ex) {
+ private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
boolean retry = false;
RetryPolicy.RetryAction action = null;
try {
@@ -242,7 +252,8 @@ public class GrpcOmTransport implements OmTransport {
LOG.debug("grpc failover retry action {}", action.action);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
retry = false;
- LOG.error("Retry request failed. " + action.reason, ex);
+ LOG.error("Retry request failed. Action : {}, {}",
+ action.action, ex.toString());
} else {
if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
(action.action == RetryPolicy.RetryAction.RetryDecision
@@ -255,7 +266,13 @@ public class GrpcOmTransport implements OmTransport {
}
}
// switch om host to current proxy OMNodeId
- omFailoverProxyProvider.performFailover(null);
+ if (syncFailoverCount.get() == expectedFailoverCount) {
+ omFailoverProxyProvider.performFailover(null);
+ syncFailoverCount.getAndIncrement();
+ } else {
+ LOG.warn("A failover has occurred since the start of current" +
+ " thread retry, NOT failover using current proxy");
+ }
host.set(omFailoverProxyProvider
.getGrpcProxyAddress(
omFailoverProxyProvider.getCurrentProxyOMNodeId()));
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 9a6040c8ea..297d5d6322 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.security.UserGroupInformation;
@@ -52,7 +52,7 @@ public class Hadoop3OmTransport implements OmTransport {
private static final Logger LOG =
LoggerFactory.getLogger(Hadoop3OmTransport.class);
- private final OMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
@@ -63,8 +63,8 @@ public class Hadoop3OmTransport implements OmTransport {
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
- omServiceId, OzoneManagerProtocolPB.class);
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -90,7 +90,7 @@ public class Hadoop3OmTransport implements OmTransport {
return omResponse;
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
- OMFailoverProxyProvider.getNotLeaderException(e);
+ HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
if (notLeaderException == null) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -105,11 +105,13 @@ public class Hadoop3OmTransport implements OmTransport {
/**
* Creates a {@link RetryProxy} encapsulating the
- * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
- * exception or if the current proxy is not the leader OM.
+ * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy
+ * fails over on network exception or if the current proxy
+ * is not the leader OM.
*/
private OzoneManagerProtocolPB createRetryProxy(
- OMFailoverProxyProvider failoverProxyProvider, int maxFailovers) {
+ HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
+ int maxFailovers) {
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider,
@@ -118,7 +120,7 @@ public class Hadoop3OmTransport implements OmTransport {
}
@VisibleForTesting
- public OMFailoverProxyProvider getOmFailoverProxyProvider() {
+ public HadoopRpcOMFailoverProxyProvider getOmFailoverProxyProvider() {
return omFailoverProxyProvider;
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
index 05e73d4b8f..b207657b1f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
import org.apache.hadoop.ozone.om.protocol.OMAdminProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.DecommissionOMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.DecommissionOMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationRequest;
@@ -128,8 +128,8 @@ public final class OMAdminProtocolClientSideImpl implements OMAdminProtocol {
RPC.setProtocolEngine(OzoneConfiguration.of(conf),
OMAdminProtocolPB.class, ProtobufRpcEngine.class);
- OMFailoverProxyProvider omFailoverProxyProvider =
- new OMFailoverProxyProvider(conf, ugi, omServiceId,
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ new HadoopRpcOMFailoverProxyProvider(conf, ugi, omServiceId,
OMAdminProtocolPB.class);
// Multiple the max number of retries with number of OMs to calculate the
@@ -192,13 +192,13 @@ public final class OMAdminProtocolClientSideImpl implements OMAdminProtocol {
response = rpcProxy.decommission(NULL_RPC_CONTROLLER, decommOMRequest);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
- OMFailoverProxyProvider.getNotLeaderException(e);
+ HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
if (notLeaderException != null) {
throwException(notLeaderException.getMessage());
}
OMLeaderNotReadyException leaderNotReadyException =
- OMFailoverProxyProvider.getLeaderNotReadyException(e);
+ HadoopRpcOMFailoverProxyProvider.getLeaderNotReadyException(e);
if (leaderNotReadyException != null) {
throwException(leaderNotReadyException.getMessage());
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
index a81a19bfd6..66b5f63bc9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMRequest;
@@ -53,7 +53,7 @@ public class OMInterServiceProtocolClientSideImpl implements
private static final Logger LOG =
LoggerFactory.getLogger(OMInterServiceProtocolClientSideImpl.class);
- private final OMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
private final OMInterServiceProtocolPB rpcProxy;
@@ -63,8 +63,8 @@ public class OMInterServiceProtocolClientSideImpl implements
RPC.setProtocolEngine(OzoneConfiguration.of(conf),
OMInterServiceProtocolPB.class, ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
- omServiceId, OMInterServiceProtocolPB.class);
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ conf, ugi, omServiceId, OMInterServiceProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -88,14 +88,14 @@ public class OMInterServiceProtocolClientSideImpl implements
response = rpcProxy.bootstrap(NULL_RPC_CONTROLLER, bootstrapOMRequest);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
- OMFailoverProxyProvider.getNotLeaderException(e);
+ HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
if (notLeaderException != null) {
throwException(ErrorCode.LEADER_UNDETERMINED,
notLeaderException.getMessage());
}
OMLeaderNotReadyException leaderNotReadyException =
- OMFailoverProxyProvider.getLeaderNotReadyException(e);
+ HadoopRpcOMFailoverProxyProvider.getLeaderNotReadyException(e);
if (leaderNotReadyException != null) {
throwException(ErrorCode.LEADER_NOT_READY,
leaderNotReadyException.getMessage());
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index 78505f7948..d1a3f5c51d 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class TestOMFailoverProxyProvider {
private static final String OM_SERVICE_ID = "om-service-test1";
private static final String NODE_ID_BASE_STR = "omNode-";
private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
- private OMFailoverProxyProvider provider;
+ private HadoopRpcOMFailoverProxyProvider provider;
private long waitBetweenRetries;
private int numNodes = 3;
private OzoneConfiguration config;
@@ -68,7 +68,7 @@ public class TestOMFailoverProxyProvider {
}
config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
- provider = new OMFailoverProxyProvider(config,
+ provider = new HadoopRpcOMFailoverProxyProvider(config,
UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
OzoneManagerProtocolPB.class);
}
@@ -187,9 +187,11 @@ public class TestOMFailoverProxyProvider {
}
ozoneConf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
- OMFailoverProxyProvider prov = new OMFailoverProxyProvider(ozoneConf,
- UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
- OzoneManagerProtocolPB.class);
+ HadoopRpcOMFailoverProxyProvider prov =
+ new HadoopRpcOMFailoverProxyProvider(ozoneConf,
+ UserGroupInformation.getCurrentUser(),
+ OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class);
Text dtService = prov.getCurrentProxyDelegationToken();
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index 1ca5b6470b..421100ffaa 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om.protocolPB;
import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.mock;
@@ -48,6 +49,8 @@ import com.google.protobuf.ServiceException;
import org.apache.ratis.protocol.RaftPeerId;
import static org.junit.Assert.fail;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
/**
* Tests for GrpcOmTransport client.
@@ -77,6 +80,8 @@ public class TestS3GrpcOmTransport {
private UserGroupInformation ugi;
private ManagedChannel channel;
+ private String serverName;
+
private ServiceException createNotLeaderException() {
RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
@@ -126,7 +131,7 @@ public class TestS3GrpcOmTransport {
@Before
public void setUp() throws Exception {
// Generate a unique in-process server name.
- String serverName = InProcessServerBuilder.generateName();
+ serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start,
// and register for automatic graceful shutdown.
@@ -219,4 +224,39 @@ public class TestS3GrpcOmTransport {
Assert.assertTrue(true);
}
}
+
+ @Test
+ public void testGrpcFailoverExceedMaxMesgLen() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
+ conf.setInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH, 1);
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ int maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+ OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+ channel = grpcCleanup.register(
+ InProcessChannelBuilder
+ .forName(serverName)
+ .maxInboundMetadataSize(maxSize)
+ .directExecutor().build());
+ client.startClient(channel);
+
+ doFailover = true;
+ // GrpcOMFailoverProvider returns Fail retry due to mesg response
+ // len > 0, causing RESOURCE_EXHAUSTED exception.
+ // This exception should cause failover to NOT retry,
+ // rather to fail.
+ try {
+ final OMResponse resp = client.submitRequest(omRequest);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 280eb3573e..49622b3c57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -93,7 +93,7 @@ import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -265,7 +265,7 @@ public abstract class TestOzoneRpcClientAbstract {
@Test
public void testOMClientProxyProvider() {
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
List<OMProxyInfo> omProxies = omFailoverProxyProvider.getOMProxyInfos();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
index 15cb4253d6..a2f8373e6d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.om;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -33,7 +33,7 @@ public final class OmFailoverProxyUtil {
/**
* Get FailoverProxyProvider from RpcClient / ClientProtocol.
*/
- public static OMFailoverProxyProvider getFailoverProxyProvider(
+ public static HadoopRpcOMFailoverProxyProvider getFailoverProxyProvider(
ClientProtocol clientProtocol) {
OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 66e1505b59..3095633eb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
@@ -250,11 +250,11 @@ public abstract class TestOzoneManagerHA {
*/
protected void stopLeaderOM() {
//Stop the leader OM.
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil.getFailoverProxyProvider(
(RpcClient) objectStore.getClientProxy());
- // The OMFailoverProxyProvider will point to the current leader OM node.
+ // The omFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Stop one of the ozone manager, to see when the OM leader changes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index e1874c36fb..203abdce5e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -150,14 +150,14 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
}
/**
- * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
- * cluster.
+ * Test that HadoopRpcOMFailoverProxyProvider creates an OM proxy
+ * for each OM in the cluster.
*/
@Test
public void testOMProxyProviderInitialization() throws Exception {
OzoneClient rpcClient = getCluster().getRpcClient();
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil.getFailoverProxyProvider(
rpcClient.getObjectStore().getClientProxy());
@@ -183,13 +183,14 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
}
/**
- * Test OMFailoverProxyProvider failover on connection exception to OM client.
+ * Test HadoopRpcOMFailoverProxyProvider failover on connection exception
+ * to OM client.
*/
@Test
public void testOMProxyProviderFailoverOnConnectionFailure()
throws Exception {
ObjectStore objectStore = getObjectStore();
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil
.getFailoverProxyProvider(objectStore.getClientProxy());
String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
@@ -215,20 +216,21 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
}
/**
- * Test OMFailoverProxyProvider failover when current OM proxy is not
+ * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not
* the current OM Leader.
*/
@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
ObjectStore objectStore = getObjectStore();
- OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy());
// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
createVolumeTest(true);
- // The OMFailoverProxyProvider will point to the current leader OM node.
+ // The oMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Perform a manual failover of the proxy provider to move the
@@ -288,21 +290,22 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
ObjectStore objectStore = getObjectStore();
objectStore.createVolume(volumeName);
- OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy());
String currentLeaderNodeId = omFailoverProxyProvider
.getCurrentProxyOMNodeId();
// A read request from any proxy should failover to the current leader OM
for (int i = 0; i < getNumOfOMs(); i++) {
- // Failover OMFailoverProxyProvider to OM at index i
+ // Failover omFailoverProxyProvider to OM at index i
OzoneManager ozoneManager = getCluster().getOzoneManager(i);
// Get the ObjectStore and FailoverProxyProvider for OM at index i
final ObjectStore store = OzoneClientFactory.getRpcClient(
getOmServiceId(), getConf()).getObjectStore();
- final OMFailoverProxyProvider proxyProvider =
+ final HadoopRpcOMFailoverProxyProvider proxyProvider =
OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
// Failover to the OM node that the objectStore points to
@@ -371,8 +374,9 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
objectStore.createVolume(UUID.randomUUID().toString());
- OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
- .getFailoverProxyProvider(objectStore.getClientProxy());
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+ OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy());
String currentLeaderNodeId = omFailoverProxyProvider
.getCurrentProxyOMNodeId();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index b6fc179dab..fbfe6d36e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.ozone.test.GenericTestUtils;
@@ -257,11 +257,11 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
// Stop leader OM, to see when the OM leader changes
// multipart upload is happening successfully or not.
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil
.getFailoverProxyProvider(getObjectStore().getClientProxy());
- // The OMFailoverProxyProvider will point to the current leader OM node.
+ // The omFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Stop one of the ozone manager, to see when the OM leader changes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
index 39f5f4ead1..3aee013ebb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.om;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
@@ -44,11 +44,11 @@ public class TestOzoneManagerHAWithFailover extends TestOzoneManagerHA {
long waitBetweenRetries = getConf().getLong(
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
- OMFailoverProxyProvider omFailoverProxyProvider =
+ HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil
.getFailoverProxyProvider(getObjectStore().getClientProxy());
- // The OMFailoverProxyProvider will point to the current leader OM node.
+ // The omFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
getCluster().stopOzoneManager(leaderOMNodeId);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index fe7f6f49ea..79ea1ec170 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -54,9 +55,9 @@ public class TestOMFailovers {
testException = new AccessControlException();
- GenericTestUtils.setLogLevel(OMFailoverProxyProvider.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(OMFailoverProxyProviderBase.LOG, Level.DEBUG);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(OMFailoverProxyProvider.LOG);
+ .captureLogs(OMFailoverProxyProviderBase.LOG);
MockFailoverProxyProvider failoverProxyProvider =
new MockFailoverProxyProvider(conf);
@@ -114,7 +115,7 @@ public class TestOMFailovers {
}
private final class MockFailoverProxyProvider
- extends OMFailoverProxyProvider {
+ extends HadoopRpcOMFailoverProxyProvider {
private MockFailoverProxyProvider(ConfigurationSource configuration)
throws IOException {
@@ -143,7 +144,7 @@ public class TestOMFailovers {
omProxyInfos.put(nodeId, null);
omNodeIDList.add(nodeId);
}
- setProxies(omProxies, omProxyInfos, omNodeIDList);
+ setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
}
@Override
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index a3b678efa3..408b282c64 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -47,7 +47,7 @@ public class Hadoop27RpcTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
- private final OMFailoverProxyProvider omFailoverProxyProvider;
+ private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
public Hadoop27RpcTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -56,8 +56,8 @@ public class Hadoop27RpcTransport implements OmTransport {
OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
- omServiceId, OzoneManagerProtocolPB.class);
+ this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+ conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -84,7 +84,7 @@ public class Hadoop27RpcTransport implements OmTransport {
return omResponse;
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
- OMFailoverProxyProvider.getNotLeaderException(e);
+ HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
if (notLeaderException == null) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -99,11 +99,12 @@ public class Hadoop27RpcTransport implements OmTransport {
/**
* Creates a {@link RetryProxy} encapsulating the
- * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
- * exception or if the current proxy is not the leader OM.
+ * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy fails over on
+ * network exception or if the current proxy is not the leader OM.
*/
private OzoneManagerProtocolPB createRetryProxy(
- OMFailoverProxyProvider failoverProxyProvider, int maxFailovers) {
+ HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
+ int maxFailovers) {
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider,
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
index 304a2bfb86..4082dc82f8 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
@@ -19,11 +19,17 @@
package org.apache.hadoop.ozone.protocolPB;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -39,34 +45,44 @@ public class TestGrpcOmTransport {
private static final Logger LOG =
LoggerFactory.getLogger(TestGrpcOmTransport.class);
+ private static OzoneConfiguration conf;
@Rule
- public Timeout timeout = Timeout.seconds(30);
+ public Timeout timeout = Timeout.seconds(3000);
+ @BeforeClass
+ public static void setUp() {
+ conf = new OzoneConfiguration();
+ RPC.setProtocolEngine(OzoneConfiguration.of(conf),
+ OzoneManagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+ }
@Test
public void testGrpcOmTransportFactory() throws Exception {
String omServiceId = "";
- String transportCls = GrpcOmTransport.class.getName();
- OzoneConfiguration conf = new OzoneConfiguration();
+ String transportCls = GrpcOmTransportFactory.class.getName();
conf.set(OZONE_OM_TRANSPORT_CLASS,
transportCls);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+ omTransport.close();
Assert.assertEquals(GrpcOmTransport.class.getSimpleName(),
omTransport.getClass().getSimpleName());
-
}
@Test
public void testHrpcOmTransportFactory() throws Exception {
String omServiceId = "";
- OzoneConfiguration conf = new OzoneConfiguration();
+ String transportCls = Hadoop3OmTransportFactory.class.getName();
+ conf.set(OZONE_OM_TRANSPORT_CLASS,
+ transportCls);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
// OmTransport should be Hadoop Rpc and
// fail equality GrpcOmTransport equality test
+ omTransport.close();
Assert.assertNotEquals(GrpcOmTransport.class.getSimpleName(),
omTransport.getClass().getSimpleName());
}
@@ -74,8 +90,6 @@ public class TestGrpcOmTransport {
@Test
public void testStartStop() throws Exception {
String omServiceId = "";
- OzoneConfiguration conf = new OzoneConfiguration();
-
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
GrpcOmTransport client = new GrpcOmTransport(conf, ugi, omServiceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org