You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/08/25 11:40:18 UTC

[GitHub] [hadoop-ozone] GlenGeng commented on a change in pull request #1340: HDDS-3188 Enable SCM group with failover proxy for SCM block location.

GlenGeng commented on a change in pull request #1340:
URL: https://github.com/apache/hadoop-ozone/pull/1340#discussion_r476376037



##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
##########
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+
+/**
+ * Failover proxy provider for SCM.
+ */
+public class SCMBlockLocationFailoverProxyProvider implements
+    FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+
+  private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+  private Map<String, SCMProxyInfo> scmProxyInfoMap;
+  private List<String> scmNodeIDList;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+  private ScmBlockLocationProtocol currentProxy;
+
+  private final ConfigurationSource conf;
+  private final long scmVersion;
+
+  private final String scmServiceId;
+
+  private String lastAttemptedLeader;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  public static final String SCM_DUMMY_NODEID_PREFIX = "scm";
+
+  public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocol.class);
+    this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY);
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    this.scmNodeIDList = new ArrayList<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+
+    this.maxRetryCount = conf.getObject(SCMBlockClientConfig.class)
+        .getRetryCount();
+    this.retryInterval = conf.getObject(SCMBlockClientConfig.class)
+        .getRetryInterval();
+  }
+
+  private Collection<InetSocketAddress> getSCMAddressList() {
+    Collection<String> scmAddressList =
+        conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
+    Collection<InetSocketAddress> resultList = new ArrayList<>();
+    if (scmAddressList.isEmpty()) {
+      // fall back
+      resultList.add(getScmAddressForBlockClients(conf));
+    } else {
+      for (String scmAddress : scmAddressList) {
+        resultList.add(NetUtils.createSocketAddr(scmAddress));
+      }
+    }
+    return resultList;
+  }
+
+  private void loadConfigs() {
+    Collection<InetSocketAddress> scmAddressList = getSCMAddressList();
+    int scmNodeIndex = 1;
+    for (InetSocketAddress scmAddress : scmAddressList) {
+      String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex;
+      if (scmAddress == null) {
+        LOG.error("Failed to create SCM proxy for {}.", nodeId);
+        continue;
+      }
+      scmNodeIndex++;
+      SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
+          scmServiceId, nodeId, scmAddress);
+      ProxyInfo<ScmBlockLocationProtocolPB> proxy = new ProxyInfo<>(
+          null, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxy);
+      scmProxyInfoMap.put(nodeId, scmProxyInfo);
+      scmNodeIDList.add(nodeId);
+    }
+
+    if (scmProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for SCM. Please configure the system with "
+          + OZONE_SCM_NAMES);
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized String getCurrentProxyOMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  @Override
+  public synchronized ProxyInfo getProxy() {
+    ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId);
+    createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+    return currentProxyInfo;
+  }
+
+  public ScmBlockLocationProtocolPB getCurrentProxy() {

Review comment:
       I guess this method is not needed.

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
##########
@@ -73,15 +75,19 @@
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private final ScmBlockLocationProtocolPB rpcProxy;
+  private SCMBlockLocationFailoverProxyProvider failoverProxyProvider;
 
   /**
    * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
    *
-   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
+   * @param proxyProvider {@link SCMBlockLocationFailoverProxyProvider}
+   * failover proxy provider.
    */
   public ScmBlockLocationProtocolClientSideTranslatorPB(
-      ScmBlockLocationProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
+      SCMBlockLocationFailoverProxyProvider proxyProvider) {
+    Preconditions.checkState(proxyProvider != null);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = proxyProvider.getCurrentProxy();

Review comment:
       refer to OM-HA, the `rpcProxy` should be a result of `Proxy.newProxyInstance(...`. which relies on `FailoverProxyProvider` to switch between different `ScmBlockLocationProtocolService`.
   
   ```
       OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
           OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
       return proxy;
   ```

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
##########
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+
+/**
+ * Failover proxy provider for SCM.
+ */
+public class SCMBlockLocationFailoverProxyProvider implements
+    FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+
+  private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+  private Map<String, SCMProxyInfo> scmProxyInfoMap;
+  private List<String> scmNodeIDList;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+  private ScmBlockLocationProtocol currentProxy;
+
+  private final ConfigurationSource conf;
+  private final long scmVersion;
+
+  private final String scmServiceId;
+
+  private String lastAttemptedLeader;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  public static final String SCM_DUMMY_NODEID_PREFIX = "scm";
+
+  public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocol.class);
+    this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY);
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    this.scmNodeIDList = new ArrayList<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+
+    this.maxRetryCount = conf.getObject(SCMBlockClientConfig.class)
+        .getRetryCount();
+    this.retryInterval = conf.getObject(SCMBlockClientConfig.class)
+        .getRetryInterval();
+  }
+
+  private Collection<InetSocketAddress> getSCMAddressList() {
+    Collection<String> scmAddressList =
+        conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
+    Collection<InetSocketAddress> resultList = new ArrayList<>();
+    if (scmAddressList.isEmpty()) {
+      // fall back
+      resultList.add(getScmAddressForBlockClients(conf));
+    } else {
+      for (String scmAddress : scmAddressList) {
+        resultList.add(NetUtils.createSocketAddr(scmAddress));
+      }
+    }
+    return resultList;
+  }
+
+  private void loadConfigs() {
+    Collection<InetSocketAddress> scmAddressList = getSCMAddressList();
+    int scmNodeIndex = 1;
+    for (InetSocketAddress scmAddress : scmAddressList) {
+      String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex;
+      if (scmAddress == null) {
+        LOG.error("Failed to create SCM proxy for {}.", nodeId);
+        continue;
+      }
+      scmNodeIndex++;
+      SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
+          scmServiceId, nodeId, scmAddress);
+      ProxyInfo<ScmBlockLocationProtocolPB> proxy = new ProxyInfo<>(
+          null, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxy);
+      scmProxyInfoMap.put(nodeId, scmProxyInfo);
+      scmNodeIDList.add(nodeId);
+    }
+
+    if (scmProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for SCM. Please configure the system with "
+          + OZONE_SCM_NAMES);
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized String getCurrentProxyOMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  @Override
+  public synchronized ProxyInfo getProxy() {
+    ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId);
+    createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+    return currentProxyInfo;
+  }
+
+  public ScmBlockLocationProtocolPB getCurrentProxy() {
+    return (ScmBlockLocationProtocolPB) getProxy().proxy;
+  }
+
+  @Override
+  public void performFailover(ScmBlockLocationProtocolPB newLeader) {
+    // By default, will round robin to next proxy.
+    nextProxyIndex();
+    LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId());
+  }
+
+  public void performFailoverToAssignedLeader(String newLeader) {
+    if (newLeader == null) {
+      // If newLeader is not assigned, it will fail over to next proxy.
+      nextProxyIndex();
+    } else {
+      if (!assignLeaderToNode(newLeader)) {
+        LOG.debug("Failing over OM proxy to nodeId: {}", newLeader);
+        nextProxyIndex();
+      }
+    }
+  }
+
+  @Override
+  public Class<ScmBlockLocationProtocolPB> getInterface() {
+    return ScmBlockLocationProtocolPB.class;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<ScmBlockLocationProtocolPB> proxy : scmProxies.values()) {
+      ScmBlockLocationProtocolPB scmProxy = proxy.proxy;
+      if (scmProxy != null) {
+        RPC.stopProxy(scmProxy);
+      }
+    }
+  }
+
+  public RetryAction getRetryAction(int failovers) {
+    if (failovers < maxRetryCount) {
+      return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+          getRetryInterval());
+    } else {
+      return RetryAction.FAIL;
+    }
+  }
+
+  private synchronized long getRetryInterval() {
+    // TODO add exponential backup
+    return retryInterval;
+  }
+
+  private synchronized int nextProxyIndex() {
+    lastAttemptedLeader = currentProxySCMNodeId;
+
+    // round robin the next proxy
+    currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+    currentProxySCMNodeId =  scmNodeIDList.get(currentProxyIndex);
+    return currentProxyIndex;
+  }
+
+  synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
+    if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+      if (scmProxies.containsKey(newLeaderNodeId)) {
+        lastAttemptedLeader = currentProxySCMNodeId;
+        currentProxySCMNodeId = newLeaderNodeId;
+        currentProxyIndex = scmNodeIDList.indexOf(currentProxySCMNodeId);
+        return true;
+      }
+    } else {
+      lastAttemptedLeader = currentProxySCMNodeId;
+    }
+    return false;
+  }
+
+  /**
+   * Creates proxy object if it does not already exist.
+   */
+  private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
+                                     String nodeId) {
+    if (proxyInfo.proxy == null) {
+      InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
+      try {
+        ScmBlockLocationProtocolPB proxy = createSCMProxy(address);
+        try {
+          proxyInfo.proxy = proxy;
+        } catch (IllegalAccessError iae) {
+          scmProxies.put(nodeId,
+              new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
+        }
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to SCM at {}",
+            this.getClass().getSimpleName(), address, ioe);
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+
+  private ScmBlockLocationProtocolPB createSCMProxy(
+      InetSocketAddress scmAddress) throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocol.class,
+        ProtobufRpcEngine.class);
+    return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+        scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf,
+        NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int)conf.getObject(SCMBlockClientConfig.class).getRpcTimeOut());
+  }
+
+  public RetryPolicy getSCMBlockLocationRetryPolicy(
+      String suggestedLeader) {
+    RetryPolicy retryPolicy = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception e, int retry,
+                                     int failover, boolean b) {
+        if (suggestedLeader == null) {
+          performFailover(null);

Review comment:
       Should not call `performFailover()` in `FailoverProxyProvider`, it is job of `RetryInvocationHandler` to call `performFailover()` in its `RetryInvocationHandler.ProxyDescriptor.failover()`

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
##########
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+
+/**
+ * Failover proxy provider for SCM.
+ */
+public class SCMBlockLocationFailoverProxyProvider implements
+    FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+
+  private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+  private Map<String, SCMProxyInfo> scmProxyInfoMap;
+  private List<String> scmNodeIDList;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+  private ScmBlockLocationProtocol currentProxy;

Review comment:
       This is not used.

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
##########
@@ -105,6 +111,11 @@ private SCMBlockLocationResponse submitRequest(
     try {
       SCMBlockLocationResponse response =
           rpcProxy.send(NULL_RPC_CONTROLLER, req);
+      if (response.getStatus() ==

Review comment:
       We need consider the case that current follower SCM does not know current leader.
   
   check this
   ```
     public OMResponse submitRequest(OMRequest payload) throws IOException {
       try {
         OMResponse omResponse =
             rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
   
         if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
           String leaderOmId = omResponse.getLeaderOMNodeId();
   
           // Failover to the OM node returned by OMResponse leaderOMNodeId if
           // current proxy is not pointing to that node.
           omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
         }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-issues-help@hadoop.apache.org