You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/10/04 17:51:42 UTC

[hadoop] branch branch-3.1 updated: HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 56caaca  HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen.
56caaca is described below

commit 56caacac1f84ceb2eff90c39a63111a219c93439
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Thu Dec 20 17:49:22 2018 -0800

    HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen.
    
    (cherry picked from 64f28f9efa2ef3cd9dd54a6c5009029721e030ed)
    (cherry picked from 69b0c513a9b11cc7f795747732173b36aacbe794)
---
 .../java/org/apache/hadoop/ipc/ProxyCombiner.java  | 137 +++++++++++++++++++++
 .../hdfs/server/namenode/ha/HAProxyFactory.java    |   9 ++
 .../namenode/ha/ObserverReadProxyProvider.java     |   2 +-
 .../org/apache/hadoop/hdfs/NameNodeProxies.java    | 108 ++++++++++------
 .../hdfs/server/balancer/NameNodeConnector.java    |  11 +-
 .../server/namenode/ha/NameNodeHAProxyFactory.java |   9 +-
 .../hdfs/server/protocol/BalancerProtocols.java    |  30 +++++
 .../balancer/TestBalancerWithHANameNodes.java      | 101 ++++++++++-----
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java |  12 +-
 9 files changed, 338 insertions(+), 81 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
new file mode 100644
index 0000000..fbafabc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A utility class used to combine two protocol proxies.
+ * See {@link #combine(Class, Object...)}.
+ */
+public final class ProxyCombiner {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ProxyCombiner.class);
+
+  private ProxyCombiner() { }
+
+  /**
+   * Combine two or more proxies which together comprise a single proxy
+   * interface. This can be used for a protocol interface which {@code extends}
+   * multiple other protocol interfaces. The returned proxy will implement
+   * all of the methods of the combined proxy interface, delegating calls
+   * to which proxy implements that method. If multiple proxies implement the
+   * same method, the first in the list will be used for delegation.
+   *
+   * <p/>This will check that every method on the combined interface is
+   * implemented by at least one of the supplied proxy objects.
+   *
+   * @param combinedProxyInterface The interface of the combined proxy.
+   * @param proxies The proxies which should be used as delegates.
+   * @param <T> The type of the proxy that will be returned.
+   * @return The combined proxy.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T combine(Class<T> combinedProxyInterface,
+      Object... proxies) {
+    methodLoop:
+    for (Method m : combinedProxyInterface.getMethods()) {
+      for (Object proxy : proxies) {
+        try {
+          proxy.getClass().getMethod(m.getName(), m.getParameterTypes());
+          continue methodLoop; // go to the next method
+        } catch (NoSuchMethodException nsme) {
+          // Continue to try the next proxy
+        }
+      }
+      throw new IllegalStateException("The proxies specified for "
+          + combinedProxyInterface + " do not cover method " + m);
+    }
+
+    InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
+    return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
+        new Class[] {combinedProxyInterface}, handler);
+  }
+
+  private static final class CombinedProxyInvocationHandler
+      implements RpcInvocationHandler {
+
+    private final Object[] proxies;
+
+    private CombinedProxyInvocationHandler(Object[] proxies) {
+      this.proxies = proxies;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      Exception lastException = null;
+      for (Object underlyingProxy : proxies) {
+        try {
+          return method.invoke(underlyingProxy, args);
+        } catch (IllegalAccessException|IllegalArgumentException e) {
+          lastException = e;
+        }
+      }
+      // This shouldn't happen since the method coverage was verified in build()
+      LOG.error("BUG: Method {} was unable to be found on any of the "
+          + "underlying proxies for {}", method, proxy.getClass());
+      throw new IllegalArgumentException("Method " + method + " not supported",
+          lastException);
+    }
+
+    /**
+     * Since this is incapable of returning multiple connection IDs, simply
+     * return the first one. In most cases, the connection ID should be the same
+     * for all proxies.
+     */
+    @Override
+    public ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(proxies[0]);
+    }
+
+    @Override
+    public void close() throws IOException {
+      MultipleIOException.Builder exceptionBuilder =
+          new MultipleIOException.Builder();
+      for (Object proxy : proxies) {
+        if (proxy instanceof Closeable) {
+          try {
+            ((Closeable) proxy).close();
+          } catch (IOException ioe) {
+            exceptionBuilder.add(ioe);
+          }
+        }
+      }
+      if (!exceptionBuilder.isEmpty()) {
+        throw exceptionBuilder.build();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
index f92a74f..9364780 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -41,4 +42,12 @@ public interface HAProxyFactory<T> {
   T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException;
 
+  /**
+   * Set the alignment context to be used when creating new proxies using
+   * this factory. Not all implementations will use this alignment context.
+   */
+  default void setAlignmentContext(AlignmentContext alignmentContext) {
+    // noop
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 3cccecf..2ccf885 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -156,7 +156,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     super(conf, uri, xface, factory);
     this.failoverProxy = failoverProxy;
     this.alignmentContext = new ClientGSIContext();
-    ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
+    factory.setAlignmentContext(alignmentContext);
 
     // Don't bother configuring the number of retries and such on the retry
     // policy since it is mainly only used for determining whether or not an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index d556c90..2e2c5d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -37,13 +37,16 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProxyCombiner;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
@@ -118,7 +121,7 @@ public class NameNodeProxies {
     if (failoverProxyProvider == null) {
       return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
           xface, UserGroupInformation.getCurrentUser(), true,
-          fallbackToSimpleAuth);
+          fallbackToSimpleAuth, null);
     } else {
       return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
           failoverProxyProvider);
@@ -141,7 +144,7 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException {
-    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
+    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null);
   }
 
   /**
@@ -163,27 +166,36 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+      throws IOException {
     Text dtService = SecurityUtil.buildTokenService(nnAddr);
   
     T proxy;
     if (xface == ClientProtocol.class) {
-      proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
-          nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
+      proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
+          nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth,
+          alignmentContext);
     } else if (xface == JournalProtocol.class) {
-      proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi,
+          alignmentContext);
     } else if (xface == NamenodeProtocol.class) {
       proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
-          withRetries);
+          withRetries, alignmentContext);
     } else if (xface == GetUserMappingsProtocol.class) {
-      proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi,
+          alignmentContext);
     } else if (xface == RefreshUserMappingsProtocol.class) {
-      proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf,
+          ugi, alignmentContext);
     } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
       proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
-          conf, ugi);
+          conf, ugi, alignmentContext);
     } else if (xface == RefreshCallQueueProtocol.class) {
-      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi,
+          alignmentContext);
+    } else if (xface == BalancerProtocols.class) {
+      proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi,
+          withRetries, fallbackToSimpleAuth, alignmentContext);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
@@ -194,52 +206,57 @@ public class NameNodeProxies {
 
     return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
   }
-  
+
   private static JournalProtocol createNNProxyWithJournalProtocol(
-      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
-      throws IOException {
-    JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
-        conf, ugi, JournalProtocolPB.class, 30000);
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    JournalProtocolPB proxy = createNameNodeProxy(address,
+        conf, ugi, JournalProtocolPB.class, 30000, alignmentContext);
     return new JournalProtocolTranslatorPB(proxy);
   }
 
   private static RefreshAuthorizationPolicyProtocol
       createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
+        conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0,
+        alignmentContext);
     return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
   }
   
   private static RefreshUserMappingsProtocol
       createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
+        ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext);
     return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
 
   private static RefreshCallQueueProtocol
       createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+        RefreshCallQueueProtocolPB.class, 0, alignmentContext);
     return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
   }
 
   private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
-      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
-      throws IOException {
-    GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
-        createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+        GetUserMappingsProtocolPB.class, 0, alignmentContext);
     return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
   
   private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
-      boolean withRetries) throws IOException {
-    NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
-        address, conf, ugi, NamenodeProtocolPB.class, 0);
+      boolean withRetries, AlignmentContext alignmentContext)
+      throws IOException {
+    NamenodeProtocolPB proxy = createNameNodeProxy(
+        address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext);
     if (withRetries) { // create the proxy with retries
       RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
               TimeUnit.MILLISECONDS);
@@ -256,13 +273,28 @@ public class NameNodeProxies {
     }
   }
 
-  private static Object createNameNodeProxy(InetSocketAddress address,
-      Configuration conf, UserGroupInformation ugi, Class<?> xface,
-      int rpcTimeout) throws IOException {
+  private static BalancerProtocols createNNProxyWithBalancerProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
+      AlignmentContext alignmentContext) throws IOException {
+    NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol(
+        address, conf, ugi, withRetries, alignmentContext);
+    ClientProtocol clientProtocol =
+        NameNodeProxiesClient.createProxyWithAlignmentContext(address,
+            conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext);
+
+    return ProxyCombiner.combine(BalancerProtocols.class,
+        namenodeProtocol, clientProtocol);
+  }
+
+  private static <T> T createNameNodeProxy(InetSocketAddress address,
+      Configuration conf, UserGroupInformation ugi, Class<T> xface,
+      int rpcTimeout, AlignmentContext alignmentContext) throws IOException {
     RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
-    Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
-        ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
-    return proxy;
+    return RPC.getProtocolProxy(xface,
+        RPC.getProtocolVersion(xface), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null,
+        alignmentContext).getProxy();
   }
 
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index b0dd779..2819137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -43,11 +43,11 @@ import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -104,8 +104,7 @@ public class NameNodeConnector implements Closeable {
   private final URI nameNodeUri;
   private final String blockpoolID;
 
-  private final NamenodeProtocol namenode;
-  private final ClientProtocol client;
+  private final BalancerProtocols namenode;
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
@@ -129,9 +128,7 @@ public class NameNodeConnector implements Closeable {
     this.maxNotChangedIterations = maxNotChangedIterations;
 
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
-        NamenodeProtocol.class).getProxy();
-    this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
-        ClientProtocol.class, fallbackToSimpleAuth).getProxy();
+        BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -185,7 +182,7 @@ public class NameNodeConnector implements Closeable {
   /** @return live datanode storage reports. */
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {
-    return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+    return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE);
   }
 
   /** @return the key manager */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
index 036b6eb..1aaaa38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -27,12 +28,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
 
+  private AlignmentContext alignmentContext;
+
   @Override
   public T createProxy(Configuration conf, InetSocketAddress nnAddr,
       Class<T> xface, UserGroupInformation ugi, boolean withRetries,
       AtomicBoolean fallbackToSimpleAuth) throws IOException {
     return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
-      ugi, withRetries, fallbackToSimpleAuth).getProxy();
+        ugi, withRetries, fallbackToSimpleAuth, alignmentContext).getProxy();
   }
 
   @Override
@@ -42,4 +45,8 @@ public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
     return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
       ugi, withRetries).getProxy();
   }
+
+  public void setAlignmentContext(AlignmentContext alignmentContext) {
+    this.alignmentContext = alignmentContext;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java
new file mode 100644
index 0000000..d23f6cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+
+/** The full set of protocols used by the Balancer. */
+@InterfaceAudience.Private
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
+public interface BalancerProtocols extends ClientProtocol, NamenodeProtocol { }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 1444193..4a398db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.net.URI;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -33,7 +32,9 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.Test;
 
 /**
@@ -43,6 +44,13 @@ public class TestBalancerWithHANameNodes {
   private MiniDFSCluster cluster;
   ClientProtocol client;
 
+  // array of racks for original nodes in cluster
+  private static final String[] TEST_RACKS =
+      {TestBalancer.RACK0, TestBalancer.RACK1};
+  // array of capacities for original nodes in cluster
+  private static final long[] TEST_CAPACITIES =
+      {TestBalancer.CAPACITY, TestBalancer.CAPACITY};
+
   static {
     TestBalancer.initTestSetup();
   }
@@ -57,52 +65,79 @@ public class TestBalancerWithHANameNodes {
   public void testBalancerWithHANameNodes() throws Exception {
     Configuration conf = new HdfsConfiguration();
     TestBalancer.initConf(conf);
-    long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
-    String newNodeRack = TestBalancer.RACK2; // new node's rack
-    // array of racks for original nodes in cluster
-    String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
-    // array of capacities of original nodes in cluster
-    long[] capacities = new long[] { TestBalancer.CAPACITY,
-        TestBalancer.CAPACITY };
-    assertEquals(capacities.length, racks.length);
-    int numOfDatanodes = capacities.length;
+    assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
     NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
     nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
     Configuration copiedConf = new Configuration(conf);
     cluster = new MiniDFSCluster.Builder(copiedConf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(capacities.length)
-        .racks(racks)
-        .simulatedCapacities(capacities)
+        .numDataNodes(TEST_CAPACITIES.length)
+        .racks(TEST_RACKS)
+        .simulatedCapacities(TEST_CAPACITIES)
         .build();
     HATestUtil.setFailoverConfigurations(cluster, conf);
     try {
       cluster.waitActive();
-      cluster.transitionToActive(1);
+      cluster.transitionToActive(0);
       Thread.sleep(500);
       client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
           ClientProtocol.class).getProxy();
-      long totalCapacity = TestBalancer.sum(capacities);
-      // fill up the cluster to be 30% full
-      long totalUsedSpace = totalCapacity * 3 / 10;
-      TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
-          / numOfDatanodes, (short) numOfDatanodes, 1);
 
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
-          new long[] { newNodeCapacity });
-      totalCapacity += newNodeCapacity;
-      TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
-          cluster);
-      Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-      assertEquals(1, namenodes.size());
-      assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
-      final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
-          cluster, BalancerParameters.DEFAULT);
+      doTest(conf);
     } finally {
       cluster.shutdown();
     }
   }
+
+  void doTest(Configuration conf) throws Exception {
+    int numOfDatanodes = TEST_CAPACITIES.length;
+    long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
+    // fill up the cluster to be 30% full
+    long totalUsedSpace = totalCapacity * 3 / 10;
+    TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
+        / numOfDatanodes, (short) numOfDatanodes, 0);
+
+    // start up an empty node with the same capacity and on the same rack
+    long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
+    String newNodeRack = TestBalancer.RACK2; // new node's rack
+    cluster.startDataNodes(conf, 1, true, null, new String[] {newNodeRack},
+        new long[] {newNodeCapacity});
+    totalCapacity += newNodeCapacity;
+    TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+        cluster);
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    assertEquals(1, namenodes.size());
+    final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+    TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+        cluster, BalancerParameters.DEFAULT);
+  }
+
+  /**
+   * Test Balancer with ObserverNodes.
+   */
+  @Test(timeout = 60000)
+  public void testBalancerWithObserver() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    TestBalancer.initConf(conf);
+
+    MiniQJMHACluster qjmhaCluster = null;
+    try {
+      qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2,
+          TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS);
+      cluster = qjmhaCluster.getDfsCluster();
+      cluster.waitClusterUp();
+      cluster.waitActive();
+
+      DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
+          cluster, conf, ObserverReadProxyProvider.class, true);
+      client = dfs.getClient().getNamenode();
+
+      doTest(conf);
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 40b2fe8..b932746 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -209,6 +209,14 @@ public abstract class HATestUtil {
   public static MiniQJMHACluster setUpObserverCluster(
       Configuration conf, int numObservers, int numDataNodes,
       boolean fastTailing) throws IOException {
+    return setUpObserverCluster(conf, numObservers, numDataNodes,
+        fastTailing, null, null);
+  }
+
+  public static MiniQJMHACluster setUpObserverCluster(
+      Configuration conf, int numObservers, int numDataNodes,
+      boolean fastTailing, long[] simulatedCapacities,
+      String[] racks) throws IOException {
     // disable block scanner
     conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
 
@@ -224,7 +232,9 @@ public abstract class HATestUtil {
 
     MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers);
-    qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes);
+    qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes)
+        .simulatedCapacities(simulatedCapacities)
+        .racks(racks);
     MiniQJMHACluster qjmhaCluster = qjmBuilder.build();
     MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org