You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:21 UTC

[hadoop] 18/50: HDFS-13782. ObserverReadProxyProvider should work with IPFailoverProxyProvider. Contributed by Konstantin Shvachko.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 0b56a5300b1e16fc4c75fc71f4879cb7794c7390
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Sat Aug 25 17:32:30 2018 -0700

    HDFS-13782. ObserverReadProxyProvider should work with IPFailoverProxyProvider. Contributed by Konstantin Shvachko.
---
 .../namenode/ha/ObserverReadProxyProvider.java     | 105 +++++++++++++--------
 .../ObserverReadProxyProviderWithIPFailover.java   |  40 ++++++++
 2 files changed, 108 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 754fea4..dcae2db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -17,30 +17,30 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.ClientGSIContext;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
  * that supports reading from observer namenode(s).
@@ -55,16 +55,20 @@ import org.slf4j.LoggerFactory;
  * observer is turned off.
  */
 public class ObserverReadProxyProvider<T extends ClientProtocol>
-    extends ConfiguredFailoverProxyProvider<T> {
+    extends AbstractNNFailoverProxyProvider<T> {
   private static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
 
   /** Client-side context for syncing with the NameNode server side */
   private AlignmentContext alignmentContext;
 
+  private AbstractNNFailoverProxyProvider<T> failoverProxy;
+  /** All NameNode proxies */
+  private List<NNProxyInfo<T>> nameNodeProxies =
+      new ArrayList<NNProxyInfo<T>>();
   /** Proxies for the observer namenodes */
-  private final List<AddressRpcProxyPair<T>> observerProxies =
-      new ArrayList<>();
+  private final List<NNProxyInfo<T>> observerProxies =
+      new ArrayList<NNProxyInfo<T>>();
 
   /**
    * Whether reading from observer is enabled. If this is false, all read
@@ -81,36 +85,43 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   /** The last proxy that has been used. Only used for testing */
   private volatile ProxyInfo<T> lastProxy = null;
 
-  @SuppressWarnings("unchecked")
+  /**
+   * By default ObserverReadProxyProvider uses
+   * {@link ConfiguredFailoverProxyProvider} for failover.
+   */
   public ObserverReadProxyProvider(
       Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
       throws IOException {
+    this(conf, uri, xface, factory,
+        new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
+  }
+
+  public ObserverReadProxyProvider(
+      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
+      AbstractNNFailoverProxyProvider<T> failoverProxy)
+      throws IOException {
     super(conf, uri, xface, factory);
-    alignmentContext = new ClientGSIContext();
+    this.failoverProxy = failoverProxy;
+    this.alignmentContext = new ClientGSIContext();
     ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
 
+    // Get all NameNode proxies
+    nameNodeProxies = getProxyAddresses(uri,
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
     // Find out all the observer proxies
-    for (AddressRpcProxyPair<T> ap : this.proxies) {
-      ap.namenode = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
-          ap.address, conf, ugi, false, getFallbackToSimpleAuth(),
-          alignmentContext);
-      if (isObserverState(ap)) {
-        observerProxies.add(ap);
+    for (NNProxyInfo<T> pi : nameNodeProxies) {
+      createProxyIfNeeded(pi);
+      if (isObserverState(pi)) {
+        observerProxies.add(pi);
       }
     }
 
+    // TODO: No observers is not an error
+    // Just direct all reads to the active NameNode
     if (observerProxies.isEmpty()) {
       throw new RuntimeException("Couldn't find any namenode proxy in " +
           "OBSERVER state");
     }
-
-    // Randomize the list to prevent all clients pointing to the same one
-    boolean randomized = conf.getBoolean(
-        HdfsClientConfigKeys.Failover.RANDOM_ORDER,
-        HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
-    if (randomized) {
-      Collections.shuffle(observerProxies);
-    }
   }
 
   public synchronized AlignmentContext getAlignmentContext() {
@@ -121,17 +132,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   @Override
   public synchronized ProxyInfo<T> getProxy() {
     // We just create a wrapped proxy containing all the proxies
-    List<ProxyInfo<T>> observerProxies = new ArrayList<>();
     StringBuilder combinedInfo = new StringBuilder("[");
 
     for (int i = 0; i < this.observerProxies.size(); i++) {
       if (i > 0) {
         combinedInfo.append(",");
       }
-      AddressRpcProxyPair<T> p = this.observerProxies.get(i);
-      ProxyInfo<T> pInfo = getProxy(p);
-      observerProxies.add(pInfo);
-      combinedInfo.append(pInfo.proxyInfo);
+      combinedInfo.append(observerProxies.get(i).proxyInfo);
     }
 
     combinedInfo.append(']');
@@ -142,6 +149,11 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
   }
 
+  @Override
+  public void performFailover(T currentProxy) {
+    failoverProxy.performFailover(currentProxy);
+  }
+
   /**
    * Check if a method is read-only.
    *
@@ -170,14 +182,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     return lastProxy;
   }
 
-  boolean isObserverState(AddressRpcProxyPair<T> ap) {
+  boolean isObserverState(NNProxyInfo<T> pi) {
     // TODO: should introduce new ClientProtocol method to verify the
     // underlying service state, which does not require superuser access
     // The is a workaround
     IOException ioe = null;
     try {
       // Verify write access first
-      ap.namenode.reportBadBlocks(new LocatedBlock[0]);
+      pi.proxy.reportBadBlocks(new LocatedBlock[0]);
       return false; // Only active NameNode allows write
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
@@ -188,14 +200,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", ap.address, ioe);
+      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
       return false;
     }
     // Verify read access
     // For now we assume only Observer nodes allow reads
     // Stale reads on StandbyNode should be turned off
     try {
-      ap.namenode.checkAccess("/", FsAction.READ);
+      pi.proxy.checkAccess("/", FsAction.READ);
       return true;
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
@@ -206,19 +218,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", ap.address, ioe);
+      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
     }
     return false;
   }
 
 
   class ObserverReadInvocationHandler implements InvocationHandler {
-    final List<ProxyInfo<T>> observerProxies;
+    final List<NNProxyInfo<T>> observerProxies;
     final ProxyInfo<T> activeProxy;
 
-    ObserverReadInvocationHandler(List<ProxyInfo<T>> observerProxies) {
+    ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
       this.observerProxies = observerProxies;
-      this.activeProxy = ObserverReadProxyProvider.super.getProxy();
+      this.activeProxy = failoverProxy.getProxy();
     }
 
     /**
@@ -238,7 +250,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       if (observerReadEnabled && isRead(method)) {
         // Loop through all the proxies, starting from the current index.
         for (int i = 0; i < observerProxies.size(); i++) {
-          ProxyInfo<T> current = observerProxies.get(currentIndex.get());
+          NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
           try {
             retVal = method.invoke(current.proxy, args);
             lastProxy = current;
@@ -269,4 +281,23 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       return retVal;
     }
   }
+
+  @Override
+  public synchronized void close() throws IOException {
+    failoverProxy.close();
+    for (ProxyInfo<T> pi : nameNodeProxies) {
+      if (pi.proxy != null) {
+        if (pi.proxy instanceof Closeable) {
+          ((Closeable)pi.proxy).close();
+        } else {
+          RPC.stopProxy(pi.proxy);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean useLogicalURI() {
+    return failoverProxy.useLogicalURI();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java
new file mode 100644
index 0000000..1dbd02c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+
+/**
+ * ObserverReadProxyProvider with IPFailoverProxyProvider
+ * as the failover method.
+ */
+public class
+ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol>
+extends ObserverReadProxyProvider<T> {
+
+  public ObserverReadProxyProviderWithIPFailover(
+      Configuration conf, URI uri, Class<T> xface,
+      HAProxyFactory<T> factory) throws IOException {
+    super(conf, uri, xface, factory,
+        new IPFailoverProxyProvider<T>(conf, uri, xface,factory));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org