You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/10 03:35:23 UTC

[12/50] [abbrv] hadoop git commit: Revert "Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou""

Revert "Revert "HADOOP-12957. Limit the number of outstanding async calls.  Contributed by Xiaobing Zhou""

This reverts commit 4d36b221a24e3b626bb91093b0bb0fd377061cae.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa20fa15
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa20fa15
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa20fa15

Branch: refs/heads/HDFS-7240
Commit: aa20fa150d522b9fe469dd99a8e24d7e27d888ea
Parents: eded3d1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:28:47 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:28:47 2016 +0800

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |   3 +
 .../ipc/AsyncCallLimitExceededException.java    |  36 +++
 .../main/java/org/apache/hadoop/ipc/Client.java |  66 ++++-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 199 ++++++++++++++--
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  12 +-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 238 +++++++++++++------
 6 files changed, 445 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 86e1b43..06614db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,6 +324,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
     4*60*60; // 4 hours
   
+  public static final String  IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
+      "ipc.client.async.calls.max";
+  public static final int     IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
new file mode 100644
index 0000000..db97b6c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+/**
+ * Signals that an AsyncCallLimitExceededException has occurred. This class is
+ * used to make application code using async RPC aware that limit of max async
+ * calls is reached, application code need to retrieve results from response of
+ * established async calls to avoid buffer overflow in order for follow-on async
+ * calls going correctly.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public AsyncCallLimitExceededException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb89..9be4649 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,7 +159,9 @@ public class Client implements AutoCloseable {
 
   private final boolean fallbackAllowed;
   private final byte[] clientId;
-  
+  private final int maxAsyncCalls;
+  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
+
   /**
    * Executor on which IPC calls' parameters are sent.
    * Deferring the sending of parameters to a separate
@@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
+    this.maxAsyncCalls = conf.getInt(
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
       fallbackToSimpleAuth);
   }
 
+  private void checkAsyncCall() throws IOException {
+    if (isAsynchronousMode()) {
+      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+        asyncCallCounter.decrementAndGet();
+        String errMsg = String.format(
+            "Exceeded limit of max asynchronous calls: %d, " +
+            "please configure %s to adjust it.",
+            maxAsyncCalls,
+            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+        throw new AsyncCallLimitExceededException(errMsg);
+      }
+    }
+  }
+
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1374,24 +1393,38 @@ public class Client implements AutoCloseable {
     final Call call = createCall(rpcKind, rpcRequest);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
+
     try {
-      connection.sendRpcRequest(call);                 // send the rpc request
-    } catch (RejectedExecutionException e) {
-      throw new IOException("connection has been closed", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("interrupted waiting to send rpc request to server", e);
-      throw new IOException(e);
+      checkAsyncCall();
+      try {
+        connection.sendRpcRequest(call);                 // send the rpc request
+      } catch (RejectedExecutionException e) {
+        throw new IOException("connection has been closed", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("interrupted waiting to send rpc request to server", e);
+        throw new IOException(e);
+      }
+    } catch(Exception e) {
+      if (isAsynchronousMode()) {
+        releaseAsyncCall();
+      }
+      throw e;
     }
 
     if (isAsynchronousMode()) {
       Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+        private final AtomicBoolean callled = new AtomicBoolean(false);
         @Override
         public Writable get() throws InterruptedException, ExecutionException {
-          try {
-            set(getRpcResponse(call, connection));
-          } catch (IOException ie) {
-            setException(ie);
+          if (callled.compareAndSet(false, true)) {
+            try {
+              set(getRpcResponse(call, connection));
+            } catch (IOException ie) {
+              setException(ie);
+            } finally {
+              releaseAsyncCall();
+            }
           }
           return super.get();
         }
@@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
     asynchronousMode.set(async);
   }
 
+  private void releaseAsyncCall() {
+    asyncCallCounter.decrementAndGet();
+  }
+
+  @VisibleForTesting
+  int getAsyncCallCount() {
+    return asyncCallCounter.get();
+  }
+
   private Writable getRpcResponse(final Call call, final Connection connection)
       throws IOException {
     synchronized (call) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -54,12 +56,13 @@ public class TestAsyncIPC {
   @Before
   public void setupConf() {
     conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
     // set asynchronous mode for main thread
     Client.setAsynchronousMode(true);
   }
 
-  protected static class SerialCaller extends Thread {
+  static class AsyncCaller extends Thread {
     private Client client;
     private InetSocketAddress server;
     private int count;
@@ -68,11 +71,11 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public SerialCaller(Client client, InetSocketAddress server, int count) {
+    public AsyncCaller(Client client, InetSocketAddress server, int count) {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since SerialCaller extends Thread
+      // set asynchronous mode, since AsyncCaller extends Thread
       Client.setAsynchronousMode(true);
     }
 
@@ -107,14 +110,111 @@ public class TestAsyncIPC {
     }
   }
 
-  @Test
-  public void testSerial() throws IOException, InterruptedException,
+  static class AsyncLimitlCaller extends Thread {
+    private Client client;
+    private InetSocketAddress server;
+    private int count;
+    private boolean failed;
+    Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
+    Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
+    int start = 0, end = 0;
+
+    int getStart() {
+      return start;
+    }
+
+    int getEnd() {
+      return end;
+    }
+
+    int getCount() {
+      return count;
+    }
+
+    public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
+      this(0, client, server, count);
+    }
+
+    final int callerId;
+
+    public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
+        int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+      // set asynchronous mode, since AsyncLimitlCaller extends Thread
+      Client.setAsynchronousMode(true);
+      this.callerId = callerId;
+    }
+
+    @Override
+    public void run() {
+      // in case Thread#Start is called, which will spawn new thread
+      Client.setAsynchronousMode(true);
+      for (int i = 0; i < count; i++) {
+        try {
+          final long param = TestIPC.RANDOM.nextLong();
+          runCall(i, param);
+        } catch (Exception e) {
+          LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
+              StringUtils.stringifyException(e)));
+          failed = true;
+        }
+      }
+    }
+
+    private void runCall(final int idx, final long param)
+        throws InterruptedException, ExecutionException, IOException {
+      for (;;) {
+        try {
+          doCall(idx, param);
+          return;
+        } catch (AsyncCallLimitExceededException e) {
+          /**
+           * reached limit of async calls, fetch results of finished async calls
+           * to let follow-on calls go
+           */
+          start = end;
+          end = idx;
+          waitForReturnValues(start, end);
+        }
+      }
+    }
+
+    private void doCall(final int idx, final long param) throws IOException {
+      TestIPC.call(client, param, server, conf);
+      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+      returnFutures.put(idx, returnFuture);
+      expectedValues.put(idx, param);
+    }
+
+    private void waitForReturnValues(final int start, final int end)
+        throws InterruptedException, ExecutionException {
+      for (int i = start; i < end; i++) {
+        LongWritable value = returnFutures.get(i).get();
+        if (expectedValues.get(i) != value.get()) {
+          LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
+          failed = true;
+          break;
+        }
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testAsyncCall() throws IOException, InterruptedException,
       ExecutionException {
-    internalTestSerial(3, false, 2, 5, 100);
-    internalTestSerial(3, true, 2, 5, 10);
+    internalTestAsyncCall(3, false, 2, 5, 100);
+    internalTestAsyncCall(3, true, 2, 5, 10);
   }
 
-  public void internalTestSerial(int handlerCount, boolean handlerSleep,
+  @Test(timeout = 60000)
+  public void testAsyncCallLimit() throws IOException,
+      InterruptedException, ExecutionException {
+    internalTestAsyncCallLimit(100, false, 5, 10, 500);
+  }
+
+  public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -126,9 +226,9 @@ public class TestAsyncIPC {
       clients[i] = new Client(LongWritable.class, conf);
     }
 
-    SerialCaller[] callers = new SerialCaller[callerCount];
+    AsyncCaller[] callers = new AsyncCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
@@ -144,6 +244,75 @@ public class TestAsyncIPC {
     server.stop();
   }
 
+  @Test(timeout = 60000)
+  public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
+      InterruptedException, ExecutionException {
+    int handlerCount = 10, callCount = 100;
+    Server server = new TestIPC.TestServer(handlerCount, false, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+    final Client client = new Client(LongWritable.class, conf);
+
+    int asyncCallCount = client.getAsyncCallCount();
+
+    try {
+      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+      caller.run();
+
+      caller.waitForReturnValues();
+      String msg = String.format(
+          "First time, expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
+
+      caller.waitForReturnValues();
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+      msg = String.format("Second time, expected not failed for caller: %s.",
+          caller);
+      assertFalse(msg, caller.failed);
+
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
+
+  public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
+    Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
+
+    Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client[] clients = new Client[clientCount];
+    for (int i = 0; i < clientCount; i++) {
+      clients[i] = new Client(LongWritable.class, conf);
+    }
+
+    AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
+    for (int i = 0; i < callerCount; i++) {
+      callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
+          callCount);
+      callers[i].start();
+    }
+    for (int i = 0; i < callerCount; i++) {
+      callers[i].join();
+      callers[i].waitForReturnValues(callers[i].getStart(),
+          callers[i].getCount());
+      String msg = String.format("Expected not failed for caller-%d: %s.", i,
+          callers[i]);
+      assertFalse(msg, callers[i].failed);
+    }
+    for (int i = 0; i < clientCount; i++) {
+      clients[i].stop();
+    }
+    server.stop();
+  }
+
   /**
    * Test if (1) the rpc server uses the call id/retry provided by the rpc
    * client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -196,7 +365,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 4);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -235,7 +404,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -272,7 +441,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -313,9 +482,9 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      SerialCaller[] callers = new SerialCaller[callerCount];
+      AsyncCaller[] callers = new AsyncCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 37899aa..356ae3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
@@ -50,11 +51,14 @@ public class AsyncDistributedFileSystem {
     final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
         .getReturnValueCallback();
     Future<T> returnFuture = new AbstractFuture<T>() {
+      private final AtomicBoolean called = new AtomicBoolean(false);
       public T get() throws InterruptedException, ExecutionException {
-        try {
-          set(returnValueCallback.call());
-        } catch (Exception e) {
-          setException(e);
+        if (called.compareAndSet(false, true)) {
+          try {
+            set(returnValueCallback.call());
+          } catch (Exception e) {
+            setException(e);
+          }
         }
         return super.get();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 9322e1a..d129299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -31,80 +30,25 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
-  final Path asyncRenameDir = new Path("/test/async_rename/");
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  final private static Configuration CONF = new HdfsConfiguration();
-
-  final private static String GROUP1_NAME = "group1";
-  final private static String GROUP2_NAME = "group2";
-  final private static String USER1_NAME = "user1";
-  private static final UserGroupInformation USER1;
-
-  private MiniDFSCluster gCluster;
-
-  static {
-    // explicitly turn on permission checking
-    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
-    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
-    // Initiate all four users
-    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
-        GROUP1_NAME, GROUP2_NAME });
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
-    gCluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (gCluster != null) {
-      gCluster.shutdown();
-      gCluster = null;
-    }
-  }
-
-  static int countLease(MiniDFSCluster cluster) {
-    return TestDFSRename.countLease(cluster);
-  }
-
-  void list(DistributedFileSystem dfs, String name) throws IOException {
-    FileSystem.LOG.info("\n\n" + name);
-    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
-      FileSystem.LOG.info("" + s.getPath());
-    }
-  }
-
-  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
-    DataOutputStream a_out = dfs.create(f);
-    a_out.writeBytes("something");
-    a_out.close();
-  }
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
    * Restart NN to check the rename successfully
    */
-  @Test
+  @Test(timeout = 60000)
   public void testAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
@@ -169,38 +113,134 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test
-  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+  @Test(timeout = 60000)
+  public void testCallGetReturnValueMultipleTimes() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
     final Path renameDir = new Path(
-        "/test/concurrent_reanme_with_overwrite_dir/");
+        "/test/testCallGetReturnValueMultipleTimes/");
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    final int count = 100;
+    long fileLen = blockSize * 3;
+    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+    assertTrue(dfs.mkdirs(renameDir));
+
+    try {
+      // concurrently invoking many rename
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+            renameDir, dfs);
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyCallGetReturnValueMultipleTimes(
+      Map<Integer, Future<Void>> returnFutures, int count,
+      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+      throws InterruptedException, ExecutionException, IOException {
+    // wait for completing the calls
+    for (int i = 0; i < count; i++) {
+      returnFutures.get(i).get();
+    }
+
+    // Restart NN and check the rename successfully
+    cluster.restartNameNodes();
+
+    // very the src dir should not exist, dst should
+    for (int i = 0; i < count; i++) {
+      Path src = new Path(renameDir, "src" + i);
+      Path dst = new Path(renameDir, "dst" + i);
+      assertFalse(dfs.exists(src));
+      assertTrue(dfs.exists(dst));
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(100,
+        "testAggressiveConcurrentAsyncRenameWithOverwrite");
+  }
+
+  @Test(timeout = 60000)
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(10000,
+        "testConservativeConcurrentAsyncRenameWithOverwrite");
+  }
+
+  private void internalTestConcurrentAsyncRenameWithOverwrite(
+      final int asyncCallLimit, final String basePath) throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(String.format("/test/%s/", basePath));
     Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
         .build();
     cluster.waitActive();
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
+    long fileLen = blockSize * 3;
+    int start = 0, end = 0;
+    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
-    try {
-      long fileLen = blockSize * 3;
-      assertTrue(dfs.mkdirs(renameDir));
-
-      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+    assertTrue(dfs.mkdirs(renameDir));
 
+    try {
       // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
         Path src = new Path(renameDir, "src" + i);
         Path dst = new Path(renameDir, "dst" + i);
         DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
         DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
+        for (;;) {
+          try {
+            LOG.info("rename #" + i);
+            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+            returnFutures.put(i, returnFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            /**
+             * reached limit of async calls, fetch results of finished async
+             * calls to let follow-on calls go
+             */
+            LOG.error(e);
+            start = end;
+            end = i;
+            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+            waitForReturnValues(returnFutures, start, end);
+          }
+        }
       }
 
       // wait for completing the calls
-      for (int i = 0; i < count; i++) {
+      for (int i = start; i < count; i++) {
         returnFutures.get(i).get();
       }
 
@@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
         assertTrue(dfs.exists(dst));
       }
     } finally {
-      dfs.delete(renameDir, true);
+      if (dfs != null) {
+        dfs.close();
+      }
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
 
-  @Test
+  private void waitForReturnValues(
+      final Map<Integer, Future<Void>> returnFutures, final int start,
+      final int end) throws InterruptedException, ExecutionException {
+    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+    for (int i = start; i < end; i++) {
+      LOG.info("calling Future#get #" + i);
+      returnFutures.get(i).get();
+    }
+  }
+
+  @Test(timeout = 60000)
   public void testAsyncRenameWithException() throws Exception {
-    FileSystem rootFs = FileSystem.get(CONF);
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    UserGroupInformation ugi1;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(user1, new String[] { group1, group2 });
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
+    // Initiate all four users
+    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+        group1, group2 });
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem rootFs = FileSystem.get(conf);
     final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = USER1
+    AsyncDistributedFileSystem adfs = ugi1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
-            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+            return cluster.getFileSystem().getAsyncDistributedFileSystem();
           }
         });
 
@@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
       Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       returnFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src);
+      checkPermissionDenied(e, src, user1);
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
-  private void checkPermissionDenied(final Exception e, final Path dir) {
+  private void checkPermissionDenied(final Exception e, final Path dir,
+      final String user) {
     assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(USER1_NAME));
+        .getMessage().contains(user));
     assertTrue("Permission denied messages must carry the path parent", e
         .getMessage().contains(dir.getParent().toUri().getPath()));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org