You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/07 20:26:24 UTC

[32/50] [abbrv] hadoop git commit: Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou"

Revert "HADOOP-12957. Limit the number of outstanding async calls.  Contributed by Xiaobing Zhou"

This reverts commit 1b9f18623ab55507bea94888317c7d63d0f4a6f2.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d36b221
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d36b221
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d36b221

Branch: refs/heads/YARN-4757
Commit: 4d36b221a24e3b626bb91093b0bb0fd377061cae
Parents: f23d5df
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:09:18 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:09:18 2016 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |   3 -
 .../ipc/AsyncCallLimitExceededException.java    |  36 ---
 .../main/java/org/apache/hadoop/ipc/Client.java |  66 +----
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 199 ++--------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  12 +-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 238 ++++++-------------
 6 files changed, 109 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 06614db..86e1b43 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,9 +324,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
     4*60*60; // 4 hours
   
-  public static final String  IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
-      "ipc.client.async.calls.max";
-  public static final int     IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
deleted file mode 100644
index db97b6c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import java.io.IOException;
-
-/**
- * Signals that an AsyncCallLimitExceededException has occurred. This class is
- * used to make application code using async RPC aware that limit of max async
- * calls is reached, application code need to retrieve results from response of
- * established async calls to avoid buffer overflow in order for follow-on async
- * calls going correctly.
- */
-public class AsyncCallLimitExceededException extends IOException {
-  private static final long serialVersionUID = 1L;
-
-  public AsyncCallLimitExceededException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 9be4649..d59aeb89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,9 +159,7 @@ public class Client implements AutoCloseable {
 
   private final boolean fallbackAllowed;
   private final byte[] clientId;
-  private final int maxAsyncCalls;
-  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
-
+  
   /**
    * Executor on which IPC calls' parameters are sent.
    * Deferring the sending of parameters to a separate
@@ -1290,9 +1288,6 @@ public class Client implements AutoCloseable {
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
-    this.maxAsyncCalls = conf.getInt(
-        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1359,20 +1354,6 @@ public class Client implements AutoCloseable {
       fallbackToSimpleAuth);
   }
 
-  private void checkAsyncCall() throws IOException {
-    if (isAsynchronousMode()) {
-      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
-        asyncCallCounter.decrementAndGet();
-        String errMsg = String.format(
-            "Exceeded limit of max asynchronous calls: %d, " +
-            "please configure %s to adjust it.",
-            maxAsyncCalls,
-            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
-        throw new AsyncCallLimitExceededException(errMsg);
-      }
-    }
-  }
-
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1393,38 +1374,24 @@ public class Client implements AutoCloseable {
     final Call call = createCall(rpcKind, rpcRequest);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
-
     try {
-      checkAsyncCall();
-      try {
-        connection.sendRpcRequest(call);                 // send the rpc request
-      } catch (RejectedExecutionException e) {
-        throw new IOException("connection has been closed", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.warn("interrupted waiting to send rpc request to server", e);
-        throw new IOException(e);
-      }
-    } catch(Exception e) {
-      if (isAsynchronousMode()) {
-        releaseAsyncCall();
-      }
-      throw e;
+      connection.sendRpcRequest(call);                 // send the rpc request
+    } catch (RejectedExecutionException e) {
+      throw new IOException("connection has been closed", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("interrupted waiting to send rpc request to server", e);
+      throw new IOException(e);
     }
 
     if (isAsynchronousMode()) {
       Future<Writable> returnFuture = new AbstractFuture<Writable>() {
-        private final AtomicBoolean callled = new AtomicBoolean(false);
         @Override
         public Writable get() throws InterruptedException, ExecutionException {
-          if (callled.compareAndSet(false, true)) {
-            try {
-              set(getRpcResponse(call, connection));
-            } catch (IOException ie) {
-              setException(ie);
-            } finally {
-              releaseAsyncCall();
-            }
+          try {
+            set(getRpcResponse(call, connection));
+          } catch (IOException ie) {
+            setException(ie);
           }
           return super.get();
         }
@@ -1460,15 +1427,6 @@ public class Client implements AutoCloseable {
     asynchronousMode.set(async);
   }
 
-  private void releaseAsyncCall() {
-    asyncCallCounter.decrementAndGet();
-  }
-
-  @VisibleForTesting
-  int getAsyncCallCount() {
-    return asyncCallCounter.get();
-  }
-
   private Writable getRpcResponse(final Call call, final Connection connection)
       throws IOException {
     synchronized (call) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -35,7 +34,6 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -56,13 +54,12 @@ public class TestAsyncIPC {
   @Before
   public void setupConf() {
     conf = new Configuration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
     // set asynchronous mode for main thread
     Client.setAsynchronousMode(true);
   }
 
-  static class AsyncCaller extends Thread {
+  protected static class SerialCaller extends Thread {
     private Client client;
     private InetSocketAddress server;
     private int count;
@@ -71,11 +68,11 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public AsyncCaller(Client client, InetSocketAddress server, int count) {
+    public SerialCaller(Client client, InetSocketAddress server, int count) {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since AsyncCaller extends Thread
+      // set asynchronous mode, since SerialCaller extends Thread
       Client.setAsynchronousMode(true);
     }
 
@@ -110,111 +107,14 @@ public class TestAsyncIPC {
     }
   }
 
-  static class AsyncLimitlCaller extends Thread {
-    private Client client;
-    private InetSocketAddress server;
-    private int count;
-    private boolean failed;
-    Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
-    Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
-    int start = 0, end = 0;
-
-    int getStart() {
-      return start;
-    }
-
-    int getEnd() {
-      return end;
-    }
-
-    int getCount() {
-      return count;
-    }
-
-    public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
-      this(0, client, server, count);
-    }
-
-    final int callerId;
-
-    public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
-        int count) {
-      this.client = client;
-      this.server = server;
-      this.count = count;
-      // set asynchronous mode, since AsyncLimitlCaller extends Thread
-      Client.setAsynchronousMode(true);
-      this.callerId = callerId;
-    }
-
-    @Override
-    public void run() {
-      // in case Thread#Start is called, which will spawn new thread
-      Client.setAsynchronousMode(true);
-      for (int i = 0; i < count; i++) {
-        try {
-          final long param = TestIPC.RANDOM.nextLong();
-          runCall(i, param);
-        } catch (Exception e) {
-          LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
-              StringUtils.stringifyException(e)));
-          failed = true;
-        }
-      }
-    }
-
-    private void runCall(final int idx, final long param)
-        throws InterruptedException, ExecutionException, IOException {
-      for (;;) {
-        try {
-          doCall(idx, param);
-          return;
-        } catch (AsyncCallLimitExceededException e) {
-          /**
-           * reached limit of async calls, fetch results of finished async calls
-           * to let follow-on calls go
-           */
-          start = end;
-          end = idx;
-          waitForReturnValues(start, end);
-        }
-      }
-    }
-
-    private void doCall(final int idx, final long param) throws IOException {
-      TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
-      returnFutures.put(idx, returnFuture);
-      expectedValues.put(idx, param);
-    }
-
-    private void waitForReturnValues(final int start, final int end)
-        throws InterruptedException, ExecutionException {
-      for (int i = start; i < end; i++) {
-        LongWritable value = returnFutures.get(i).get();
-        if (expectedValues.get(i) != value.get()) {
-          LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
-          failed = true;
-          break;
-        }
-      }
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testAsyncCall() throws IOException, InterruptedException,
+  @Test
+  public void testSerial() throws IOException, InterruptedException,
       ExecutionException {
-    internalTestAsyncCall(3, false, 2, 5, 100);
-    internalTestAsyncCall(3, true, 2, 5, 10);
+    internalTestSerial(3, false, 2, 5, 100);
+    internalTestSerial(3, true, 2, 5, 10);
   }
 
-  @Test(timeout = 60000)
-  public void testAsyncCallLimit() throws IOException,
-      InterruptedException, ExecutionException {
-    internalTestAsyncCallLimit(100, false, 5, 10, 500);
-  }
-
-  public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
+  public void internalTestSerial(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -226,9 +126,9 @@ public class TestAsyncIPC {
       clients[i] = new Client(LongWritable.class, conf);
     }
 
-    AsyncCaller[] callers = new AsyncCaller[callerCount];
+    SerialCaller[] callers = new SerialCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
@@ -244,75 +144,6 @@ public class TestAsyncIPC {
     server.stop();
   }
 
-  @Test(timeout = 60000)
-  public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
-      InterruptedException, ExecutionException {
-    int handlerCount = 10, callCount = 100;
-    Server server = new TestIPC.TestServer(handlerCount, false, conf);
-    InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
-    final Client client = new Client(LongWritable.class, conf);
-
-    int asyncCallCount = client.getAsyncCallCount();
-
-    try {
-      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
-      caller.run();
-
-      caller.waitForReturnValues();
-      String msg = String.format(
-          "First time, expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
-
-      caller.waitForReturnValues();
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-      msg = String.format("Second time, expected not failed for caller: %s.",
-          caller);
-      assertFalse(msg, caller.failed);
-
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-    } finally {
-      client.stop();
-      server.stop();
-    }
-  }
-
-  public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
-      int clientCount, int callerCount, int callCount) throws IOException,
-      InterruptedException, ExecutionException {
-    Configuration conf = new Configuration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
-    Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
-
-    Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
-    InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
-
-    Client[] clients = new Client[clientCount];
-    for (int i = 0; i < clientCount; i++) {
-      clients[i] = new Client(LongWritable.class, conf);
-    }
-
-    AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
-    for (int i = 0; i < callerCount; i++) {
-      callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
-          callCount);
-      callers[i].start();
-    }
-    for (int i = 0; i < callerCount; i++) {
-      callers[i].join();
-      callers[i].waitForReturnValues(callers[i].getStart(),
-          callers[i].getCount());
-      String msg = String.format("Expected not failed for caller-%d: %s.", i,
-          callers[i]);
-      assertFalse(msg, callers[i].failed);
-    }
-    for (int i = 0; i < clientCount; i++) {
-      clients[i].stop();
-    }
-    server.stop();
-  }
-
   /**
    * Test if (1) the rpc server uses the call id/retry provided by the rpc
    * client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -365,7 +196,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
+      final SerialCaller caller = new SerialCaller(client, addr, 4);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -404,7 +235,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -441,7 +272,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -482,9 +313,9 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      AsyncCaller[] callers = new AsyncCaller[callerCount];
+      SerialCaller[] callers = new SerialCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
+        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3f..37899aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
@@ -51,14 +50,11 @@ public class AsyncDistributedFileSystem {
     final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
         .getReturnValueCallback();
     Future<T> returnFuture = new AbstractFuture<T>() {
-      private final AtomicBoolean called = new AtomicBoolean(false);
       public T get() throws InterruptedException, ExecutionException {
-        if (called.compareAndSet(false, true)) {
-          try {
-            set(returnValueCallback.call());
-          } catch (Exception e) {
-            setException(e);
-          }
+        try {
+          set(returnValueCallback.call());
+        } catch (Exception e) {
+          setException(e);
         }
         return super.get();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d36b221/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299..9322e1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -30,25 +31,80 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
+  final Path asyncRenameDir = new Path("/test/async_rename/");
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  final private static Configuration CONF = new HdfsConfiguration();
+
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String USER1_NAME = "user1";
+  private static final UserGroupInformation USER1;
+
+  private MiniDFSCluster gCluster;
+
+  static {
+    // explicitly turn on permission checking
+    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+    // Initiate all four users
+    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+        GROUP1_NAME, GROUP2_NAME });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+    gCluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (gCluster != null) {
+      gCluster.shutdown();
+      gCluster = null;
+    }
+  }
+
+  static int countLease(MiniDFSCluster cluster) {
+    return TestDFSRename.countLease(cluster);
+  }
+
+  void list(DistributedFileSystem dfs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+    DataOutputStream a_out = dfs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
    * Restart NN to check the rename successfully
    */
-  @Test(timeout = 60000)
+  @Test
   public void testAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
@@ -113,134 +169,38 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test(timeout = 60000)
-  public void testCallGetReturnValueMultipleTimes() throws Exception {
+  @Test
+  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
     final Path renameDir = new Path(
-        "/test/testCallGetReturnValueMultipleTimes/");
-    final Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    final int count = 100;
-    long fileLen = blockSize * 3;
-    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-    assertTrue(dfs.mkdirs(renameDir));
-
-    try {
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
-      }
-
-      for (int i = 0; i < 5; i++) {
-        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
-            renameDir, dfs);
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  private void verifyCallGetReturnValueMultipleTimes(
-      Map<Integer, Future<Void>> returnFutures, int count,
-      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
-      throws InterruptedException, ExecutionException, IOException {
-    // wait for completing the calls
-    for (int i = 0; i < count; i++) {
-      returnFutures.get(i).get();
-    }
-
-    // Restart NN and check the rename successfully
-    cluster.restartNameNodes();
-
-    // very the src dir should not exist, dst should
-    for (int i = 0; i < count; i++) {
-      Path src = new Path(renameDir, "src" + i);
-      Path dst = new Path(renameDir, "dst" + i);
-      assertFalse(dfs.exists(src));
-      assertTrue(dfs.exists(dst));
-    }
-  }
-
-  @Test(timeout = 120000)
-  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(100,
-        "testAggressiveConcurrentAsyncRenameWithOverwrite");
-  }
-
-  @Test(timeout = 60000)
-  public void testConservativeConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(10000,
-        "testConservativeConcurrentAsyncRenameWithOverwrite");
-  }
-
-  private void internalTestConcurrentAsyncRenameWithOverwrite(
-      final int asyncCallLimit, final String basePath) throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    final Path renameDir = new Path(String.format("/test/%s/", basePath));
+        "/test/concurrent_reanme_with_overwrite_dir/");
     Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        asyncCallLimit);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
         .build();
     cluster.waitActive();
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
-    long fileLen = blockSize * 3;
-    int start = 0, end = 0;
-    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-    assertTrue(dfs.mkdirs(renameDir));
 
     try {
+      long fileLen = blockSize * 3;
+      assertTrue(dfs.mkdirs(renameDir));
+
+      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
       // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
         Path src = new Path(renameDir, "src" + i);
         Path dst = new Path(renameDir, "dst" + i);
         DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
         DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        for (;;) {
-          try {
-            LOG.info("rename #" + i);
-            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-            returnFutures.put(i, returnFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            /**
-             * reached limit of async calls, fetch results of finished async
-             * calls to let follow-on calls go
-             */
-            LOG.error(e);
-            start = end;
-            end = i;
-            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
-            waitForReturnValues(returnFutures, start, end);
-          }
-        }
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
       }
 
       // wait for completing the calls
-      for (int i = start; i < count; i++) {
+      for (int i = 0; i < count; i++) {
         returnFutures.get(i).get();
       }
 
@@ -255,60 +215,26 @@ public class TestAsyncDFSRename {
         assertTrue(dfs.exists(dst));
       }
     } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
+      dfs.delete(renameDir, true);
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
 
-  private void waitForReturnValues(
-      final Map<Integer, Future<Void>> returnFutures, final int start,
-      final int end) throws InterruptedException, ExecutionException {
-    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
-    for (int i = start; i < end; i++) {
-      LOG.info("calling Future#get #" + i);
-      returnFutures.get(i).get();
-    }
-  }
-
-  @Test(timeout = 60000)
+  @Test
   public void testAsyncRenameWithException() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    UserGroupInformation ugi1;
-
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(user1, new String[] { group1, group2 });
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
-
-    // Initiate all four users
-    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
-        group1, group2 });
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3).build();
-    cluster.waitActive();
-
-    FileSystem rootFs = FileSystem.get(conf);
+    FileSystem rootFs = FileSystem.get(CONF);
     final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = ugi1
+    AsyncDistributedFileSystem adfs = USER1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
-            return cluster.getFileSystem().getAsyncDistributedFileSystem();
+            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
           }
         });
 
@@ -316,24 +242,16 @@ public class TestAsyncDFSRename {
       Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       returnFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-    } finally {
-      if (rootFs != null) {
-        rootFs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      checkPermissionDenied(e, src);
     }
   }
 
-  private void checkPermissionDenied(final Exception e, final Path dir,
-      final String user) {
+  private void checkPermissionDenied(final Exception e, final Path dir) {
     assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(user));
+        .getMessage().contains(USER1_NAME));
     assertTrue("Permission denied messages must carry the path parent", e
         .getMessage().contains(dir.getParent().toUri().getPath()));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org