You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2019/03/04 08:00:17 UTC
[hadoop] 02/05: Revert "HDDS-1072. Implement RetryProxy and
FailoverProxy for OM client."
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit b18c1c22ea238c4b783031402496164f0351b531
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Fri Mar 1 20:05:12 2019 -0800
Revert "HDDS-1072. Implement RetryProxy and FailoverProxy for OM client."
This reverts commit 8e1225991d8da7d6801fc3753319139873f23bc9.
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 17 --
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 -
.../common/src/main/resources/ozone-default.xml | 43 +---
.../ozone/client/protocol/ClientProtocol.java | 4 +-
.../hadoop/ozone/client/rest/RestClient.java | 4 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 18 +-
.../hadoop/ozone/client/rpc/ha/OMProxyInfo.java} | 32 ++-
.../ozone/client/rpc/ha/OMProxyProvider.java | 177 ++++++++++++++
.../hadoop/ozone/client/rpc}/ha/package-info.java | 2 +-
.../ozone/om/ha/OMFailoverProxyProvider.java | 266 ---------------------
.../ozone/om/protocol/OzoneManagerProtocol.java | 7 -
...OzoneManagerProtocolClientSideTranslatorPB.java | 105 +-------
.../src/main/proto/OzoneManagerProtocol.proto | 2 -
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 38 ++-
.../client/rpc/TestOzoneRpcClientAbstract.java | 8 +-
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 184 +++-----------
.../org/apache/hadoop/ozone/om/OzoneManager.java | 6 -
.../hadoop/ozone/om/ratis/OMRatisHelper.java | 9 +-
.../ozone/om/ratis/OzoneManagerRatisClient.java | 27 +--
...OzoneManagerProtocolServerSideTranslatorPB.java | 3 +-
.../om/ratis/TestOzoneManagerRatisServer.java | 23 ++
21 files changed, 322 insertions(+), 656 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0d73905..cd40f7c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -379,23 +379,6 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_ISOLATED_CLASSLOADER =
"ozone.fs.isolated-classloader";
- // Ozone Client Retry and Failover configurations
- public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
- "ozone.client.retry.max.attempts";
- public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
- 10;
- public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
- "ozone.client.failover.max.attempts";
- public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
- 15;
- public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
- "ozone.client.failover.sleep.base.millis";
- public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
- 500;
- public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
- "ozone.client.failover.sleep.max.millis";
- public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
- 15000;
public static final String OZONE_FREON_HTTP_ENABLED_KEY =
"ozone.freon.http.enabled";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 8e3b02a..45b46b8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -276,7 +276,4 @@ public final class OzoneConsts {
// Default OMServiceID for OM Ratis servers to use as RaftGroupId
public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
-
- // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
- public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f7fecb7..8469fdc 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2029,45 +2029,4 @@
</description>
</property>
- <property>
- <name>ozone.client.retry.max.attempts</name>
- <value>10</value>
- <description>
- Max retry attempts for Ozone RpcClient talking to OzoneManagers.
- </description>
- </property>
- <property>
- <name>ozone.client.failover.max.attempts</name>
- <value>15</value>
- <description>
- Expert only. The number of client failover attempts that should be
- made before the failover is considered failed.
- </description>
- </property>
- <property>
- <name>ozone.client.failover.sleep.base.millis</name>
- <value>500</value>
- <description>
- Expert only. The time to wait, in milliseconds, between failover
- attempts increases exponentially as a function of the number of
- attempts made so far, with a random factor of +/- 50%. This option
- specifies the base value used in the failover calculation. The
- first failover will retry immediately. The 2nd failover attempt
- will delay at least ozone.client.failover.sleep.base.millis
- milliseconds. And so on.
- </description>
- </property>
- <property>
- <name>ozone.client.failover.sleep.max.millis</name>
- <value>15000</value>
- <description>
- Expert only. The time to wait, in milliseconds, between failover
- attempts increases exponentially as a function of the number of
- attempts made so far, with a random factor of +/- 50%. This option
- specifies the maximum value to wait between failovers.
- Specifically, the time between two failover attempts will not
- exceed +/- 50% of ozone.client.failover.sleep.max.millis
- milliseconds.
- </description>
- </property>
-</configuration>
\ No newline at end of file
+</configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 2bf9089..494afae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -510,5 +510,5 @@ public interface ClientProtocol {
S3SecretValue getS3Secret(String kerberosID) throws IOException;
@VisibleForTesting
- OMFailoverProxyProvider getOMProxyProvider();
+ OMProxyProvider getOMProxyProvider();
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index eea2809..b69d972 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -725,7 +725,7 @@ public class RestClient implements ClientProtocol {
}
@Override
- public OMFailoverProxyProvider getOMProxyProvider() {
+ public OMProxyProvider getOMProxyProvider() {
return null;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 4b44770..0875046 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -66,8 +66,6 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.om.protocolPB
- .OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
@@ -87,7 +85,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.util.Strings;
-import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +107,7 @@ public class RpcClient implements ClientProtocol {
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
+ private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
@@ -123,7 +121,6 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize;
private final long blockSize;
private final long watchTimeout;
- private final ClientId clientId = ClientId.randomId();
/**
* Creates RpcClient instance with the given configuration.
@@ -140,8 +137,11 @@ public class RpcClient implements ClientProtocol {
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
- this.conf, clientId.toString(), ugi);
+ this.omProxyProvider = new OMProxyProvider(conf, ugi);
+ this.ozoneManagerClient =
+ TracingUtil.createProxy(
+ this.omProxyProvider.getProxy(),
+ OzoneManagerProtocol.class);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@@ -492,8 +492,8 @@ public class RpcClient implements ClientProtocol {
@Override
@VisibleForTesting
- public OMFailoverProxyProvider getOMProxyProvider() {
- return ozoneManagerClient.getOMFailoverProxyProvider();
+ public OMProxyProvider getOMProxyProvider() {
+ return omProxyProvider;
}
@Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
similarity index 53%
copy from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
copy to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
index a95f09f..01e5562 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
@@ -16,8 +16,34 @@
* limitations under the License.
*/
-package org.apache.hadoop.ozone.om.ha;
+package org.apache.hadoop.ozone.client.rpc.ha;
+
+import org.apache.hadoop.ozone.om.protocolPB
+ .OzoneManagerProtocolClientSideTranslatorPB;
+
+import java.net.InetSocketAddress;
/**
- * This package contains Ozone Client's OM Proxy classes.
- */
\ No newline at end of file
+ * Proxy information of OM.
+ */
+public final class OMProxyInfo {
+ private InetSocketAddress address;
+ private OzoneManagerProtocolClientSideTranslatorPB omClient;
+
+ public OMProxyInfo(InetSocketAddress addr) {
+ this.address = addr;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
+ return omClient;
+ }
+
+ public void setOMProxy(
+ OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
+ this.omClient = clientProxy;
+ }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
new file mode 100644
index 0000000..574cb5f
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.protocolPB
+ .OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class OMProxyProvider implements Closeable {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(OMProxyProvider.class);
+
+ private List<OMProxyInfo> omProxies;
+
+ private int currentProxyIndex = 0;
+
+ private final Configuration conf;
+ private final long omVersion;
+ private final UserGroupInformation ugi;
+ private ClientId clientId = ClientId.randomId();
+
+ public OMProxyProvider(Configuration configuration,
+ UserGroupInformation ugi) {
+ this.conf = configuration;
+ this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+ this.ugi = ugi;
+ loadOMClientConfigs(conf);
+ }
+
+ private void loadOMClientConfigs(Configuration config) {
+ this.omProxies = new ArrayList<>();
+
+ Collection<String> omServiceIds = config.getTrimmedStringCollection(
+ OZONE_OM_SERVICE_IDS_KEY);
+
+ if (omServiceIds.size() > 1) {
+ throw new IllegalArgumentException("Multi-OM Services is not supported." +
+ " Please configure only one OM Service ID in " +
+ OZONE_OM_SERVICE_IDS_KEY);
+ }
+
+ for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+ Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+ String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ serviceId, nodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
+
+ InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
+
+ // Add the OM client proxy info to list of proxies
+ if (addr != null) {
+ OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
+ omProxies.add(omProxyInfo);
+ } else {
+ LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
+ }
+ }
+ }
+
+ if (omProxies.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+ }
+
+ private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
+ InetSocketAddress omAddress) throws IOException {
+ return new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
+ conf, NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)), clientId.toString());
+ }
+
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs. RPC proxy object is initialized lazily.
+ * @return the OM proxy object to invoke methods upon
+ */
+ public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
+ OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
+ return createOMClientIfNeeded(currentOMProxyInfo);
+ }
+
+ private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
+ OMProxyInfo proxyInfo) {
+ if (proxyInfo.getOMProxy() == null) {
+ try {
+ proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to OM at {}",
+ this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ return proxyInfo.getOMProxy();
+ }
+
+ /**
+ * Called whenever an error warrants failing over. It is determined by the
+ * retry policy.
+ */
+ public void performFailover() {
+ incrementProxyIndex();
+ }
+
+ synchronized void incrementProxyIndex() {
+ currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
+ }
+
+ /**
+ * Close all the proxy objects which have been opened over the lifetime of
+ * the proxy provider.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ for (OMProxyInfo proxy : omProxies) {
+ OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
+ if (omProxy != null) {
+ RPC.stopProxy(omProxy);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public List<OMProxyInfo> getOMProxies() {
+ return omProxies;
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
similarity index 94%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
rename to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
index a95f09f..df0e69c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.ozone.om.ha;
+package org.apache.hadoop.ozone.client.rpc.ha;
/**
* This package contains Ozone Client's OM Proxy classes.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
deleted file mode 100644
index f5fdf6f..0000000
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.ha;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
-
-/**
- * A failover proxy provider implementation which allows clients to configure
- * multiple OMs to connect to. In case of OM failover, client can try
- * connecting to another OM node from the list of proxies.
- */
-public class OMFailoverProxyProvider implements
- FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
-
- public static final Logger LOG =
- LoggerFactory.getLogger(OMFailoverProxyProvider.class);
-
- // Map of OMNodeID to its proxy
- private Map<String, OMProxyInfo> omProxies;
- private List<String> omNodeIDList;
-
- private String currentProxyOMNodeId;
- private int currentProxyIndex;
-
- private final Configuration conf;
- private final long omVersion;
- private final UserGroupInformation ugi;
-
- public OMFailoverProxyProvider(OzoneConfiguration configuration,
- UserGroupInformation ugi) throws IOException {
- this.conf = configuration;
- this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
- this.ugi = ugi;
- loadOMClientConfigs(conf);
-
- currentProxyIndex = 0;
- currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
- }
-
- /**
- * Class to store proxy information.
- */
- public final class OMProxyInfo
- extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
- private InetSocketAddress address;
-
- OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
- InetSocketAddress addr) {
- super(proxy, proxyInfoStr);
- this.address = addr;
- }
-
- public InetSocketAddress getAddress() {
- return address;
- }
- }
-
- private void loadOMClientConfigs(Configuration config) throws IOException {
- this.omProxies = new HashMap<>();
- this.omNodeIDList = new ArrayList<>();
-
- Collection<String> omServiceIds = config.getTrimmedStringCollection(
- OZONE_OM_SERVICE_IDS_KEY);
-
- if (omServiceIds.size() > 1) {
- throw new IllegalArgumentException("Multi-OM Services is not supported." +
- " Please configure only one OM Service ID in " +
- OZONE_OM_SERVICE_IDS_KEY);
- }
-
- for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
- Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
-
- for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
- String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
- serviceId, nodeId);
- String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
- if (rpcAddrStr == null) {
- continue;
- }
-
- InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
-
- // Add the OM client proxy info to list of proxies
- if (addr != null) {
- StringBuilder proxyInfo = new StringBuilder()
- .append(nodeId).append("(")
- .append(NetUtils.getHostPortString(addr)).append(")");
- OMProxyInfo omProxyInfo = new OMProxyInfo(null,
- proxyInfo.toString(), addr);
-
- // For a non-HA OM setup, nodeId might be null. If so, we assign it
- // a dummy value
- if (nodeId == null) {
- nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
- }
- omProxies.put(nodeId, omProxyInfo);
- omNodeIDList.add(nodeId);
-
- } else {
- LOG.error("Failed to create OM proxy for {} at address {}",
- nodeId, rpcAddrStr);
- }
- }
- }
-
- if (omProxies.isEmpty()) {
- throw new IllegalArgumentException("Could not find any configured " +
- "addresses for OM. Please configure the system with "
- + OZONE_OM_ADDRESS_KEY);
- }
- }
-
- @VisibleForTesting
- public synchronized String getCurrentProxyOMNodeId() {
- return currentProxyOMNodeId;
- }
-
- private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
- throws IOException {
- return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
- conf, NetUtils.getDefaultSocketFactory(conf),
- Client.getRpcTimeout(conf));
- }
-
- /**
- * Get the proxy object which should be used until the next failover event
- * occurs. RPC proxy object is intialized lazily.
- * @return the OM proxy object to invoke methods upon
- */
- @Override
- public synchronized OMProxyInfo getProxy() {
- OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
- createOMProxyIfNeeded(currentOMProxyInfo);
- return currentOMProxyInfo;
- }
-
- /**
- * Creates OM proxy object if it does not already exist.
- */
- private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
- if (proxyInfo.proxy == null) {
- try {
- proxyInfo.proxy = createOMProxy(proxyInfo.address);
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to OM at {}",
- this.getClass().getSimpleName(), proxyInfo.address, ioe);
- throw new RuntimeException(ioe);
- }
- }
- return proxyInfo;
- }
-
- /**
- * Called whenever an error warrants failing over. It is determined by the
- * retry policy.
- */
- @Override
- public void performFailover(OzoneManagerProtocolPB currentProxy) {
- int newProxyIndex = incrementProxyIndex();
- LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
- newProxyIndex, omNodeIDList.get(newProxyIndex));
- }
-
- /**
- * Update the proxy index to the next proxy in the list.
- * @return the new proxy index
- */
- private synchronized int incrementProxyIndex() {
- currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
- currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
- return currentProxyIndex;
- }
-
- @Override
- public Class<OzoneManagerProtocolPB> getInterface() {
- return OzoneManagerProtocolPB.class;
- }
-
- /**
- * Performs failover if the leaderOMNodeId returned through OMReponse does
- * not match the current leaderOMNodeId cached by the proxy provider.
- */
- public void performFailoverIfRequired(String newLeaderOMNodeId) {
- if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
- LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
- }
- }
-
- /**
- * Failover to the OM proxy specified by the new leader OMNodeId.
- * @param newLeaderOMNodeId OMNodeId to failover to.
- * @return true if failover is successful, false otherwise.
- */
- synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
- if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
- if (omProxies.containsKey(newLeaderOMNodeId)) {
- currentProxyOMNodeId = newLeaderOMNodeId;
- currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
- return true;
- }
- }
- return false;
- }
-
- /**
- * Close all the proxy objects which have been opened over the lifetime of
- * the proxy provider.
- */
- @Override
- public synchronized void close() throws IOException {
- for (OMProxyInfo proxy : omProxies.values()) {
- OzoneManagerProtocolPB omProxy = proxy.proxy;
- if (omProxy != null) {
- RPC.stopProxy(omProxy);
- }
- }
- }
-
- @VisibleForTesting
- public List<OMProxyInfo> getOMProxies() {
- return new ArrayList<>(omProxies.values());
- }
-}
-
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 54f4e82..1573682 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.protocol;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -385,11 +384,5 @@ public interface OzoneManagerProtocol
* @throws IOException
*/
S3SecretValue getS3Secret(String kerberosID) throws IOException;
-
- /**
- * Get the OM Client's Retry and Failover Proxy provider.
- * @return OMFailoverProxyProvider
- */
- OMFailoverProxyProvider getOMFailoverProxyProvider();
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index ff7a1d8..51ce94f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -17,26 +17,18 @@
*/
package org.apache.hadoop.ozone.om.protocolPB;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -111,7 +103,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@@ -121,9 +112,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@@ -144,89 +132,20 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
- private final OMFailoverProxyProvider omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
private final String clientID;
- private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
- LoggerFactory.getLogger(OMFailoverProxyProvider.class);
-
- public OzoneManagerProtocolClientSideTranslatorPB(
- OzoneManagerProtocolPB proxy, String clientId) {
- this.rpcProxy = proxy;
- this.clientID = clientId;
- this.omFailoverProxyProvider = null;
- }
/**
- * Constructor for OM Protocol Client. This creates a {@link RetryProxy}
- * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
- * one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
- * cluster.
+ * Constructor for KeySpaceManger Client.
+ * @param rpcProxy
*/
- public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
- String clientId, UserGroupInformation ugi) throws IOException {
- this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
-
- int maxRetries = conf.getInt(
- OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
- OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
- int maxFailovers = conf.getInt(
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
- int sleepBase = conf.getInt(
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
- int sleepMax = conf.getInt(
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
- OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
-
- this.rpcProxy = createRetryProxy(omFailoverProxyProvider,
- maxRetries, maxFailovers, sleepBase, sleepMax);
+ public OzoneManagerProtocolClientSideTranslatorPB(
+ OzoneManagerProtocolPB rpcProxy, String clientId) {
+ this.rpcProxy = rpcProxy;
this.clientID = clientId;
}
/**
- * Creates a {@link RetryProxy} encapsulating the
- * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
- * exception or if the current proxy is not the leader OM.
- */
- private OzoneManagerProtocolPB createRetryProxy(
- OMFailoverProxyProvider failoverProxyProvider,
- int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
- RetryPolicy retryPolicyOnNetworkException = RetryPolicies
- .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
- maxFailovers, maxRetries, delayMillis, maxDelayBase);
- RetryPolicy retryPolicy = new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception exception, int retries,
- int failovers, boolean isIdempotentOrAtMostOnce)
- throws Exception {
- if (exception instanceof EOFException ||
- exception instanceof ServiceException) {
- if (retries < maxRetries && failovers < maxFailovers) {
- return RetryAction.FAILOVER_AND_RETRY;
- } else {
- FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
- "Attempted {} retries and {} failovers", retries, failovers);
- return RetryAction.FAIL;
- }
- } else {
- return retryPolicyOnNetworkException.shouldRetry(
- exception, retries, failovers, isIdempotentOrAtMostOnce);
- }
- }
- };
- OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
- OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
- return proxy;
- }
-
- @VisibleForTesting
- public OMFailoverProxyProvider getOMFailoverProxyProvider() {
- return omFailoverProxyProvider;
- }
-
- /**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
@@ -277,19 +196,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
OMRequest payload = OMRequest.newBuilder(omRequest)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
-
- OMResponse omResponse =
- rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
- if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
- String leaderOmId = omResponse.getLeaderOMNodeId();
-
- // Failover to the OM node returned by OMReponse leaderOMNodeId if
- // current proxy is not pointing to that node.
- omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
- }
-
- return omResponse;
+ return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index b116826..aaf3c85 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -140,8 +140,6 @@ message OMResponse {
required Status status = 5;
- optional string leaderOMNodeId = 6;
-
optional CreateVolumeResponse createVolumeResponse = 11;
optional SetVolumePropertyResponse setVolumePropertyResponse = 12;
optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f84f95e..a1ef1f6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -50,7 +48,6 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
- private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers;
private static final Random RANDOM = new Random();
@@ -66,12 +63,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
- Map<String, OzoneManager> omMap,
+ List<OzoneManager> omList,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes);
- this.ozoneManagerMap = omMap;
- this.ozoneManagers = new ArrayList<>(omMap.values());
+ this.ozoneManagers = omList;
}
/**
@@ -111,10 +107,6 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
ozoneManagers.get(index).stop();
}
- public void stopOzoneManager(String omNodeId) {
- ozoneManagerMap.get(omNodeId).stop();
- }
-
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@@ -136,17 +128,17 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
- Map<String, OzoneManager> omMap;
+ List<OzoneManager> omList;
try {
scm = createSCM();
scm.start();
- omMap = createOMService();
+ omList = createOMService();
} catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
- MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
+ MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
@@ -179,10 +171,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
* @throws IOException
* @throws AuthenticationException
*/
- private Map<String, OzoneManager> createOMService() throws IOException,
+ private List<OzoneManager> createOMService() throws IOException,
AuthenticationException {
- Map<String, OzoneManager> omMap = new HashMap<>();
+ List<OzoneManager> omList = new ArrayList<>(numOfOMs);
int retryCount = 0;
int basePort = 10000;
@@ -194,11 +186,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
for (int i = 1; i<= numOfOMs; i++) {
// Set nodeId
- String nodeId = nodeIdBaseStr + i;
- conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+ conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
// Set metadata/DB dir base path
- String metaDirPath = path + "/" + nodeId;
+ String metaDirPath = path + "/" + nodeIdBaseStr + i;
conf.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
@@ -210,7 +201,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
OzoneManager om = OzoneManager.createOm(null, conf);
om.setCertClient(certClient);
- omMap.put(nodeId, om);
+ omList.add(om);
om.start();
LOG.info("Started OzoneManager RPC server at " +
@@ -220,24 +211,23 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
// Set default OM address to point to the first OM. Clients would
// try connecting to this address by default
conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
- NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
- .getOmRpcServerAddr()));
+ NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
break;
} catch (BindException e) {
- for (OzoneManager om : omMap.values()) {
+ for (OzoneManager om : omList) {
om.stop();
om.join();
LOG.info("Stopping OzoneManager server at " +
om.getOmRpcServerAddr());
}
- omMap.clear();
+ omList.clear();
++retryCount;
LOG.info("MiniOzoneHACluster port conflicts, retried " +
retryCount + " times");
}
}
- return omMap;
+ return omList;
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0828fe8..32792ae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -74,7 +76,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -188,10 +189,9 @@ public abstract class TestOzoneRpcClientAbstract {
*/
@Test
public void testOMClientProxyProvider() {
- OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
+ OMProxyProvider omProxyProvider = store.getClientProxy()
.getOMProxyProvider();
- List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
- omFailoverProxyProvider.getOMProxies();
+ List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
Assert.assertEquals(1, omProxies.size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index da8f870..62cda91 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,29 +18,30 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdfs.LogVerificationAppender;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
-import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
@@ -49,14 +50,6 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
.NODE_FAILURE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
/**
@@ -65,7 +58,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
public class TestOzoneManagerHA {
private MiniOzoneHAClusterImpl cluster = null;
- private ObjectStore objectStore;
+ private StorageHandler storageHandler;
+ private UserArgs userArgs;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
@@ -75,7 +69,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();
@Rule
- public Timeout timeout = new Timeout(120_000);
+ public Timeout timeout = new Timeout(60_000);
/**
* Create a MiniDFSCluster for testing.
@@ -91,9 +85,6 @@ public class TestOzoneManagerHA {
scmId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
- conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
- conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
- conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
@@ -102,7 +93,9 @@ public class TestOzoneManagerHA {
.setNumOfOzoneManagers(numOfOMs)
.build();
cluster.waitForClusterToBeReady();
- objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+ storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+ userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+ null, null, null, null);
}
/**
@@ -122,7 +115,7 @@ public class TestOzoneManagerHA {
*/
@Test
public void testAllOMNodesRunning() throws Exception {
- createVolumeTest(true);
+ testCreateVolume(true);
}
/**
@@ -133,56 +126,52 @@ public class TestOzoneManagerHA {
cluster.stopOzoneManager(1);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
- createVolumeTest(true);
+ testCreateVolume(true);
}
/**
* Test client request fails when 2 OMs are down.
*/
@Test
+ @Ignore("TODO:HDDS-1158")
public void testTwoOMNodesDown() throws Exception {
cluster.stopOzoneManager(1);
cluster.stopOzoneManager(2);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
- createVolumeTest(false);
+ testCreateVolume(false);
}
/**
* Create a volume and test its attribute.
*/
- private void createVolumeTest(boolean checkSuccess) throws Exception {
+ private void testCreateVolume(boolean checkSuccess) throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
- VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
- .setOwner(userName)
- .setAdmin(adminName)
- .build();
+ VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+ createVolumeArgs.setUserName(userName);
+ createVolumeArgs.setAdminName(adminName);
try {
- objectStore.createVolume(volumeName, createVolumeArgs);
+ storageHandler.createVolume(createVolumeArgs);
- OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+ VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+ VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
if (checkSuccess) {
- Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
- Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
- Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+ Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
+ Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
} else {
// Verify that the request failed
+ Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
Assert.fail("There is no quorum. Request should have failed");
}
- } catch (ConnectException | RemoteException e) {
+ } catch (OMException e) {
if (!checkSuccess) {
- // If the last OM to be tried by the RetryProxy is down, we would get
- // ConnectException. Otherwise, we would get a RemoteException from the
- // last running OM as it would fail to get a quorum.
- if (e instanceof RemoteException) {
- GenericTestUtils.assertExceptionContains(
- "RaftRetryFailureException", e);
- }
+ GenericTestUtils.assertExceptionContains(
+ "RaftRetryFailureException", e);
} else {
throw e;
}
@@ -190,16 +179,14 @@ public class TestOzoneManagerHA {
}
/**
- * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
- * cluster.
+ * Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
*/
@Test
- public void testOMProxyProviderInitialization() throws Exception {
+ public void testOMClientProxyProvide() throws Exception {
OzoneClient rpcClient = cluster.getRpcClient();
- OMFailoverProxyProvider omFailoverProxyProvider =
+ OMProxyProvider omProxyProvider =
rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
- List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
- omFailoverProxyProvider.getOMProxies();
+ List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
Assert.assertEquals(numOfOMs, omProxies.size());
@@ -207,7 +194,7 @@ public class TestOzoneManagerHA {
InetSocketAddress omRpcServerAddr =
cluster.getOzoneManager(i).getOmRpcServerAddr();
boolean omClientProxyExists = false;
- for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
+ for (OMProxyInfo omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
@@ -218,99 +205,4 @@ public class TestOzoneManagerHA {
omClientProxyExists);
}
}
-
- /**
- * Test OMFailoverProxyProvider failover on connection exception to OM client.
- */
- @Test
- public void testOMProxyProviderFailoverOnConnectionFailure()
- throws Exception {
- OMFailoverProxyProvider omFailoverProxyProvider =
- objectStore.getClientProxy().getOMProxyProvider();
- String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- createVolumeTest(true);
-
- // On stopping the current OM Proxy, the next connection attempt should
- // failover to a another OM proxy.
- cluster.stopOzoneManager(firstProxyNodeId);
- Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
-
- // Next request to the proxy provider should result in a failover
- createVolumeTest(true);
- Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
-
- // Get the new OM Proxy NodeId
- String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // Verify that a failover occured. the new proxy nodeId should be
- // different from the old proxy nodeId.
- Assert.assertNotEquals("Failover did not occur as expected",
- firstProxyNodeId, newProxyNodeId);
- }
-
- /**
- * Test OMFailoverProxyProvider failover when current OM proxy is not
- * the current OM Leader.
- */
- @Test
- public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
- OMFailoverProxyProvider omFailoverProxyProvider =
- objectStore.getClientProxy().getOMProxyProvider();
-
- // Run couple of createVolume tests to discover the current Leader OM
- createVolumeTest(true);
- createVolumeTest(true);
-
- // The OMFailoverProxyProvider will point to the current leader OM node.
- String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // Perform a manual failover of the proxy provider to move the
- // currentProxyIndex to a node other than the leader OM.
- omFailoverProxyProvider.performFailover(
- omFailoverProxyProvider.getProxy().proxy);
-
- String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
- Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
- // Once another request is sent to this new proxy node, the leader
- // information must be returned via the response and a failover must
- // happen to the leader proxy node.
- createVolumeTest(true);
- Thread.sleep(2000);
-
- String newLeaderOMNodeId =
- omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // The old and new Leader OM NodeId must match since there was no new
- // election in the Ratis ring.
- Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
- }
-
- @Test
- public void testOMRetryProxy() throws Exception {
- // Stop all the OMs. After making 5 (set maxRetries value) attempts at
- // connection, the RpcClient should give up.
- for (int i = 0; i < numOfOMs; i++) {
- cluster.stopOzoneManager(i);
- }
-
- final LogVerificationAppender appender = new LogVerificationAppender();
- final org.apache.log4j.Logger logger = Logger.getRootLogger();
- logger.addAppender(appender);
-
- try {
- createVolumeTest(true);
- Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
- } catch (ConnectException e) {
- // Each retry attempt tries upto 10 times to connect. So there should be
- // 3*10 "Retrying connect to server" messages
- Assert.assertEquals(30,
- appender.countLinesWithMessage("Retrying connect to server:"));
-
- Assert.assertEquals(1,
- appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
- "3 retries and 3 failovers"));
- }
- }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index cacdca8..ff94935 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -2604,11 +2603,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return peerNodes;
}
- @Override
- public OMFailoverProxyProvider getOMFailoverProxyProvider() {
- return null;
- }
-
@VisibleForTesting
public CertificateClient getCertificateClient() {
return certClient;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
index 8e4582d..9115421 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
@@ -32,7 +32,6 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
@@ -101,12 +100,10 @@ public final class OMRatisHelper {
return Message.valueOf(ByteString.copyFrom(requestBytes));
}
- static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
+ static OMResponse convertByteStringToOMResponse(ByteString byteString)
throws InvalidProtocolBufferException {
- byte[] bytes = reply.getMessage().getContent().toByteArray();
- return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
- .setLeaderOMNodeId(reply.getReplierId())
- .build();
+ byte[] bytes = byteString.toByteArray();
+ return OMResponse.parseFrom(bytes);
}
static OMResponse getErrorResponse(Type cmdType, Exception e) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 1b4c634..9e1cafc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -23,10 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ServiceException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -56,24 +53,24 @@ public final class OzoneManagerRatisClient implements Closeable {
OzoneManagerRatisClient.class);
private final RaftGroup raftGroup;
- private final String omNodeID;
+ private final String omID;
private final RpcType rpcType;
private RaftClient raftClient;
private final RetryPolicy retryPolicy;
private final Configuration conf;
- private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
+ private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
RpcType rpcType, RetryPolicy retryPolicy,
Configuration config) {
this.raftGroup = raftGroup;
- this.omNodeID = omNodeId;
+ this.omID = omId;
this.rpcType = rpcType;
this.retryPolicy = retryPolicy;
this.conf = config;
}
public static OzoneManagerRatisClient newOzoneManagerRatisClient(
- String omNodeId, RaftGroup raftGroup, Configuration conf) {
+ String omId, RaftGroup raftGroup, Configuration conf) {
final String rpcType = conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@@ -90,19 +87,19 @@ public final class OzoneManagerRatisClient implements Closeable {
final RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
- return new OzoneManagerRatisClient(omNodeId, raftGroup,
+ return new OzoneManagerRatisClient(omId, raftGroup,
SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
}
public void connect() {
LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
- raftGroup.getGroupId().getUuid().toString(), omNodeID);
+ raftGroup.getGroupId().getUuid().toString(), omID);
// TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
- raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
+ raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
retryPolicy, conf);
}
@@ -122,12 +119,13 @@ public final class OzoneManagerRatisClient implements Closeable {
* @param request Request
* @return Response to the command
*/
- public OMResponse sendCommand(OMRequest request) throws ServiceException {
+ public OMResponse sendCommand(OMRequest request) {
try {
CompletableFuture<OMResponse> reply = sendCommandAsync(request);
return reply.get();
} catch (ExecutionException | InterruptedException e) {
- throw new ServiceException(e);
+ LOG.error("Failed to execute command: " + request, e);
+ return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
}
}
@@ -154,10 +152,9 @@ public final class OzoneManagerRatisClient implements Closeable {
if (raftRetryFailureException != null) {
throw new CompletionException(raftRetryFailureException);
}
-
OMResponse response = OMRatisHelper
- .getOMResponseFromRaftClientReply(reply);
-
+ .convertByteStringToOMResponse(reply.getMessage()
+ .getContent());
return response;
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 2f1d64d8..5684fa5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -80,8 +80,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
/**
* Submits request to OM's Ratis server.
*/
- private OMResponse submitRequestToRatis(OMRequest request)
- throws ServiceException {
+ private OMResponse submitRequestToRatis(OMRequest request) {
return omRatisClient.sendCommand(request);
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 8a8be35..83d2245 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
@@ -108,6 +110,27 @@ public class TestOzoneManagerRatisServer {
}
/**
+ * Submit any request to OM Ratis server and check that the dummy response
+ * message is received.
+ */
+ @Test
+ public void testSubmitRatisRequest() throws Exception {
+ // Wait for leader election
+ Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+ .setClientId(clientId)
+ .build();
+
+ OMResponse response = omRatisClient.sendCommand(request);
+
+ Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
+ response.getCmdType());
+ Assert.assertEquals(false, response.getSuccess());
+ Assert.assertEquals(false, response.hasCreateVolumeResponse());
+ }
+
+ /**
* Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
* categorized in {@link OmUtils#isReadOnly(OMRequest)}.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org