You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/06/04 01:22:52 UTC
[1/8] hadoop git commit: Revert "HDFS-10430. Reuse FileSystem#access
in TestAsyncDFS. Contributed by Xiaobing Zhou."
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 6f6911341 -> d740a9026
Revert "HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou."
This reverts commit de69b1da34ed419946b56f63efa82a6162788df9.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1763113
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1763113
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1763113
Branch: refs/heads/branch-2.8
Commit: f176311366eab0364458d055f248fb5796bdec23
Parents: 886e239
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:30 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:30 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 36 +++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1763113/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index c7615a9..ddcf492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,16 +46,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -441,7 +445,7 @@ public class TestAsyncDFS {
for (int i = 0; i < NUM_TESTS; i++) {
assertTrue(fs.exists(dsts[i]));
FsPermission fsPerm = new FsPermission(permissions[i]);
- fs.access(dsts[i], fsPerm.getUserAction());
+ checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
}
// test setOwner
@@ -470,4 +474,34 @@ public class TestAsyncDFS {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
+
+ static void checkAccessPermissions(FileStatus stat, FsAction mode)
+ throws IOException {
+ checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+ }
+
+ static void checkAccessPermissions(final UserGroupInformation ugi,
+ FileStatus stat, FsAction mode) throws IOException {
+ FsPermission perm = stat.getPermission();
+ String user = ugi.getShortUserName();
+ List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+ if (user.equals(stat.getOwner())) {
+ if (perm.getUserAction().implies(mode)) {
+ return;
+ }
+ } else if (groups.contains(stat.getGroup())) {
+ if (perm.getGroupAction().implies(mode)) {
+ return;
+ }
+ } else {
+ if (perm.getOtherAction().implies(mode)) {
+ return;
+ }
+ }
+ throw new AccessControlException(String.format(
+ "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+ .getPath(), stat.getOwner(), stat.getGroup(),
+ stat.isDirectory() ? "d" : "-", perm));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[4/8] hadoop git commit: Revert "HDFS-10431 Refactor and speedup
TestAsyncDFSRename. Contributed by Xiaobing Zhou"
Posted by wa...@apache.org.
Revert "HDFS-10431 Refactor and speedup TestAsyncDFSRename. Contributed by Xiaobing Zhou"
This reverts commit 2e755a7f0ebb05d7f79ec30aba2d6b288936dacc.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0af02cfd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0af02cfd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0af02cfd
Branch: refs/heads/branch-2.8
Commit: 0af02cfd37a564b4a0701dbc6d562fe237565bd5
Parents: f176311
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:31 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:31 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 233 +-------
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 563 +++++++++++++++----
2 files changed, 483 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af02cfd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index ddcf492..67262dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -29,16 +29,13 @@ import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -46,21 +43,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -72,28 +63,21 @@ import com.google.common.collect.Lists;
* */
public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
- private final short replFactor = 1;
- private final long blockSize = 512;
- private long fileLen = blockSize * 3;
- private final long seed = Time.now();
- private final Random r = new Random(seed);
- private final PermissionGenerator permGenerator = new PermissionGenerator(r);
- private static final int NUM_TESTS = 50;
+ private static final int NUM_TESTS = 1000;
private static final int NUM_NN_HANDLER = 10;
- private static final int ASYNC_CALL_LIMIT = 1000;
+ private static final int ASYNC_CALL_LIMIT = 100;
private Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
- private AsyncDistributedFileSystem adfs;
@Before
public void setup() throws IOException {
conf = new HdfsConfiguration();
// explicitly turn on acl
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+ // explicitly turn on ACL
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
// set the limit of max async calls
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
ASYNC_CALL_LIMIT);
@@ -102,7 +86,6 @@ public class TestAsyncDFS {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
fs = FileSystem.get(conf);
- adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
}
@After
@@ -147,9 +130,13 @@ public class TestAsyncDFS {
final String basePath = "testBatchAsyncAcl";
final Path parent = new Path(String.format("/test/%s/", basePath));
+ AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+ .getAsyncDistributedFileSystem();
+
// prepare test
- final Path[] paths = new Path[NUM_TESTS];
- for (int i = 0; i < NUM_TESTS; i++) {
+ int count = NUM_TESTS;
+ final Path[] paths = new Path[count];
+ for (int i = 0; i < count; i++) {
paths[i] = new Path(parent, "acl" + i);
FileSystem.mkdirs(fs, paths[i],
FsPermission.createImmutable((short) 0750));
@@ -166,7 +153,7 @@ public class TestAsyncDFS {
int start = 0, end = 0;
try {
// test setAcl
- for (int i = 0; i < NUM_TESTS; i++) {
+ for (int i = 0; i < count; i++) {
for (;;) {
try {
Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
@@ -179,12 +166,12 @@ public class TestAsyncDFS {
}
}
}
- waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS);
+ waitForAclReturnValues(setAclRetFutures, end, count);
// test getAclStatus
start = 0;
end = 0;
- for (int i = 0; i < NUM_TESTS; i++) {
+ for (int i = 0; i < count; i++) {
for (;;) {
try {
Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
@@ -198,23 +185,13 @@ public class TestAsyncDFS {
}
}
}
- waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths,
+ waitForAclReturnValues(getAclRetFutures, end, count, paths,
expectedAclSpec);
} catch (Exception e) {
throw e;
}
}
- static void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
- final int start, final int end)
- throws InterruptedException, ExecutionException {
- LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
- for (int i = start; i < end; i++) {
- LOG.info("calling Future#get #" + i);
- retFutures.get(i).get();
- }
- }
-
private void waitForAclReturnValues(
final Map<Integer, Future<Void>> aclRetFutures, final int start,
final int end) throws InterruptedException, ExecutionException {
@@ -289,12 +266,9 @@ public class TestAsyncDFS {
final Path parent = new Path("/test/async_api_exception/");
final Path aclDir = new Path(parent, "aclDir");
- final Path src = new Path(parent, "src");
- final Path dst = new Path(parent, "dst");
- fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700));
- fs.mkdirs(src);
+ fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
- AsyncDistributedFileSystem adfs1 = ugi1
+ AsyncDistributedFileSystem adfs = ugi1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
@@ -303,36 +277,9 @@ public class TestAsyncDFS {
});
Future<Void> retFuture;
- // test rename
- try {
- retFuture = adfs1.rename(src, dst, Rename.OVERWRITE);
- retFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(src.getParent().toUri().getPath()));
- }
-
- // test setPermission
- FsPermission fsPerm = new FsPermission(permGenerator.next());
- try {
- retFuture = adfs1.setPermission(src, fsPerm);
- retFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- }
-
- // test setOwner
- try {
- retFuture = adfs1.setOwner(src, "user1", "group2");
- retFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- }
-
// test setAcl
try {
- retFuture = adfs1.setAcl(aclDir,
+ retFuture = adfs.setAcl(aclDir,
Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
retFuture.get();
fail("setAcl should fail with permission denied");
@@ -342,7 +289,7 @@ public class TestAsyncDFS {
// test getAclStatus
try {
- Future<AclStatus> aclRetFuture = adfs1.getAclStatus(aclDir);
+ Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
aclRetFuture.get();
fail("getAclStatus should fail with permission denied");
} catch (ExecutionException e) {
@@ -360,148 +307,4 @@ public class TestAsyncDFS {
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(dir.getName()));
}
-
-
- @Test(timeout = 120000)
- public void testConcurrentAsyncAPI() throws Exception {
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
-
- // create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
- // prepare for test
- final Path parent = new Path(
- String.format("/test/%s/", "testConcurrentAsyncAPI"));
- final Path[] srcs = new Path[NUM_TESTS];
- final Path[] dsts = new Path[NUM_TESTS];
- short[] permissions = new short[NUM_TESTS];
- for (int i = 0; i < NUM_TESTS; i++) {
- srcs[i] = new Path(parent, "src" + i);
- dsts[i] = new Path(parent, "dst" + i);
- DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
- DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
- assertTrue(fs.exists(srcs[i]));
- assertTrue(fs.getFileStatus(srcs[i]).isFile());
- assertTrue(fs.exists(dsts[i]));
- assertTrue(fs.getFileStatus(dsts[i]).isFile());
- permissions[i] = permGenerator.next();
- }
-
- Map<Integer, Future<Void>> renameRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> permRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> ownerRetFutures =
- new HashMap<Integer, Future<Void>>();
- int start = 0, end = 0;
- // test rename
- for (int i = 0; i < NUM_TESTS; i++) {
- for (;;) {
- try {
- Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
- Rename.OVERWRITE);
- renameRetFutures.put(i, returnFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(renameRetFutures, start, end);
- }
- }
- }
-
- // wait for completing the calls
- waitForAclReturnValues(renameRetFutures, end, NUM_TESTS);
-
- // verify the src should not exist, dst should
- for (int i = 0; i < NUM_TESTS; i++) {
- assertFalse(fs.exists(srcs[i]));
- assertTrue(fs.exists(dsts[i]));
- }
-
- // test permissions
- for (int i = 0; i < NUM_TESTS; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setPermission(dsts[i],
- new FsPermission(permissions[i]));
- permRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(permRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- waitForAclReturnValues(permRetFutures, end, NUM_TESTS);
-
- // verify the permission
- for (int i = 0; i < NUM_TESTS; i++) {
- assertTrue(fs.exists(dsts[i]));
- FsPermission fsPerm = new FsPermission(permissions[i]);
- checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
- }
-
- // test setOwner
- start = 0;
- end = 0;
- for (int i = 0; i < NUM_TESTS; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setOwner(dsts[i], "user1", "group2");
- ownerRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(ownerRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS);
-
- // verify the owner
- for (int i = 0; i < NUM_TESTS; i++) {
- assertTrue(fs.exists(dsts[i]));
- assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner()));
- assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
- }
- }
-
- static void checkAccessPermissions(FileStatus stat, FsAction mode)
- throws IOException {
- checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
- }
-
- static void checkAccessPermissions(final UserGroupInformation ugi,
- FileStatus stat, FsAction mode) throws IOException {
- FsPermission perm = stat.getPermission();
- String user = ugi.getShortUserName();
- List<String> groups = Arrays.asList(ugi.getGroupNames());
-
- if (user.equals(stat.getOwner())) {
- if (perm.getUserAction().implies(mode)) {
- return;
- }
- } else if (groups.contains(stat.getGroup())) {
- if (perm.getGroupAction().implies(mode)) {
- return;
- }
- } else {
- if (perm.getOtherAction().implies(mode)) {
- return;
- }
- }
- throw new AccessControlException(String.format(
- "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
- .getPath(), stat.getOwner(), stat.getGroup(),
- stat.isDirectory() ? "d" : "-", perm));
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af02cfd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 8d3e509..03c8151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -31,157 +34,521 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestAsyncDFSRename {
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- private final short replFactor = 1;
+ private final long seed = Time.now();
+ private final Random r = new Random(seed);
+ private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+ private final short replFactor = 2;
private final long blockSize = 512;
private long fileLen = blockSize * 3;
- private static final int NUM_TESTS = 50;
- private static final int NUM_NN_HANDLER = 10;
- private static final int ASYNC_CALL_LIMIT = 1000;
-
- private Configuration conf;
- private MiniDFSCluster cluster;
- private FileSystem fs;
- private AsyncDistributedFileSystem adfs;
-
- @Before
- public void setup() throws IOException {
- conf = new HdfsConfiguration();
- // set the limit of max async calls
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- ASYNC_CALL_LIMIT);
- // set server handlers
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ /**
+ * Check that the blocks of the dst file are cleaned up after a rename with
+ * overwrite, then restart the NN to check that the rename succeeded.
+ */
+ @Test(timeout = 60000)
+ public void testAsyncRenameWithOverwrite() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ replFactor).build();
cluster.waitActive();
- fs = FileSystem.get(conf);
- adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
- }
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- @After
- public void tearDown() throws IOException {
- if (fs != null) {
- fs.close();
- fs = null;
- }
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
+ try {
+ String src = "/foo/src";
+ String dst = "/foo/dst";
+ String src2 = "/foo/src2";
+ String dst2 = "/foo/dst2";
+ Path srcPath = new Path(src);
+ Path dstPath = new Path(dst);
+ Path srcPath2 = new Path(src2);
+ Path dstPath2 = new Path(dst2);
+
+ DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+ LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst, 0, fileLen);
+ LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst2, 0, fileLen);
+ BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+ .getBlockManager();
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+
+ Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+ Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+ retVal1.get();
+ retVal2.get();
+
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+
+ // Restart NN and check that the rename succeeded
+ cluster.restartNameNodes();
+ assertFalse(dfs.exists(srcPath));
+ assertTrue(dfs.exists(dstPath));
+ assertFalse(dfs.exists(srcPath2));
+ assertTrue(dfs.exists(dstPath2));
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
- final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/");
- assertTrue(fs.mkdirs(parent));
+ final Path renameDir = new Path(
+ "/test/testCallGetReturnValueMultipleTimes/");
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ final int count = 100;
+ final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
- // prepare test
- final Path[] srcs = new Path[NUM_TESTS];
- final Path[] dsts = new Path[NUM_TESTS];
- for (int i = 0; i < NUM_TESTS; i++) {
- srcs[i] = new Path(parent, "src" + i);
- dsts[i] = new Path(parent, "dst" + i);
- DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
- DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+ assertTrue(dfs.mkdirs(renameDir));
+
+ try {
+ // concurrently invoking many rename
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+ renameDir, dfs);
+ }
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
+ }
- // concurrently invoking many rename
- final Map<Integer, Future<Void>> reFutures =
- new HashMap<Integer, Future<Void>>();
- for (int i = 0; i < NUM_TESTS; i++) {
- Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
- Rename.OVERWRITE);
- reFutures.put(i, retFuture);
+ private void verifyCallGetReturnValueMultipleTimes(
+ Map<Integer, Future<Void>> returnFutures, int count,
+ MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+ throws InterruptedException, ExecutionException, IOException {
+ // wait for completing the calls
+ for (int i = 0; i < count; i++) {
+ returnFutures.get(i).get();
}
- assertEquals(NUM_TESTS, reFutures.size());
+ // Restart NN and check that the rename succeeded
+ cluster.restartNameNodes();
- for (int i = 0; i < 5; i++) {
- verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts);
+ // verify the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
}
}
- private void verifyCallGetReturnValueMultipleTimes(
- final Map<Integer, Future<Void>> reFutures, final Path[] srcs,
- final Path[] dsts)
- throws InterruptedException, ExecutionException, IOException {
+ @Test
+ public void testConservativeConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(100,
+ "testAggressiveConcurrentAsyncRenameWithOverwrite");
+ }
- // wait for completing the calls
- waitForReturnValues(reFutures, 0, NUM_TESTS);
+ @Test(timeout = 60000)
+ public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(10000,
+ "testConservativeConcurrentAsyncRenameWithOverwrite");
+ }
+
+ private void internalTestConcurrentAsyncRenameWithOverwrite(
+ final int asyncCallLimit, final String basePath) throws Exception {
+ final Path renameDir = new Path(String.format("/test/%s/", basePath));
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ asyncCallLimit);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ int count = 1000;
+ int start = 0, end = 0;
+ Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+ assertTrue(dfs.mkdirs(renameDir));
+
+ try {
+ // concurrently invoking many rename
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ for (;;) {
+ try {
+ LOG.info("rename #" + i);
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async
+ * calls to let follow-on calls go
+ */
+ LOG.error(e);
+ start = end;
+ end = i;
+ LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+ waitForReturnValues(returnFutures, start, end);
+ }
+ }
+ }
- // verify the src dir should not exist, dst should
- verifyRenames(srcs, dsts);
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ returnFutures.get(i).get();
+ }
+
+ // Restart NN and check that the rename succeeded
+ cluster.restartNameNodes();
+
+ // verify the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
+ }
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void waitForReturnValues(
+ final Map<Integer, Future<Void>> returnFutures, final int start,
+ final int end) throws InterruptedException, ExecutionException {
+ LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+ for (int i = start; i < end; i++) {
+ LOG.info("calling Future#get #" + i);
+ returnFutures.get(i).get();
+ }
+ }
+
+ @Test
+ public void testConservativeConcurrentAsyncAPI() throws Exception {
+ internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
}
@Test(timeout = 60000)
- public void testConcurrentAsyncRename() throws Exception {
- final Path parent = new Path(
- String.format("/test/%s/", "testConcurrentAsyncRename"));
- assertTrue(fs.mkdirs(parent));
-
- // prepare test
- final Path[] srcs = new Path[NUM_TESTS];
- final Path[] dsts = new Path[NUM_TESTS];
- for (int i = 0; i < NUM_TESTS; i++) {
+ public void testAggressiveConcurrentAsyncAPI() throws Exception {
+ internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
+ }
+
+ private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
+ final String basePath) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ int count = 500;
+
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+ // set the limit of max async calls
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ asyncCallLimit);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+ // start mini cluster
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3).build();
+ cluster.waitActive();
+ AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+ .getAsyncDistributedFileSystem();
+
+ // prepare for test
+ FileSystem rootFs = FileSystem.get(conf);
+ final Path parent = new Path(String.format("/test/%s/", basePath));
+ final Path[] srcs = new Path[count];
+ final Path[] dsts = new Path[count];
+ short[] permissions = new short[count];
+ for (int i = 0; i < count; i++) {
srcs[i] = new Path(parent, "src" + i);
dsts[i] = new Path(parent, "dst" + i);
- DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
- DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
+ assertTrue(rootFs.exists(srcs[i]));
+ assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
+ assertTrue(rootFs.exists(dsts[i]));
+ assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
+ permissions[i] = permGenerator.next();
}
- // concurrently invoking many rename
- int start = 0, end = 0;
- Map<Integer, Future<Void>> retFutures =
+ Map<Integer, Future<Void>> renameRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> permRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> ownerRetFutures =
new HashMap<Integer, Future<Void>>();
- for (int i = 0; i < NUM_TESTS; i++) {
+ int start = 0, end = 0;
+ // test rename
+ for (int i = 0; i < count; i++) {
for (;;) {
try {
- LOG.info("rename #" + i);
- Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
+ Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
Rename.OVERWRITE);
- retFutures.put(i, retFuture);
+ renameRetFutures.put(i, returnFuture);
break;
} catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async calls
- * to let follow-on calls go
- */
- LOG.error(e);
start = end;
end = i;
- LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
- waitForReturnValues(retFutures, start, end);
+ waitForReturnValues(renameRetFutures, start, end);
}
}
}
// wait for completing the calls
- waitForReturnValues(retFutures, end, NUM_TESTS);
+ for (int i = start; i < count; i++) {
+ renameRetFutures.get(i).get();
+ }
+
+ // Restart NN and check that the renames succeeded
+ cluster.restartNameNodes();
+
+ // verify the src should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ assertFalse(rootFs.exists(srcs[i]));
+ assertTrue(rootFs.exists(dsts[i]));
+ }
+
+ // test permissions
+ try {
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setPermission(dsts[i],
+ new FsPermission(permissions[i]));
+ permRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(permRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ permRetFutures.get(i).get();
+ }
+
+ // Restart NN and check permission then
+ cluster.restartNameNodes();
+
+ // verify the permission
+ for (int i = 0; i < count; i++) {
+ assertTrue(rootFs.exists(dsts[i]));
+ FsPermission fsPerm = new FsPermission(permissions[i]);
+ checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
+ fsPerm.getUserAction());
+ }
+
+ // test setOwner
+ start = 0;
+ end = 0;
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
+ "group2");
+ ownerRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(ownerRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ ownerRetFutures.get(i).get();
+ }
- // verify the src dir should not exist, dst should
- verifyRenames(srcs, dsts);
+ // Restart NN and check owner then
+ cluster.restartNameNodes();
+
+ // verify the owner
+ for (int i = 0; i < count; i++) {
+ assertTrue(rootFs.exists(dsts[i]));
+ assertTrue(
+ "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
+ assertTrue(
+ "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
+ }
+ } catch (AccessControlException ace) {
+ throw ace;
+ } finally {
+ if (rootFs != null) {
+ rootFs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
}
- private void verifyRenames(final Path[] srcs, final Path[] dsts)
+ static void checkAccessPermissions(FileStatus stat, FsAction mode)
throws IOException {
- for (int i = 0; i < NUM_TESTS; i++) {
- assertFalse(fs.exists(srcs[i]));
- assertTrue(fs.exists(dsts[i]));
+ checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+ }
+
+ static void checkAccessPermissions(final UserGroupInformation ugi,
+ FileStatus stat, FsAction mode) throws IOException {
+ FsPermission perm = stat.getPermission();
+ String user = ugi.getShortUserName();
+ List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+ if (user.equals(stat.getOwner())) {
+ if (perm.getUserAction().implies(mode)) {
+ return;
+ }
+ } else if (groups.contains(stat.getGroup())) {
+ if (perm.getGroupAction().implies(mode)) {
+ return;
+ }
+ } else {
+ if (perm.getOtherAction().implies(mode)) {
+ return;
+ }
}
+ throw new AccessControlException(String.format(
+ "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+ .getPath(), stat.getOwner(), stat.getGroup(),
+ stat.isDirectory() ? "d" : "-", perm));
}
- void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
- final int start, final int end)
- throws InterruptedException, ExecutionException {
- TestAsyncDFS.waitForReturnValues(retFutures, start, end);
+ @Test(timeout = 60000)
+ public void testAsyncAPIWithException() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ UserGroupInformation ugi1;
+
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+ // Initiate the test user
+ ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+ group1, group2 });
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3).build();
+ cluster.waitActive();
+
+ FileSystem rootFs = FileSystem.get(conf);
+ final Path renameDir = new Path("/test/async_api_exception/");
+ final Path src = new Path(renameDir, "src");
+ final Path dst = new Path(renameDir, "dst");
+ rootFs.mkdirs(src);
+
+ AsyncDistributedFileSystem adfs = ugi1
+ .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+ @Override
+ public AsyncDistributedFileSystem run() throws Exception {
+ return cluster.getFileSystem().getAsyncDistributedFileSystem();
+ }
+ });
+
+ Future<Void> retFuture;
+ try {
+ retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(src.getParent().toUri().getPath()));
+ }
+
+ FsPermission fsPerm = new FsPermission(permGenerator.next());
+ try {
+ retFuture = adfs.setPermission(src, fsPerm);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the name of the path",
+ e.getMessage().contains(src.getName()));
+ }
+
+ try {
+ retFuture = adfs.setOwner(src, "user1", "group2");
+ retFuture.get();
+ } catch (ExecutionException e) {
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the name of the path",
+ e.getMessage().contains(src.getName()));
+ } finally {
+ if (rootFs != null) {
+ rootFs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/8] hadoop git commit: Revert "HADOOP-13226 Support async call
retry and failover."
Posted by wa...@apache.org.
Revert "HADOOP-13226 Support async call retry and failover."
This reverts commit a8941d7790b2209ac779c9372298b833ededd132.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/886e2396
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/886e2396
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/886e2396
Branch: refs/heads/branch-2.8
Commit: 886e2396062ef076b3ea4aa93395cc90287efd4f
Parents: 6f69113
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:30 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:30 2016 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 8 +-
.../hadoop/io/retry/AsyncCallHandler.java | 321 -------------------
.../org/apache/hadoop/io/retry/CallReturn.java | 75 -----
.../hadoop/io/retry/RetryInvocationHandler.java | 134 ++------
.../apache/hadoop/io/retry/RetryPolicies.java | 4 +-
.../main/java/org/apache/hadoop/ipc/Client.java | 25 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 13 +-
.../apache/hadoop/util/concurrent/AsyncGet.java | 17 +-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 11 +-
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 9 +-
.../ClientNamenodeProtocolTranslatorPB.java | 42 +--
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 43 ++-
.../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 182 -----------
.../hdfs/server/namenode/ha/HATestUtil.java | 9 +-
14 files changed, 116 insertions(+), 777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 4bf1762..4a8cbaf 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -363,13 +363,7 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
- <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
- <Match>
- <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
- <Method name="wait" />
- <Bug pattern="WA_NOT_IN_LOOP" />
- </Match>
-
+ <!-- Synchronization performed on util.concurrent instance. -->
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
deleted file mode 100644
index 5a03b03..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Handle async calls. */
-@InterfaceAudience.Private
-public class AsyncCallHandler {
- static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
-
- private static final ThreadLocal<AsyncGet<?, Exception>>
- LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
- private static final ThreadLocal<AsyncGet<Object, Throwable>>
- ASYNC_RETURN = new ThreadLocal<>();
-
- /** @return the async return value from {@link AsyncCallHandler}. */
- @InterfaceStability.Unstable
- @SuppressWarnings("unchecked")
- public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
- final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
- if (asyncGet != null) {
- ASYNC_RETURN.set(null);
- return asyncGet;
- } else {
- return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
- }
- }
-
- /** For the lower rpc layers to set the async return value. */
- @InterfaceStability.Unstable
- public static void setLowerLayerAsyncReturn(
- AsyncGet<?, Exception> asyncReturn) {
- LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
- }
-
- private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
- final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
- Preconditions.checkNotNull(asyncGet);
- LOWER_LAYER_ASYNC_RETURN.set(null);
- return asyncGet;
- }
-
- /** A simple concurrent queue which keeping track the empty start time. */
- static class ConcurrentQueue<T> {
- private final Queue<T> queue = new LinkedList<>();
- private long emptyStartTime = Time.monotonicNow();
-
- synchronized int size() {
- return queue.size();
- }
-
- /** Is the queue empty for more than the given time in millisecond? */
- synchronized boolean isEmpty(long time) {
- return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
- }
-
- synchronized void offer(T c) {
- final boolean added = queue.offer(c);
- Preconditions.checkState(added);
- }
-
- synchronized T poll() {
- Preconditions.checkState(!queue.isEmpty());
- final T t = queue.poll();
- if (queue.isEmpty()) {
- emptyStartTime = Time.monotonicNow();
- }
- return t;
- }
- }
-
- /** A queue for handling async calls. */
- static class AsyncCallQueue {
- private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
- private final Processor processor = new Processor();
-
- void addCall(AsyncCall call) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("add " + call);
- }
- queue.offer(call);
- processor.tryStart();
- }
-
- void checkCalls() {
- final int size = queue.size();
- for (int i = 0; i < size; i++) {
- final AsyncCall c = queue.poll();
- if (!c.isDone()) {
- queue.offer(c); // the call is not done yet, add it back.
- }
- }
- }
-
- /** Process the async calls in the queue. */
- private class Processor {
- static final long GRACE_PERIOD = 10*1000L;
- static final long SLEEP_PERIOD = 100L;
-
- private final AtomicReference<Thread> running = new AtomicReference<>();
-
- boolean isRunning(Daemon d) {
- return d == running.get();
- }
-
- void tryStart() {
- final Thread current = Thread.currentThread();
- if (running.compareAndSet(null, current)) {
- final Daemon daemon = new Daemon() {
- @Override
- public void run() {
- for (; isRunning(this);) {
- try {
- Thread.sleep(SLEEP_PERIOD);
- } catch (InterruptedException e) {
- kill(this);
- return;
- }
-
- checkCalls();
- tryStop(this);
- }
- }
- };
-
- final boolean set = running.compareAndSet(current, daemon);
- Preconditions.checkState(set);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
- }
- daemon.start();
- }
- }
-
- void tryStop(Daemon d) {
- if (queue.isEmpty(GRACE_PERIOD)) {
- kill(d);
- }
- }
-
- void kill(Daemon d) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Killing " + d);
- }
- final boolean set = running.compareAndSet(d, null);
- Preconditions.checkState(set);
- }
- }
- }
-
- static class AsyncValue<V> {
- private V value;
-
- synchronized V waitAsyncValue(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- if (value != null) {
- return value;
- }
- AsyncGet.Util.wait(this, timeout, unit);
- if (value != null) {
- return value;
- }
-
- throw new TimeoutException("waitCallReturn timed out "
- + timeout + " " + unit);
- }
-
- synchronized void set(V v) {
- Preconditions.checkNotNull(v);
- Preconditions.checkState(value == null);
- value = v;
- notify();
- }
-
- synchronized boolean isDone() {
- return value != null;
- }
- }
-
- static class AsyncCall extends RetryInvocationHandler.Call {
- private final AsyncCallHandler asyncCallHandler;
-
- private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
- private AsyncGet<?, Exception> lowerLayerAsyncGet;
-
- AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
- RetryInvocationHandler.Counters counters,
- RetryInvocationHandler<?> retryInvocationHandler,
- AsyncCallHandler asyncCallHandler) {
- super(method, args, isRpc, callId, counters, retryInvocationHandler);
-
- this.asyncCallHandler = asyncCallHandler;
- }
-
- /** @return true if the call is done; otherwise, return false. */
- boolean isDone() {
- final CallReturn r = invokeOnce();
- switch (r.getState()) {
- case RETURNED:
- case EXCEPTION:
- asyncCallReturn.set(r); // the async call is done
- return true;
- case RETRY:
- invokeOnce();
- break;
- case ASYNC_CALL_IN_PROGRESS:
- case ASYNC_INVOKED:
- // nothing to do
- break;
- default:
- Preconditions.checkState(false);
- }
- return false;
- }
-
- @Override
- CallReturn invoke() throws Throwable {
- LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
- if (lowerLayerAsyncGet != null) {
- // async call was submitted early, check the lower level async call
- final boolean isDone = lowerLayerAsyncGet.isDone();
- LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
- if (!isDone) {
- return CallReturn.ASYNC_CALL_IN_PROGRESS;
- }
- try {
- return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
- } finally {
- lowerLayerAsyncGet = null;
- }
- }
-
- // submit a new async call
- LOG.trace("invoke: ASYNC_INVOKED");
- final boolean mode = Client.isAsynchronousMode();
- try {
- Client.setAsynchronousMode(true);
- final Object r = invokeMethod();
- // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
- Preconditions.checkState(r == null);
- lowerLayerAsyncGet = getLowerLayerAsyncReturn();
-
- if (counters.isZeros()) {
- // first async attempt, initialize
- LOG.trace("invoke: initAsyncCall");
- asyncCallHandler.initAsyncCall(this, asyncCallReturn);
- }
- return CallReturn.ASYNC_INVOKED;
- } finally {
- Client.setAsynchronousMode(mode);
- }
- }
- }
-
- private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
- private volatile boolean hasSuccessfulCall = false;
-
- AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
- int callId, RetryInvocationHandler.Counters counters,
- RetryInvocationHandler<?> retryInvocationHandler) {
- return new AsyncCall(method, args, isRpc, callId, counters,
- retryInvocationHandler, this);
- }
-
- boolean hasSuccessfulCall() {
- return hasSuccessfulCall;
- }
-
- private void initAsyncCall(final AsyncCall asyncCall,
- final AsyncValue<CallReturn> asyncCallReturn) {
- asyncCalls.addCall(asyncCall);
-
- final AsyncGet<Object, Throwable> asyncGet
- = new AsyncGet<Object, Throwable>() {
- @Override
- public Object get(long timeout, TimeUnit unit) throws Throwable {
- final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
- final Object r = c.getReturnValue();
- hasSuccessfulCall = true;
- return r;
- }
-
- @Override
- public boolean isDone() {
- return asyncCallReturn.isDone();
- }
- };
- ASYNC_RETURN.set(asyncGet);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
deleted file mode 100644
index 943725c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-
-/** The call return from a method invocation. */
-class CallReturn {
- /** The return state. */
- enum State {
- /** Call is returned successfully. */
- RETURNED,
- /** Call throws an exception. */
- EXCEPTION,
- /** Call should be retried according to the {@link RetryPolicy}. */
- RETRY,
- /** Call, which is async, is still in progress. */
- ASYNC_CALL_IN_PROGRESS,
- /** Call, which is async, just has been invoked. */
- ASYNC_INVOKED
- }
-
- static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
- State.ASYNC_CALL_IN_PROGRESS);
- static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
- static final CallReturn RETRY = new CallReturn(State.RETRY);
-
- private final Object returnValue;
- private final Throwable thrown;
- private final State state;
-
- CallReturn(Object r) {
- this(r, null, State.RETURNED);
- }
- CallReturn(Throwable t) {
- this(null, t, State.EXCEPTION);
- Preconditions.checkNotNull(t);
- }
- private CallReturn(State s) {
- this(null, null, s);
- }
- private CallReturn(Object r, Throwable t, State s) {
- Preconditions.checkArgument(r == null || t == null);
- returnValue = r;
- thrown = t;
- state = s;
- }
-
- State getState() {
- return state;
- }
-
- Object getReturnValue() throws Throwable {
- if (state == State.EXCEPTION) {
- throw thrown;
- }
- Preconditions.checkState(state == State.RETURNED, "state == %s", state);
- return returnValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index f2b2c99..300d0c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,83 +42,11 @@ import java.util.Map;
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
- static class Call {
- private final Method method;
- private final Object[] args;
- private final boolean isRpc;
- private final int callId;
- final Counters counters;
-
- private final RetryPolicy retryPolicy;
- private final RetryInvocationHandler<?> retryInvocationHandler;
-
- Call(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
- this.method = method;
- this.args = args;
- this.isRpc = isRpc;
- this.callId = callId;
- this.counters = counters;
-
- this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
- this.retryInvocationHandler = retryInvocationHandler;
- }
-
- /** Invoke the call once without retrying. */
- synchronized CallReturn invokeOnce() {
- try {
- // The number of times this invocation handler has ever been failed over
- // before this method invocation attempt. Used to prevent concurrent
- // failed method invocations from triggering multiple failover attempts.
- final long failoverCount = retryInvocationHandler.getFailoverCount();
- try {
- return invoke();
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this, e);
- }
- if (Thread.currentThread().isInterrupted()) {
- // If interrupted, do not retry.
- throw e;
- }
- retryInvocationHandler.handleException(
- method, retryPolicy, failoverCount, counters, e);
- return CallReturn.RETRY;
- }
- } catch(Throwable t) {
- return new CallReturn(t);
- }
- }
-
- CallReturn invoke() throws Throwable {
- return new CallReturn(invokeMethod());
- }
-
- Object invokeMethod() throws Throwable {
- if (isRpc) {
- Client.setCallIdAndRetryCount(callId, counters.retries);
- }
- return retryInvocationHandler.invokeMethod(method, args);
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "#" + callId + ": "
- + method.getDeclaringClass().getSimpleName() + "." + method.getName()
- + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
- + ")";
- }
- }
-
- static class Counters {
+ private static class Counters {
/** Counter for retries. */
private int retries;
/** Counter for method invocation has been failed over. */
private int failovers;
-
- boolean isZeros() {
- return retries == 0 && failovers == 0;
- }
}
private static class ProxyDescriptor<T> {
@@ -216,13 +144,11 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
private final ProxyDescriptor<T> proxyDescriptor;
- private volatile boolean hasSuccessfulCall = false;
-
+ private volatile boolean hasMadeASuccessfulCall = false;
+
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
- private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
-
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) {
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -241,35 +167,38 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return policy != null? policy: defaultPolicy;
}
- private long getFailoverCount() {
- return proxyDescriptor.getFailoverCount();
- }
-
- private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters) {
- if (Client.isAsynchronousMode()) {
- return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
- counters, this);
- } else {
- return new Call(method, args, isRpc, callId, counters, this);
- }
- }
-
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
- final Counters counters = new Counters();
+ return invoke(method, args, isRpc, callId, new Counters());
+ }
+
+ private Object invoke(final Method method, final Object[] args,
+ final boolean isRpc, final int callId, final Counters counters)
+ throws Throwable {
+ final RetryPolicy policy = getRetryPolicy(method);
- final Call call = newCall(method, args, isRpc, callId, counters);
while (true) {
- final CallReturn c = call.invokeOnce();
- final CallReturn.State state = c.getState();
- if (state == CallReturn.State.ASYNC_INVOKED) {
- return null; // return null for async calls
- } else if (c.getState() != CallReturn.State.RETRY) {
- return c.getReturnValue();
+ // The number of times this invocation handler has ever been failed over,
+ // before this method invocation attempt. Used to prevent concurrent
+ // failed method invocations from triggering multiple failover attempts.
+ final long failoverCount = proxyDescriptor.getFailoverCount();
+
+ if (isRpc) {
+ Client.setCallIdAndRetryCount(callId, counters.retries);
+ }
+ try {
+ final Object ret = invokeMethod(method, args);
+ hasMadeASuccessfulCall = true;
+ return ret;
+ } catch (Exception ex) {
+ if (Thread.currentThread().isInterrupted()) {
+ // If interrupted, do not retry.
+ throw ex;
+ }
+ handleException(method, policy, failoverCount, counters, ex);
}
}
}
@@ -310,8 +239,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
final int failovers, final long delay, final Exception ex) {
// log info if this has made some successful calls or
// this is not the first failover
- final boolean info = hasSuccessfulCall || failovers != 0
- || asyncCallHandler.hasSuccessfulCall();
+ final boolean info = hasMadeASuccessfulCall || failovers != 0;
if (!info && !LOG.isDebugEnabled()) {
return;
}
@@ -337,9 +265,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (!method.isAccessible()) {
method.setAccessible(true);
}
- final Object r = method.invoke(proxyDescriptor.getProxy(), args);
- hasSuccessfulCall = true;
- return r;
+ return method.invoke(proxyDescriptor.getProxy(), args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.io.retry;
-import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
@@ -648,9 +647,8 @@ public class RetryPolicies {
return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
+ retries + ") exceeded maximum allowed (" + maxRetries + ")");
}
-
+
if (e instanceof ConnectException ||
- e instanceof EOFException ||
e instanceof NoRouteToHostException ||
e instanceof UnknownHostException ||
e instanceof StandbyException ||
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2820c93..23b14e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
@@ -93,8 +94,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
- ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+ = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -105,9 +106,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T extends Writable> AsyncGet<T, IOException>
- getAsyncRpcResponse() {
- return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+ public static <T> Future<T> getAsyncRpcResponse() {
+ return (Future<T>) ASYNC_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -1414,16 +1414,9 @@ public class Client implements AutoCloseable {
}
}
}
-
- @Override
- public boolean isDone() {
- synchronized (call) {
- return call.done;
- }
- }
};
- ASYNC_RPC_RESPONSE.set(asyncGet);
+ ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
return null;
} else {
return getRpcResponse(call, connection, -1, null);
@@ -1468,8 +1461,10 @@ public class Client implements AutoCloseable {
synchronized (call) {
while (!call.done) {
try {
- AsyncGet.Util.wait(call, timeout, unit);
- if (timeout >= 0 && !call.done) {
+ final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+ timeout, unit);
+ call.wait(waitTimeout); // wait for the result
+ if (waitTimeout > 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index ea629b1..4641a67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,6 +54,7 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -255,18 +256,14 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final AsyncGet<RpcResponseWrapper, IOException> arr
- = Client.getAsyncRpcResponse();
+ final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
- return getReturnMessage(method, arr.get(timeout, unit));
- }
-
- @Override
- public boolean isDone() {
- return arr.isDone();
+ final RpcResponseWrapper rrw = timeout < 0?
+ frrw.get(): frrw.get(timeout, unit);
+ return getReturnMessage(method, rrw);
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index f124890..5eac869 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,19 +47,14 @@ public interface AsyncGet<R, E extends Throwable> {
R get(long timeout, TimeUnit unit)
throws E, TimeoutException, InterruptedException;
- /** @return true if the underlying computation is done; false, otherwise. */
- boolean isDone();
-
/** Utility */
class Util {
- /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
- public static void wait(Object obj, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (timeout < 0) {
- obj.wait();
- } else if (timeout > 0) {
- obj.wait(unit.toMillis(timeout));
- }
+ /**
+ * @return {@link Object#wait(long)} timeout converted
+ * from {@link #get(long, TimeUnit)} timeout.
+ */
+ public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+ return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index ef27e12..7623975 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -51,12 +50,6 @@ public class TestAsyncIPC {
private static Configuration conf;
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
- static <T extends Writable> AsyncGetFuture<T, IOException>
- getAsyncRpcResponseFuture() {
- return (AsyncGetFuture<T, IOException>) new AsyncGetFuture<>(
- Client.getAsyncRpcResponse());
- }
-
@Before
public void setupConf() {
conf = new Configuration();
@@ -91,7 +84,7 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+ Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
@@ -212,7 +205,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+ Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 472b1d4..b507fa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
@@ -51,8 +52,10 @@ public class AsyncDistributedFileSystem {
this.dfs = dfs;
}
- private static <T> Future<T> getReturnValue() {
- return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+ static <T> Future<T> getReturnValue() {
+ final AsyncGet<T, Exception> asyncGet
+ = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
+ return new AsyncGetFuture<>(asyncGet);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index dd87b0b..b9dcee5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -175,7 +176,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -204,6 +204,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
+ private static final ThreadLocal<AsyncGet<?, Exception>>
+ ASYNC_RETURN_VALUE = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -236,6 +238,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
+ @SuppressWarnings("unchecked")
+ @Unstable
+ public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+ return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+ }
+
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -374,13 +382,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
asyncReturnMessage.get(timeout, unit);
return null;
}
-
- @Override
- public boolean isDone() {
- return asyncReturnMessage.isDone();
- }
};
- AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+ ASYNC_RETURN_VALUE.set(asyncGet);
}
@Override
@@ -1352,20 +1355,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy.getAclStatus(null, req);
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<AclStatus, Exception> asyncGet
- = new AsyncGet<AclStatus, Exception>() {
- @Override
- public AclStatus get(long timeout, TimeUnit unit) throws Exception {
- return PBHelperClient.convert((GetAclStatusResponseProto)
- asyncReturnMessage.get(timeout, unit));
- }
-
- @Override
- public boolean isDone() {
- return asyncReturnMessage.isDone();
- }
- };
- AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+ final AsyncGet<AclStatus, Exception> asyncGet =
+ new AsyncGet<AclStatus, Exception>() {
+ @Override
+ public AclStatus get(long timeout, TimeUnit unit)
+ throws Exception {
+ return PBHelperClient
+ .convert((GetAclStatusResponseProto) asyncReturnMessage
+ .get(timeout, unit));
+ }
+ };
+ ASYNC_RETURN_VALUE.set(asyncGet);
return null;
} else {
return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 6a60290..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -71,7 +70,7 @@ public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
private final short replFactor = 1;
private final long blockSize = 512;
- private long fileLen = 0;
+ private long fileLen = blockSize * 3;
private final long seed = Time.now();
private final Random r = new Random(seed);
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -81,7 +80,7 @@ public class TestAsyncDFS {
private Configuration conf;
private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
+ private FileSystem fs;
private AsyncDistributedFileSystem adfs;
@Before
@@ -96,10 +95,10 @@ public class TestAsyncDFS {
ASYNC_CALL_LIMIT);
// set server handlers
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
- fs = cluster.getFileSystem();
- adfs = fs.getAsyncDistributedFileSystem();
+ fs = FileSystem.get(conf);
+ adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
}
@After
@@ -114,6 +113,31 @@ public class TestAsyncDFS {
}
}
+ static class AclQueueEntry {
+ private final Object future;
+ private final Path path;
+ private final Boolean isSetAcl;
+
+ AclQueueEntry(final Object future, final Path path,
+ final Boolean isSetAcl) {
+ this.future = future;
+ this.path = path;
+ this.isSetAcl = isSetAcl;
+ }
+
+ public final Object getFuture() {
+ return future;
+ }
+
+ public final Path getPath() {
+ return path;
+ }
+
+ public final Boolean isSetAcl() {
+ return this.isSetAcl;
+ }
+ }
+
@Test(timeout=60000)
public void testBatchAsyncAcl() throws Exception {
final String basePath = "testBatchAsyncAcl";
@@ -324,7 +348,7 @@ public class TestAsyncDFS {
public static void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
- assertTrue(e.getCause() instanceof RemoteException);
+ assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
@@ -446,9 +470,4 @@ public class TestAsyncDFS {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
-
- @Test
- public void testAsyncWithoutRetry() throws Exception {
- TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
deleted file mode 100644
index 70ca03d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/** Test async methods with HA setup. */
-public class TestAsyncHDFSWithHA {
- static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
- static {
- GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
- }
-
- private static <T> Future<T> getReturnValue() {
- return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
- }
-
- static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
- Path[] dsts) throws IOException {
- for (int i = 0; i < srcs.length; i++) {
- srcs[i] = new Path(dir, "src" + i);
- dsts[i] = new Path(dir, "dst" + i);
- dfs.mkdirs(srcs[i]);
- }
- }
-
- static void runTestAsyncWithoutRetry(Configuration conf,
- MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
- final int num = 5;
-
- final String renameDir = "/testAsyncWithoutRetry/";
- final Path[] srcs = new Path[num + 1];
- final Path[] dsts = new Path[num + 1];
- mkdirs(dfs, renameDir, srcs, dsts);
-
- // create a proxy without retry.
- final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
- = NameNodeProxies.createNonHAProxy(conf,
- cluster.getNameNode(0).getNameNodeAddress(),
- ClientProtocol.class, UserGroupInformation.getCurrentUser(),
- false);
- final ClientProtocol cp = proxyInfo.getProxy();
-
- // submit async calls
- Client.setAsynchronousMode(true);
- final List<Future<Void>> results = new ArrayList<>();
- for (int i = 0; i < num; i++) {
- final String src = srcs[i].toString();
- final String dst = dsts[i].toString();
- LOG.info(i + ") rename " + src + " -> " + dst);
- cp.rename2(src, dst);
- final Future<Void> returnValue = getReturnValue();
- results.add(returnValue);
- }
- Client.setAsynchronousMode(false);
-
- // wait for the async calls
- for (Future<Void> f : results) {
- f.get();
- }
-
- //check results
- for (int i = 0; i < num; i++) {
- Assert.assertEquals(false, dfs.exists(srcs[i]));
- Assert.assertEquals(true, dfs.exists(dsts[i]));
- }
- }
-
- /** Testing HDFS async methods with HA setup. */
- @Test(timeout = 120000)
- public void testAsyncWithHAFailover() throws Exception {
- final int num = 10;
-
- final Configuration conf = new HdfsConfiguration();
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(0).build();
-
- try {
- cluster.waitActive();
- cluster.transitionToActive(0);
-
- final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
- cluster, conf);
- runTestAsyncWithoutRetry(conf, cluster, dfs);
-
- final String renameDir = "/testAsyncWithHAFailover/";
- final Path[] srcs = new Path[num + 1];
- final Path[] dsts = new Path[num + 1];
- mkdirs(dfs, renameDir, srcs, dsts);
-
- // submit async calls and trigger failover in the middle.
- final AsyncDistributedFileSystem adfs
- = dfs.getAsyncDistributedFileSystem();
- final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
-
- final List<Future<Void>> results = new ArrayList<>();
- final List<IOException> exceptions = new ArrayList<>();
- final List<Future<?>> futures = new ArrayList<>();
- final int half = num/2;
- for(int i = 0; i <= num; i++) {
- final int id = i;
- futures.add(executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- if (id == half) {
- // failover
- cluster.shutdownNameNode(0);
- cluster.transitionToActive(1);
- } else {
- // rename
- results.add(adfs.rename(srcs[id], dsts[id]));
- }
- } catch (IOException e) {
- exceptions.add(e);
- }
- }
- }));
- }
-
- // wait for the tasks
- Assert.assertEquals(num + 1, futures.size());
- for(int i = 0; i <= num; i++) {
- futures.get(i).get();
- }
- // wait for the async calls
- Assert.assertEquals(num, results.size());
- Assert.assertTrue(exceptions.isEmpty());
- for(Future<Void> r : results) {
- r.get();
- }
-
- // check results
- for(int i = 0; i <= num; i++) {
- final boolean renamed = i != half;
- Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
- Assert.assertEquals(renamed, dfs.exists(dsts[i]));
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 05e8412..c7c4a77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -132,8 +131,7 @@ public abstract class HATestUtil {
}
/** Gets the filesystem instance by setting the failover configurations */
- public static DistributedFileSystem configureFailoverFs(
- MiniDFSCluster cluster, Configuration conf)
+ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
throws IOException, URISyntaxException {
return configureFailoverFs(cluster, conf, 0);
}
@@ -145,14 +143,13 @@ public abstract class HATestUtil {
* @param nsIndex namespace index starting with zero
* @throws IOException if an error occurs rolling the edit log
*/
- public static DistributedFileSystem configureFailoverFs(
- MiniDFSCluster cluster, Configuration conf,
+ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
int nsIndex) throws IOException, URISyntaxException {
conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
- return (DistributedFileSystem)fs;
+ return fs;
}
public static void setFailoverConfigurations(MiniDFSCluster cluster,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[8/8] hadoop git commit: Revert "HDFS-10224. Implement asynchronous
rename for DistributedFileSystem. Contributed by Xiaobing Zhou"
Posted by wa...@apache.org.
Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou"
This reverts commit bdc45bef646cafdc04a59c19e23dcba3bb16b20c.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d740a902
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d740a902
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d740a902
Branch: refs/heads/branch-2.8
Commit: d740a90260d9d26a67688b004e5fcda60cc30723
Parents: 8a07026
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:14:02 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:17:33 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileSystem.java | 1 +
.../main/java/org/apache/hadoop/ipc/Client.java | 11 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 34 +--
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 2 +-
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 --------
.../hadoop/hdfs/DistributedFileSystem.java | 22 +-
.../ClientNamenodeProtocolTranslatorPB.java | 39 +--
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 258 -------------------
8 files changed, 17 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 6558d98..75d468c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1247,6 +1247,7 @@ public abstract class FileSystem extends Configured implements Closeable {
/**
* Renames Path src to Path dst
* <ul>
+ * <li
* <li>Fails if src is a file and dst is a directory.
* <li>Fails if src is a directory and dst is a file.
* <li>Fails if the parent of dst does not exist or is a file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 35e5f21..befab73 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,8 +119,7 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>>
- RETURN_RPC_RESPONSE = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -131,8 +130,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getReturnRpcResponse() {
- return (Future<T>) RETURN_RPC_RESPONSE.get();
+ public static <T> Future<T> getReturnValue() {
+ return (Future<T>) returnValue.get();
}
/** Set call id and retry count for the next call. */
@@ -1398,7 +1397,7 @@ public class Client implements AutoCloseable {
}
};
- RETURN_RPC_RESPONSE.set(returnFuture);
+ returnValue.set(returnFuture);
return null;
} else {
return getRpcResponse(call, connection);
@@ -1412,7 +1411,7 @@ public class Client implements AutoCloseable {
* synchronous mode.
*/
@Unstable
- public static boolean isAsynchronousMode() {
+ static boolean isAsynchronousMode() {
return asynchronousMode.get();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 350e041..88e2e2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,9 +26,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
@@ -70,9 +67,7 @@ import com.google.protobuf.TextFormat;
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
- private static final ThreadLocal<Callable<?>>
- RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
-
+
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -81,12 +76,6 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
- @SuppressWarnings("unchecked")
- @Unstable
- public static <T> Callable<T> getReturnMessageCallback() {
- return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
- }
-
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
@@ -200,7 +189,7 @@ public class ProtobufRpcEngine implements RpcEngine {
* the server.
*/
@Override
- public Object invoke(Object proxy, final Method method, Object[] args)
+ public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
@@ -262,23 +251,6 @@ public class ProtobufRpcEngine implements RpcEngine {
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
- if (Client.isAsynchronousMode()) {
- final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
- Callable<Message> callback = new Callable<Message>() {
- @Override
- public Message call() throws Exception {
- return getReturnMessage(method, frrw.get());
- }
- };
- RETURN_MESSAGE_CALLBACK.set(callback);
- return null;
- } else {
- return getReturnMessage(method, val);
- }
- }
-
- private Message getReturnMessage(final Method method,
- final RpcResponseWrapper rrw) throws ServiceException {
Message prototype = null;
try {
prototype = getReturnProtoType(method);
@@ -288,7 +260,7 @@ public class ProtobufRpcEngine implements RpcEngine {
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
- .mergeFrom(rrw.theResponseRead).build();
+ .mergeFrom(val.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..de4395e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+ Future<LongWritable> returnFuture = Client.getReturnValue();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
deleted file mode 100644
index 37899aa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.ipc.Client;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-/****************************************************************
- * Implementation of the asynchronous distributed file system.
- * This instance of this class is the way end-user code interacts
- * with a Hadoop DistributedFileSystem in an asynchronous manner.
- *
- *****************************************************************/
-@Unstable
-public class AsyncDistributedFileSystem {
-
- private final DistributedFileSystem dfs;
-
- AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
- this.dfs = dfs;
- }
-
- static <T> Future<T> getReturnValue() {
- final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
- .getReturnValueCallback();
- Future<T> returnFuture = new AbstractFuture<T>() {
- public T get() throws InterruptedException, ExecutionException {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
- }
- return super.get();
- }
- };
- return returnFuture;
- }
-
- /**
- * Renames Path src to Path dst
- * <ul>
- * <li>Fails if src is a file and dst is a directory.
- * <li>Fails if src is a directory and dst is a file.
- * <li>Fails if the parent of dst does not exist or is a file.
- * </ul>
- * <p>
- * If OVERWRITE option is not passed as an argument, rename fails if the dst
- * already exists.
- * <p>
- * If OVERWRITE option is passed as an argument, rename overwrites the dst if
- * it is a file or an empty directory. Rename fails if dst is a non-empty
- * directory.
- * <p>
- * Note that atomicity of rename is dependent on the file system
- * implementation. Please refer to the file system documentation for details.
- * This default implementation is non atomic.
- *
- * @param src
- * path to be renamed
- * @param dst
- * new path after rename
- * @throws IOException
- * on failure
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future<Void> rename(Path src, Path dst,
- final Options.Rename... options) throws IOException {
- dfs.getFsStatistics().incrementWriteOps(1);
-
- final Path absSrc = dfs.fixRelativePart(src);
- final Path absDst = dfs.fixRelativePart(dst);
-
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
- options);
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 2ffe11a..27881d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,7 +31,6 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -206,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
* @return path component of {file}
* @throws IllegalArgumentException if URI does not belong to this DFS
*/
- String getPathName(Path file) {
+ private String getPathName(Path file) {
checkPath(file);
String result = file.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
@@ -2480,23 +2479,4 @@ public class DistributedFileSystem extends FileSystem {
}
return ret;
}
-
- private final AsyncDistributedFileSystem adfs =
- new AsyncDistributedFileSystem(this);
-
- /** @return an {@link AsyncDistributedFileSystem} object. */
- @Unstable
- public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
- return adfs;
- }
-
- @Override
- protected Path fixRelativePart(Path p) {
- return super.fixRelativePart(p);
- }
-
- Statistics getFsStatistics() {
- return statistics;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 75fba21..6aeed28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,14 +24,11 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -155,14 +152,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
@@ -174,9 +170,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -188,9 +182,12 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
+ .EncryptionZoneProto;
+
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -201,8 +198,6 @@ import com.google.protobuf.ServiceException;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal<Callable<?>>
- RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -235,12 +230,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
- @SuppressWarnings("unchecked")
- @Unstable
- public static <T> Callable<T> getReturnValueCallback() {
- return (Callable<T>) RETURN_VALUE_CALLBACK.get();
- }
-
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -476,7 +465,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
-
try {
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
@@ -501,22 +489,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
setDst(dst).setOverwriteDest(overwrite).
build();
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.rename2(null, req);
-
- final Callable<Message> returnMessageCallback = ProtobufRpcEngine
- .getReturnMessageCallback();
- Callable<Void> callBack = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- returnMessageCallback.call();
- return null;
- }
- };
- RETURN_VALUE_CALLBACK.set(callBack);
- } else {
- rpcProxy.rename2(null, req);
- }
+ rpcProxy.rename2(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
deleted file mode 100644
index 9322e1a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestAsyncDFSRename {
- final Path asyncRenameDir = new Path("/test/async_rename/");
- public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- final private static Configuration CONF = new HdfsConfiguration();
-
- final private static String GROUP1_NAME = "group1";
- final private static String GROUP2_NAME = "group2";
- final private static String USER1_NAME = "user1";
- private static final UserGroupInformation USER1;
-
- private MiniDFSCluster gCluster;
-
- static {
- // explicitly turn on permission checking
- CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
- u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
- DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
- // Initiate all four users
- USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
- GROUP1_NAME, GROUP2_NAME });
- }
-
- @Before
- public void setUp() throws IOException {
- gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
- gCluster.waitActive();
- }
-
- @After
- public void tearDown() throws IOException {
- if (gCluster != null) {
- gCluster.shutdown();
- gCluster = null;
- }
- }
-
- static int countLease(MiniDFSCluster cluster) {
- return TestDFSRename.countLease(cluster);
- }
-
- void list(DistributedFileSystem dfs, String name) throws IOException {
- FileSystem.LOG.info("\n\n" + name);
- for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
- FileSystem.LOG.info("" + s.getPath());
- }
- }
-
- static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
- DataOutputStream a_out = dfs.create(f);
- a_out.writeBytes("something");
- a_out.close();
- }
-
- /**
- * Check the blocks of dst file are cleaned after rename with overwrite
- * Restart NN to check the rename successfully
- */
- @Test
- public void testAsyncRenameWithOverwrite() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
- replFactor).build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
- try {
-
- long fileLen = blockSize * 3;
- String src = "/foo/src";
- String dst = "/foo/dst";
- String src2 = "/foo/src2";
- String dst2 = "/foo/dst2";
- Path srcPath = new Path(src);
- Path dstPath = new Path(dst);
- Path srcPath2 = new Path(src2);
- Path dstPath2 = new Path(dst2);
-
- DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
- LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst, 0, fileLen);
- LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst2, 0, fileLen);
- BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
- .getBlockManager();
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
-
- Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
- Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
- retVal1.get();
- retVal2.get();
-
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
- assertFalse(dfs.exists(srcPath));
- assertTrue(dfs.exists(dstPath));
- assertFalse(dfs.exists(srcPath2));
- assertTrue(dfs.exists(dstPath2));
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- @Test
- public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- final Path renameDir = new Path(
- "/test/concurrent_reanme_with_overwrite_dir/");
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- int count = 1000;
-
- try {
- long fileLen = blockSize * 3;
- assertTrue(dfs.mkdirs(renameDir));
-
- Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- }
-
- // wait for completing the calls
- for (int i = 0; i < count; i++) {
- returnFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
- }
- } finally {
- dfs.delete(renameDir, true);
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- @Test
- public void testAsyncRenameWithException() throws Exception {
- FileSystem rootFs = FileSystem.get(CONF);
- final Path renameDir = new Path("/test/async_rename_exception/");
- final Path src = new Path(renameDir, "src");
- final Path dst = new Path(renameDir, "dst");
- rootFs.mkdirs(src);
-
- AsyncDistributedFileSystem adfs = USER1
- .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
- @Override
- public AsyncDistributedFileSystem run() throws Exception {
- return gCluster.getFileSystem().getAsyncDistributedFileSystem();
- }
- });
-
- try {
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src);
- }
- }
-
- private void checkPermissionDenied(final Exception e, final Path dir) {
- assertTrue(e.getCause() instanceof ExecutionException);
- assertTrue("Permission denied messages must carry AccessControlException",
- e.getMessage().contains("AccessControlException"));
- assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(USER1_NAME));
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(dir.getParent().toUri().getPath()));
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[5/8] hadoop git commit: Revert "HDFS-10390. Implement asynchronous
setAcl/getAclStatus for DistributedFileSystem. Contributed by Xiaobing Zhou"
Posted by wa...@apache.org.
Revert "HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem. Contributed by Xiaobing Zhou"
This reverts commit 339b803a52ed5e867b8c405a7e7a92adcfe6c4df.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1de712f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1de712f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1de712f2
Branch: refs/heads/branch-2.8
Commit: 1de712f22abbe313692cca6bece7f265da96aee5
Parents: 0af02cf
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:31 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:31 2016 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 59 ----
.../hadoop/hdfs/DistributedFileSystem.java | 3 -
.../ClientNamenodeProtocolTranslatorPB.java | 30 +-
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 310 -------------------
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 15 +-
.../hdfs/server/namenode/FSAclBaseTest.java | 12 +-
6 files changed, 18 insertions(+), 411 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index b507fa5..1f60df2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,16 +19,12 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
@@ -89,7 +85,6 @@ public class AsyncDistributedFileSystem {
public Future<Void> rename(Path src, Path dst,
final Options.Rename... options) throws IOException {
dfs.getFsStatistics().incrementWriteOps(1);
- dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME);
final Path absSrc = dfs.fixRelativePart(src);
final Path absDst = dfs.fixRelativePart(dst);
@@ -118,7 +113,6 @@ public class AsyncDistributedFileSystem {
public Future<Void> setPermission(Path p, final FsPermission permission)
throws IOException {
dfs.getFsStatistics().incrementWriteOps(1);
- dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION);
final Path absPath = dfs.fixRelativePart(p);
final boolean isAsync = Client.isAsynchronousMode();
Client.setAsynchronousMode(true);
@@ -150,7 +144,6 @@ public class AsyncDistributedFileSystem {
}
dfs.getFsStatistics().incrementWriteOps(1);
- dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER);
final Path absPath = dfs.fixRelativePart(p);
final boolean isAsync = Client.isAsynchronousMode();
Client.setAsynchronousMode(true);
@@ -161,56 +154,4 @@ public class AsyncDistributedFileSystem {
Client.setAsynchronousMode(isAsync);
}
}
-
- /**
- * Fully replaces ACL of files and directories, discarding all existing
- * entries.
- *
- * @param p
- * Path to modify
- * @param aclSpec
- * List<AclEntry> describing modifications, must include entries for
- * user, group, and others for compatibility with permission bits.
- * @throws IOException
- * if an ACL could not be modified
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future<Void> setAcl(Path p, final List<AclEntry> aclSpec)
- throws IOException {
- dfs.getFsStatistics().incrementWriteOps(1);
- dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL);
- final Path absPath = dfs.fixRelativePart(p);
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec);
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
-
- /**
- * Gets the ACL of a file or directory.
- *
- * @param p
- * Path to get
- * @return AclStatus describing the ACL of the file or directory
- * @throws IOException
- * if an ACL could not be read
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future<AclStatus> getAclStatus(Path p) throws IOException {
- final Path absPath = dfs.fixRelativePart(p);
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().getAclStatus(dfs.getPathName(absPath));
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index d81d8d5..2ffe11a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2499,7 +2499,4 @@ public class DistributedFileSystem extends FileSystem {
return statistics;
}
- DFSOpsCountStatistics getDFSOpsCountStatistics() {
- return storageStatistics;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index b9dcee5..849f06d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
@@ -162,7 +161,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTim
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
@@ -1335,12 +1334,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec))
.build();
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setAcl(null, req);
- setAsyncReturnValue();
- } else {
- rpcProxy.setAcl(null, req);
- }
+ rpcProxy.setAcl(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -1351,25 +1345,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder()
.setSrc(src).build();
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.getAclStatus(null, req);
- final AsyncGet<Message, Exception> asyncReturnMessage
- = ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<AclStatus, Exception> asyncGet =
- new AsyncGet<AclStatus, Exception>() {
- @Override
- public AclStatus get(long timeout, TimeUnit unit)
- throws Exception {
- return PBHelperClient
- .convert((GetAclStatusResponseProto) asyncReturnMessage
- .get(timeout, unit));
- }
- };
- ASYNC_RETURN_VALUE.set(asyncGet);
- return null;
- } else {
- return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
- }
+ return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
deleted file mode 100644
index 67262dd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
-import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
-import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
-import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
-import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
-import static org.apache.hadoop.fs.permission.AclEntryType.USER;
-import static org.apache.hadoop.fs.permission.FsAction.ALL;
-import static org.apache.hadoop.fs.permission.FsAction.NONE;
-import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
-import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
-import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Unit tests for asynchronous distributed filesystem.
- * */
-public class TestAsyncDFS {
- public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
- private static final int NUM_TESTS = 1000;
- private static final int NUM_NN_HANDLER = 10;
- private static final int ASYNC_CALL_LIMIT = 100;
-
- private Configuration conf;
- private MiniDFSCluster cluster;
- private FileSystem fs;
-
- @Before
- public void setup() throws IOException {
- conf = new HdfsConfiguration();
- // explicitly turn on acl
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
- // explicitly turn on ACL
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
- // set the limit of max async calls
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- ASYNC_CALL_LIMIT);
- // set server handlers
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
- cluster.waitActive();
- fs = FileSystem.get(conf);
- }
-
- @After
- public void tearDown() throws IOException {
- if (fs != null) {
- fs.close();
- fs = null;
- }
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- static class AclQueueEntry {
- private final Object future;
- private final Path path;
- private final Boolean isSetAcl;
-
- AclQueueEntry(final Object future, final Path path,
- final Boolean isSetAcl) {
- this.future = future;
- this.path = path;
- this.isSetAcl = isSetAcl;
- }
-
- public final Object getFuture() {
- return future;
- }
-
- public final Path getPath() {
- return path;
- }
-
- public final Boolean isSetAcl() {
- return this.isSetAcl;
- }
- }
-
- @Test(timeout=60000)
- public void testBatchAsyncAcl() throws Exception {
- final String basePath = "testBatchAsyncAcl";
- final Path parent = new Path(String.format("/test/%s/", basePath));
-
- AsyncDistributedFileSystem adfs = cluster.getFileSystem()
- .getAsyncDistributedFileSystem();
-
- // prepare test
- int count = NUM_TESTS;
- final Path[] paths = new Path[count];
- for (int i = 0; i < count; i++) {
- paths[i] = new Path(parent, "acl" + i);
- FileSystem.mkdirs(fs, paths[i],
- FsPermission.createImmutable((short) 0750));
- assertTrue(fs.exists(paths[i]));
- assertTrue(fs.getFileStatus(paths[i]).isDirectory());
- }
-
- final List<AclEntry> aclSpec = getAclSpec();
- final AclEntry[] expectedAclSpec = getExpectedAclSpec();
- Map<Integer, Future<Void>> setAclRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<AclStatus>> getAclRetFutures =
- new HashMap<Integer, Future<AclStatus>>();
- int start = 0, end = 0;
- try {
- // test setAcl
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
- setAclRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForAclReturnValues(setAclRetFutures, start, end);
- }
- }
- }
- waitForAclReturnValues(setAclRetFutures, end, count);
-
- // test getAclStatus
- start = 0;
- end = 0;
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
- getAclRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForAclReturnValues(getAclRetFutures, start, end, paths,
- expectedAclSpec);
- }
- }
- }
- waitForAclReturnValues(getAclRetFutures, end, count, paths,
- expectedAclSpec);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void waitForAclReturnValues(
- final Map<Integer, Future<Void>> aclRetFutures, final int start,
- final int end) throws InterruptedException, ExecutionException {
- for (int i = start; i < end; i++) {
- aclRetFutures.get(i).get();
- }
- }
-
- private void waitForAclReturnValues(
- final Map<Integer, Future<AclStatus>> aclRetFutures, final int start,
- final int end, final Path[] paths, final AclEntry[] expectedAclSpec)
- throws InterruptedException, ExecutionException, IOException {
- for (int i = start; i < end; i++) {
- AclStatus aclStatus = aclRetFutures.get(i).get();
- verifyGetAcl(aclStatus, expectedAclSpec, paths[i]);
- }
- }
-
- private void verifyGetAcl(final AclStatus aclStatus,
- final AclEntry[] expectedAclSpec, final Path path) throws IOException {
- if (aclStatus == null) {
- return;
- }
-
- // verify permission and acl
- AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]);
- assertArrayEquals(expectedAclSpec, returned);
- assertPermission(path, (short) 010770);
- FSAclBaseTest.assertAclFeature(cluster, path, true);
- }
-
- private List<AclEntry> getAclSpec() {
- return Lists.newArrayList(
- aclEntry(ACCESS, USER, ALL),
- aclEntry(ACCESS, USER, "foo", ALL),
- aclEntry(ACCESS, GROUP, READ_EXECUTE),
- aclEntry(ACCESS, OTHER, NONE),
- aclEntry(DEFAULT, USER, "foo", ALL));
- }
-
- private AclEntry[] getExpectedAclSpec() {
- return new AclEntry[] {
- aclEntry(ACCESS, USER, "foo", ALL),
- aclEntry(ACCESS, GROUP, READ_EXECUTE),
- aclEntry(DEFAULT, USER, ALL),
- aclEntry(DEFAULT, USER, "foo", ALL),
- aclEntry(DEFAULT, GROUP, READ_EXECUTE),
- aclEntry(DEFAULT, MASK, ALL),
- aclEntry(DEFAULT, OTHER, NONE) };
- }
-
- private void assertPermission(final Path pathToCheck, final short perm)
- throws IOException {
- AclTestHelpers.assertPermission(fs, pathToCheck, perm);
- }
-
- @Test(timeout=60000)
- public void testAsyncAPIWithException() throws Exception {
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- UserGroupInformation ugi1;
-
- // create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
- // Initiate all four users
- ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
- group1, group2 });
-
- final Path parent = new Path("/test/async_api_exception/");
- final Path aclDir = new Path(parent, "aclDir");
- fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
-
- AsyncDistributedFileSystem adfs = ugi1
- .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
- @Override
- public AsyncDistributedFileSystem run() throws Exception {
- return cluster.getFileSystem().getAsyncDistributedFileSystem();
- }
- });
-
- Future<Void> retFuture;
- // test setAcl
- try {
- retFuture = adfs.setAcl(aclDir,
- Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
- retFuture.get();
- fail("setAcl should fail with permission denied");
- } catch (ExecutionException e) {
- checkPermissionDenied(e, aclDir, user1);
- }
-
- // test getAclStatus
- try {
- Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
- aclRetFuture.get();
- fail("getAclStatus should fail with permission denied");
- } catch (ExecutionException e) {
- checkPermissionDenied(e, aclDir, user1);
- }
- }
-
- public static void checkPermissionDenied(final Exception e, final Path dir,
- final String user) {
- assertTrue(e.getCause() instanceof ExecutionException);
- assertTrue("Permission denied messages must carry AccessControlException",
- e.getMessage().contains("AccessControlException"));
- assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(user));
- assertTrue("Permission denied messages must carry the name of the path",
- e.getMessage().contains(dir.getName()));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 03c8151..7539fbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -520,7 +520,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
retFuture.get();
} catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(src.getParent().toUri().getPath()));
}
@@ -530,7 +530,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.setPermission(src, fsPerm);
retFuture.get();
} catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(src.getName()));
}
@@ -539,7 +539,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.setOwner(src, "user1", "group2");
retFuture.get();
} catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
+ checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(src.getName()));
} finally {
@@ -551,4 +551,13 @@ public class TestAsyncDFSRename {
}
}
}
+
+ private void checkPermissionDenied(final Exception e, final Path dir,
+ final String user) {
+ assertTrue(e.getCause() instanceof ExecutionException);
+ assertTrue("Permission denied messages must carry AccessControlException",
+ e.getMessage().contains("AccessControlException"));
+ assertTrue("Permission denied messages must carry the username", e
+ .getMessage().contains(user));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1de712f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 52e638e..f481bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -1637,23 +1637,17 @@ public abstract class FSAclBaseTest {
assertAclFeature(path, expectAclFeature);
}
- private static void assertAclFeature(Path pathToCheck,
- boolean expectAclFeature) throws IOException {
- assertAclFeature(cluster, pathToCheck, expectAclFeature);
- }
-
/**
* Asserts whether or not the inode for a specific path has an AclFeature.
*
- * @param miniCluster the cluster into which the path resides
* @param pathToCheck Path inode to check
* @param expectAclFeature boolean true if an AclFeature must be present,
* false if an AclFeature must not be present
* @throws IOException thrown if there is an I/O error
*/
- public static void assertAclFeature(final MiniDFSCluster miniCluster,
- Path pathToCheck, boolean expectAclFeature) throws IOException {
- AclFeature aclFeature = getAclFeature(pathToCheck, miniCluster);
+ private static void assertAclFeature(Path pathToCheck,
+ boolean expectAclFeature) throws IOException {
+ AclFeature aclFeature = getAclFeature(pathToCheck, cluster);
if (expectAclFeature) {
assertNotNull(aclFeature);
// Intentionally capturing a reference to the entries, not using nested
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[7/8] hadoop git commit: Revert "HADOOP-12957. Limit the number of
outstanding async calls. Contributed by Xiaobing Zhou"
Posted by wa...@apache.org.
Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou"
This reverts commit a1ba6eee38420ae9436f9e9ecc3db83702fbe762.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8a07026e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8a07026e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8a07026e
Branch: refs/heads/branch-2.8
Commit: 8a07026e7792e43e692409c3ec998a306b7733ff
Parents: ffa85f2
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:32 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:17:33 2016 -0700
----------------------------------------------------------------------
.../hadoop/fs/CommonConfigurationKeys.java | 3 -
.../ipc/AsyncCallLimitExceededException.java | 36 ---
.../main/java/org/apache/hadoop/ipc/Client.java | 66 +----
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 199 ++--------------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 12 +-
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 238 ++++++-------------
6 files changed, 109 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 7f510bd..e706104 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -296,9 +296,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
- public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
- "ipc.client.async.calls.max";
- public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
deleted file mode 100644
index db97b6c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import java.io.IOException;
-
-/**
- * Signals that an AsyncCallLimitExceededException has occurred. This class is
- * used to make application code using async RPC aware that limit of max async
- * calls is reached, application code need to retrieve results from response of
- * established async calls to avoid buffer overflow in order for follow-on async
- * calls going correctly.
- */
-public class AsyncCallLimitExceededException extends IOException {
- private static final long serialVersionUID = 1L;
-
- public AsyncCallLimitExceededException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 1f753cb..35e5f21 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,11 +159,9 @@ public class Client implements AutoCloseable {
private final boolean fallbackAllowed;
private final byte[] clientId;
- private final int maxAsyncCalls;
- private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
final static int CONNECTION_CONTEXT_CALL_ID = -3;
-
+
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@@ -1291,9 +1289,6 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
- this.maxAsyncCalls = conf.getInt(
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@@ -1360,20 +1355,6 @@ public class Client implements AutoCloseable {
fallbackToSimpleAuth);
}
- private void checkAsyncCall() throws IOException {
- if (isAsynchronousMode()) {
- if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
- asyncCallCounter.decrementAndGet();
- String errMsg = String.format(
- "Exceeded limit of max asynchronous calls: %d, " +
- "please configure %s to adjust it.",
- maxAsyncCalls,
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
- throw new AsyncCallLimitExceededException(errMsg);
- }
- }
- }
-
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@@ -1394,38 +1375,24 @@ public class Client implements AutoCloseable {
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
-
try {
- checkAsyncCall();
- try {
- connection.sendRpcRequest(call); // send the rpc request
- } catch (RejectedExecutionException e) {
- throw new IOException("connection has been closed", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("interrupted waiting to send rpc request to server", e);
- throw new IOException(e);
- }
- } catch(Exception e) {
- if (isAsynchronousMode()) {
- releaseAsyncCall();
- }
- throw e;
+ connection.sendRpcRequest(call); // send the rpc request
+ } catch (RejectedExecutionException e) {
+ throw new IOException("connection has been closed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("interrupted waiting to send rpc request to server", e);
+ throw new IOException(e);
}
if (isAsynchronousMode()) {
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
- private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
- if (callled.compareAndSet(false, true)) {
- try {
- set(getRpcResponse(call, connection));
- } catch (IOException ie) {
- setException(ie);
- } finally {
- releaseAsyncCall();
- }
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
}
return super.get();
}
@@ -1461,15 +1428,6 @@ public class Client implements AutoCloseable {
asynchronousMode.set(async);
}
- private void releaseAsyncCall() {
- asyncCallCounter.decrementAndGet();
- }
-
- @VisibleForTesting
- int getAsyncCallCount() {
- return asyncCallCounter.get();
- }
-
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -35,7 +34,6 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -56,13 +54,12 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
- static class AsyncCaller extends Thread {
+ protected static class SerialCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@@ -71,11 +68,11 @@ public class TestAsyncIPC {
new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
- public AsyncCaller(Client client, InetSocketAddress server, int count) {
+ public SerialCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
- // set asynchronous mode, since AsyncCaller extends Thread
+ // set asynchronous mode, since SerialCaller extends Thread
Client.setAsynchronousMode(true);
}
@@ -110,111 +107,14 @@ public class TestAsyncIPC {
}
}
- static class AsyncLimitlCaller extends Thread {
- private Client client;
- private InetSocketAddress server;
- private int count;
- private boolean failed;
- Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
- Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
- int start = 0, end = 0;
-
- int getStart() {
- return start;
- }
-
- int getEnd() {
- return end;
- }
-
- int getCount() {
- return count;
- }
-
- public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
- this(0, client, server, count);
- }
-
- final int callerId;
-
- public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
- int count) {
- this.client = client;
- this.server = server;
- this.count = count;
- // set asynchronous mode, since AsyncLimitlCaller extends Thread
- Client.setAsynchronousMode(true);
- this.callerId = callerId;
- }
-
- @Override
- public void run() {
- // in case Thread#Start is called, which will spawn new thread
- Client.setAsynchronousMode(true);
- for (int i = 0; i < count; i++) {
- try {
- final long param = TestIPC.RANDOM.nextLong();
- runCall(i, param);
- } catch (Exception e) {
- LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
- StringUtils.stringifyException(e)));
- failed = true;
- }
- }
- }
-
- private void runCall(final int idx, final long param)
- throws InterruptedException, ExecutionException, IOException {
- for (;;) {
- try {
- doCall(idx, param);
- return;
- } catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async calls
- * to let follow-on calls go
- */
- start = end;
- end = idx;
- waitForReturnValues(start, end);
- }
- }
- }
-
- private void doCall(final int idx, final long param) throws IOException {
- TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
- returnFutures.put(idx, returnFuture);
- expectedValues.put(idx, param);
- }
-
- private void waitForReturnValues(final int start, final int end)
- throws InterruptedException, ExecutionException {
- for (int i = start; i < end; i++) {
- LongWritable value = returnFutures.get(i).get();
- if (expectedValues.get(i) != value.get()) {
- LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
- failed = true;
- break;
- }
- }
- }
- }
-
- @Test(timeout = 60000)
- public void testAsyncCall() throws IOException, InterruptedException,
+ @Test
+ public void testSerial() throws IOException, InterruptedException,
ExecutionException {
- internalTestAsyncCall(3, false, 2, 5, 100);
- internalTestAsyncCall(3, true, 2, 5, 10);
+ internalTestSerial(3, false, 2, 5, 100);
+ internalTestSerial(3, true, 2, 5, 10);
}
- @Test(timeout = 60000)
- public void testAsyncCallLimit() throws IOException,
- InterruptedException, ExecutionException {
- internalTestAsyncCallLimit(100, false, 5, 10, 500);
- }
-
- public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
+ public void internalTestSerial(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -226,9 +126,9 @@ public class TestAsyncIPC {
clients[i] = new Client(LongWritable.class, conf);
}
- AsyncCaller[] callers = new AsyncCaller[callerCount];
+ SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
- callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
+ callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@@ -244,75 +144,6 @@ public class TestAsyncIPC {
server.stop();
}
- @Test(timeout = 60000)
- public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
- InterruptedException, ExecutionException {
- int handlerCount = 10, callCount = 100;
- Server server = new TestIPC.TestServer(handlerCount, false, conf);
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
- final Client client = new Client(LongWritable.class, conf);
-
- int asyncCallCount = client.getAsyncCallCount();
-
- try {
- AsyncCaller caller = new AsyncCaller(client, addr, callCount);
- caller.run();
-
- caller.waitForReturnValues();
- String msg = String.format(
- "First time, expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
-
- caller.waitForReturnValues();
- assertTrue(asyncCallCount == client.getAsyncCallCount());
- msg = String.format("Second time, expected not failed for caller: %s.",
- caller);
- assertFalse(msg, caller.failed);
-
- assertTrue(asyncCallCount == client.getAsyncCallCount());
- } finally {
- client.stop();
- server.stop();
- }
- }
-
- public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
- int clientCount, int callerCount, int callCount) throws IOException,
- InterruptedException, ExecutionException {
- Configuration conf = new Configuration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
- Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
-
- Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
-
- Client[] clients = new Client[clientCount];
- for (int i = 0; i < clientCount; i++) {
- clients[i] = new Client(LongWritable.class, conf);
- }
-
- AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
- for (int i = 0; i < callerCount; i++) {
- callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
- callCount);
- callers[i].start();
- }
- for (int i = 0; i < callerCount; i++) {
- callers[i].join();
- callers[i].waitForReturnValues(callers[i].getStart(),
- callers[i].getCount());
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
- callers[i]);
- assertFalse(msg, callers[i].failed);
- }
- for (int i = 0; i < clientCount; i++) {
- clients[i].stop();
- }
- server.stop();
- }
-
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -365,7 +196,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 4);
+ final SerialCaller caller = new SerialCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -404,7 +235,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -441,7 +272,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -482,9 +313,9 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- AsyncCaller[] callers = new AsyncCaller[callerCount];
+ SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
- callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
+ callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3f..37899aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@@ -51,14 +50,11 @@ public class AsyncDistributedFileSystem {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() {
- private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
- if (called.compareAndSet(false, true)) {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
- }
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
}
return super.get();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a07026e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299..9322e1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -30,25 +31,80 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
+ final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+ final private static Configuration CONF = new HdfsConfiguration();
+
+ final private static String GROUP1_NAME = "group1";
+ final private static String GROUP2_NAME = "group2";
+ final private static String USER1_NAME = "user1";
+ private static final UserGroupInformation USER1;
+
+ private MiniDFSCluster gCluster;
+
+ static {
+ // explicitly turn on permission checking
+ CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+ u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+ DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+ // Initiate the single test user
+ USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+ GROUP1_NAME, GROUP2_NAME });
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+ gCluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (gCluster != null) {
+ gCluster.shutdown();
+ gCluster = null;
+ }
+ }
+
+ static int countLease(MiniDFSCluster cluster) {
+ return TestDFSRename.countLease(cluster);
+ }
+
+ void list(DistributedFileSystem dfs, String name) throws IOException {
+ FileSystem.LOG.info("\n\n" + name);
+ for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+ FileSystem.LOG.info("" + s.getPath());
+ }
+ }
+
+ static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+ DataOutputStream a_out = dfs.create(f);
+ a_out.writeBytes("something");
+ a_out.close();
+ }
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
- @Test(timeout = 60000)
+ @Test
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@@ -113,134 +169,38 @@ public class TestAsyncDFSRename {
}
}
- @Test(timeout = 60000)
- public void testCallGetReturnValueMultipleTimes() throws Exception {
+ @Test
+ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
- "/test/testCallGetReturnValueMultipleTimes/");
- final Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
- cluster.waitActive();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- final int count = 100;
- long fileLen = blockSize * 3;
- final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
- assertTrue(dfs.mkdirs(renameDir));
-
- try {
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- }
-
- for (int i = 0; i < 5; i++) {
- verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
- renameDir, dfs);
- }
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- private void verifyCallGetReturnValueMultipleTimes(
- Map<Integer, Future<Void>> returnFutures, int count,
- MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
- throws InterruptedException, ExecutionException, IOException {
- // wait for completing the calls
- for (int i = 0; i < count; i++) {
- returnFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // verify the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
- }
- }
-
- @Test(timeout = 120000)
- public void testAggressiveConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(100,
- "testAggressiveConcurrentAsyncRenameWithOverwrite");
- }
-
- @Test(timeout = 60000)
- public void testConservativeConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(10000,
- "testConservativeConcurrentAsyncRenameWithOverwrite");
- }
-
- private void internalTestConcurrentAsyncRenameWithOverwrite(
- final int asyncCallLimit, final String basePath) throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- final Path renameDir = new Path(String.format("/test/%s/", basePath));
+ "/test/concurrent_reanme_with_overwrite_dir/");
Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- asyncCallLimit);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
- long fileLen = blockSize * 3;
- int start = 0, end = 0;
- Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
- assertTrue(dfs.mkdirs(renameDir));
try {
+ long fileLen = blockSize * 3;
+ assertTrue(dfs.mkdirs(renameDir));
+
+ Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- for (;;) {
- try {
- LOG.info("rename #" + i);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async
- * calls to let follow-on calls go
- */
- LOG.error(e);
- start = end;
- end = i;
- LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
- waitForReturnValues(returnFutures, start, end);
- }
- }
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
}
// wait for completing the calls
- for (int i = start; i < count; i++) {
+ for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
@@ -255,60 +215,26 @@ public class TestAsyncDFSRename {
assertTrue(dfs.exists(dst));
}
} finally {
- if (dfs != null) {
- dfs.close();
- }
+ dfs.delete(renameDir, true);
if (cluster != null) {
cluster.shutdown();
}
}
}
- private void waitForReturnValues(
- final Map<Integer, Future<Void>> returnFutures, final int start,
- final int end) throws InterruptedException, ExecutionException {
- LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
- for (int i = start; i < end; i++) {
- LOG.info("calling Future#get #" + i);
- returnFutures.get(i).get();
- }
- }
-
- @Test(timeout = 60000)
+ @Test
public void testAsyncRenameWithException() throws Exception {
- Configuration conf = new HdfsConfiguration();
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- UserGroupInformation ugi1;
-
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
- u2g_map.put(user1, new String[] { group1, group2 });
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
-
- // Initiate all four users
- ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
- group1, group2 });
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(3).build();
- cluster.waitActive();
-
- FileSystem rootFs = FileSystem.get(conf);
+ FileSystem rootFs = FileSystem.get(CONF);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
- AsyncDistributedFileSystem adfs = ugi1
+ AsyncDistributedFileSystem adfs = USER1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
- return cluster.getFileSystem().getAsyncDistributedFileSystem();
+ return gCluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@@ -316,24 +242,16 @@ public class TestAsyncDFSRename {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- } finally {
- if (rootFs != null) {
- rootFs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
+ checkPermissionDenied(e, src);
}
}
- private void checkPermissionDenied(final Exception e, final Path dir,
- final String user) {
+ private void checkPermissionDenied(final Exception e, final Path dir) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(user));
+ .getMessage().contains(USER1_NAME));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[6/8] hadoop git commit: Revert "HADOOP-13168. Support Future.get
with timeout in ipc async calls."
Posted by wa...@apache.org.
Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."
This reverts commit fa7c7f25105bc157c652a2f44ac49620fc61c0f4.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/308d2864
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/308d2864
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/308d2864
Branch: refs/heads/branch-2.8
Commit: 308d28640d8c797f07aa41a8adc5dcd9420f1d0a
Parents: 1de712f
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:31 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:31 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ipc/Client.java | 119 +++++++++---------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 62 +++++-----
.../apache/hadoop/util/concurrent/AsyncGet.java | 60 ---------
.../hadoop/util/concurrent/AsyncGetFuture.java | 73 -----------
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 122 ++++++++-----------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 26 +++-
.../ClientNamenodeProtocolTranslatorPB.java | 33 +++--
7 files changed, 183 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 23b14e1..1f753cb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,46 @@
package org.apache.hadoop.ipc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-import java.io.*;
-import java.net.*;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
-import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
- = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>>
+ RETURN_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getAsyncRpcResponse() {
- return (Future<T>) ASYNC_RPC_RESPONSE.get();
+ public static <T> Future<T> getReturnRpcResponse() {
+ return (Future<T>) RETURN_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -356,11 +381,6 @@ public class Client implements AutoCloseable {
}
}
- @Override
- public String toString() {
- return getClass().getSimpleName() + id;
- }
-
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
@@ -1394,32 +1414,27 @@ public class Client implements AutoCloseable {
}
if (isAsynchronousMode()) {
- final AsyncGet<Writable, IOException> asyncGet
- = new AsyncGet<Writable, IOException>() {
+ Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+ private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
- public Writable get(long timeout, TimeUnit unit)
- throws IOException, TimeoutException{
- boolean done = true;
- try {
- final Writable w = getRpcResponse(call, connection, timeout, unit);
- if (w == null) {
- done = false;
- throw new TimeoutException(call + " timed out "
- + timeout + " " + unit);
- }
- return w;
- } finally {
- if (done) {
+ public Writable get() throws InterruptedException, ExecutionException {
+ if (callled.compareAndSet(false, true)) {
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
+ } finally {
releaseAsyncCall();
}
}
+ return super.get();
}
};
- ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+ RETURN_RPC_RESPONSE.set(returnFuture);
return null;
} else {
- return getRpcResponse(call, connection, -1, null);
+ return getRpcResponse(call, connection);
}
}
@@ -1455,18 +1470,12 @@ public class Client implements AutoCloseable {
return asyncCallCounter.get();
}
- /** @return the rpc response or, in case of timeout, null. */
- private Writable getRpcResponse(final Call call, final Connection connection,
- final long timeout, final TimeUnit unit) throws IOException {
+ private Writable getRpcResponse(final Call call, final Connection connection)
+ throws IOException {
synchronized (call) {
while (!call.done) {
try {
- final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
- timeout, unit);
- call.wait(waitTimeout); // wait for the result
- if (waitTimeout > 0 && !call.done) {
- return null;
- }
+ call.wait(); // wait for the result
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 4641a67..350e041 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,9 +18,21 @@
package org.apache.hadoop.ipc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
/**
* RPC Engine for protobuf based RPCs.
@@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
- private static final ThreadLocal<AsyncGet<Message, Exception>>
- ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
+ private static final ThreadLocal<Callable<?>>
+ RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
+ @SuppressWarnings("unchecked")
@Unstable
- public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
- return ASYNC_RETURN_MESSAGE.get();
+ public static <T> Callable<T> getReturnMessageCallback() {
+ return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
}
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
- final AsyncGet<Message, Exception> asyncGet
- = new AsyncGet<Message, Exception>() {
+ final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+ Callable<Message> callback = new Callable<Message>() {
@Override
- public Message get(long timeout, TimeUnit unit) throws Exception {
- final RpcResponseWrapper rrw = timeout < 0?
- frrw.get(): frrw.get(timeout, unit);
- return getReturnMessage(method, rrw);
+ public Message call() throws Exception {
+ return getReturnMessage(method, frrw.get());
}
};
- ASYNC_RETURN_MESSAGE.set(asyncGet);
+ RETURN_MESSAGE_CALLBACK.set(callback);
return null;
} else {
return getReturnMessage(method, val);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
deleted file mode 100644
index 5eac869..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
- *
- * When the return value is still being computed, invoking
- * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
- * The method should be invoked again and again
- * until the underlying computation is completed.
- *
- * @param <R> The type of the return value.
- * @param <E> The exception type that the underlying implementation may throw.
- */
-public interface AsyncGet<R, E extends Throwable> {
- /**
- * Get the result.
- *
- * @param timeout The maximum time period to wait.
- * When timeout == 0, it does not wait at all.
- * When timeout < 0, it waits indefinitely.
- * @param unit The unit of the timeout value
- * @return the result, which is possibly null.
- * @throws E an exception thrown by the underlying implementation.
- * @throws TimeoutException if it cannot return after the given time period.
- * @throws InterruptedException if the thread is interrupted.
- */
- R get(long timeout, TimeUnit unit)
- throws E, TimeoutException, InterruptedException;
-
- /** Utility */
- class Util {
- /**
- * @return {@link Object#wait(long)} timeout converted
- * from {@link #get(long, TimeUnit)} timeout.
- */
- public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
- return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
deleted file mode 100644
index d687867..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/** A {@link Future} implemented using an {@link AsyncGet} object. */
-public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
- public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
-
- private final AtomicBoolean called = new AtomicBoolean(false);
- private final AsyncGet<T, E> asyncGet;
-
- public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
- this.asyncGet = asyncGet;
- }
-
- private void callAsyncGet(long timeout, TimeUnit unit) {
- if (!isCancelled() && called.compareAndSet(false, true)) {
- try {
- set(asyncGet.get(timeout, unit));
- } catch (TimeoutException te) {
- LOG.trace("TRACE", te);
- called.compareAndSet(true, false);
- } catch (Throwable e) {
- LOG.trace("TRACE", e);
- setException(e);
- }
- }
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- callAsyncGet(-1, TimeUnit.MILLISECONDS);
- return super.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException, ExecutionException {
- callAsyncGet(timeout, unit);
- return super.get(0, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public boolean isDone() {
- callAsyncGet(0, TimeUnit.MILLISECONDS);
- return super.isDone();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 7623975..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,6 +18,20 @@
package org.apache.hadoop.ipc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,17 +48,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
public class TestAsyncIPC {
private static Configuration conf;
@@ -84,51 +87,26 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
- throw new RuntimeException(e);
}
}
}
- void assertReturnValues() throws InterruptedException, ExecutionException {
+ public void waitForReturnValues() throws InterruptedException,
+ ExecutionException {
for (int i = 0; i < count; i++) {
LongWritable value = returnFutures.get(i).get();
- Assert.assertEquals("call" + i + " failed.",
- expectedValues.get(i).longValue(), value.get());
- }
- Assert.assertFalse(failed);
- }
-
- void assertReturnValues(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException {
- final boolean[] checked = new boolean[count];
- for(boolean done = false; !done;) {
- done = true;
- for (int i = 0; i < count; i++) {
- if (checked[i]) {
- continue;
- } else {
- done = false;
- }
-
- final LongWritable value;
- try {
- value = returnFutures.get(i).get(timeout, unit);
- } catch (TimeoutException e) {
- LOG.info("call" + i + " caught ", e);
- continue;
- }
-
- Assert.assertEquals("call" + i + " failed.",
- expectedValues.get(i).longValue(), value.get());
- checked[i] = true;
+ if (expectedValues.get(i) != value.get()) {
+ LOG.fatal(String.format("Call-%d failed!", i));
+ failed = true;
+ break;
}
}
- Assert.assertFalse(failed);
}
}
@@ -205,7 +183,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
@@ -255,7 +233,10 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
- callers[i].assertReturnValues();
+ callers[i].waitForReturnValues();
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
@@ -277,37 +258,25 @@ public class TestAsyncIPC {
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
- caller.assertReturnValues();
- caller.assertReturnValues();
- caller.assertReturnValues();
- Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
- } finally {
- client.stop();
- server.stop();
- }
- }
- @Test(timeout = 60000)
- public void testFutureGetWithTimeout() throws IOException,
- InterruptedException, ExecutionException {
-// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
- final Server server = new TestIPC.TestServer(10, true, conf);
- final InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
+ caller.waitForReturnValues();
+ String msg = String.format(
+ "First time, expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
- final Client client = new Client(LongWritable.class, conf);
+ caller.waitForReturnValues();
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ msg = String.format("Second time, expected not failed for caller: %s.",
+ caller);
+ assertFalse(msg, caller.failed);
- try {
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
- caller.run();
- caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally {
client.stop();
server.stop();
}
}
-
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
@@ -398,7 +367,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -435,7 +406,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -470,7 +443,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -514,7 +489,10 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; ++i) {
callers[i].join();
- callers[i].assertReturnValues();
+ callers[i].waitForReturnValues();
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
}
} finally {
client.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 1f60df2..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,17 +19,20 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
+import com.google.common.util.concurrent.AbstractFuture;
+
/****************************************************************
* Implementation of the asynchronous distributed file system.
* This instance of this class is the way end-user code interacts
@@ -49,9 +52,22 @@ public class AsyncDistributedFileSystem {
}
static <T> Future<T> getReturnValue() {
- final AsyncGet<T, Exception> asyncGet
- = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
- return new AsyncGetFuture<>(asyncGet);
+ final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+ .getReturnValueCallback();
+ Future<T> returnFuture = new AbstractFuture<T>() {
+ private final AtomicBoolean called = new AtomicBoolean(false);
+ public T get() throws InterruptedException, ExecutionException {
+ if (called.compareAndSet(false, true)) {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
+ }
+ return super.get();
+ }
+ };
+ return returnFuture;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/308d2864/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 849f06d..94c6c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,8 +24,7 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
-
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -191,7 +190,6 @@ import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.util.concurrent.AsyncGet;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -203,8 +201,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal<AsyncGet<?, Exception>>
- ASYNC_RETURN_VALUE = new ThreadLocal<>();
+ private static final ThreadLocal<Callable<?>>
+ RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -239,8 +237,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@SuppressWarnings("unchecked")
@Unstable
- public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
- return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+ public static <T> Callable<T> getReturnValueCallback() {
+ return (Callable<T>) RETURN_VALUE_CALLBACK.get();
}
@Override
@@ -362,7 +360,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setPermission(null, req);
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.setPermission(null, req);
}
@@ -371,18 +369,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
- private void setAsyncReturnValue() {
- final AsyncGet<Message, Exception> asyncReturnMessage
- = ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<Void, Exception> asyncGet
- = new AsyncGet<Void, Exception>() {
+ private void setReturnValueCallback() {
+ final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable<Void> callBack = new Callable<Void>() {
@Override
- public Void get(long timeout, TimeUnit unit) throws Exception {
- asyncReturnMessage.get(timeout, unit);
+ public Void call() throws Exception {
+ returnMessageCallback.call();
return null;
}
};
- ASYNC_RETURN_VALUE.set(asyncGet);
+ RETURN_VALUE_CALLBACK.set(callBack);
}
@Override
@@ -397,7 +394,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setOwner(null, req.build());
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.setOwner(null, req.build());
}
@@ -529,7 +526,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.rename2(null, req);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/8] hadoop git commit: Revert "HDFS-10346. Implement asynchronous
setPermission/setOwner for DistributedFileSystem. Contributed by Xiaobing
Zhou"
Posted by wa...@apache.org.
Revert "HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem. Contributed by Xiaobing Zhou"
This reverts commit ac049004507b8df5d22694300fa845427d757866.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffa85f28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffa85f28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffa85f28
Branch: refs/heads/branch-2.8
Commit: ffa85f28a2ec612a3d63c890806a17cfd3a2e7e2
Parents: 308d286
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:31 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:31 2016 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 59 ----
.../ClientNamenodeProtocolTranslatorPB.java | 39 +--
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 267 ++-----------------
.../apache/hadoop/hdfs/TestDFSPermission.java | 29 +-
4 files changed, 43 insertions(+), 351 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffa85f28/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 4fe0861..356ae3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.ipc.Client;
@@ -38,9 +37,6 @@ import com.google.common.util.concurrent.AbstractFuture;
* This instance of this class is the way end-user code interacts
* with a Hadoop DistributedFileSystem in an asynchronous manner.
*
- * This class is unstable, so no guarantee is provided as to reliability,
- * stability or compatibility across any level of release granularity.
- *
*****************************************************************/
@Unstable
public class AsyncDistributedFileSystem {
@@ -115,59 +111,4 @@ public class AsyncDistributedFileSystem {
Client.setAsynchronousMode(isAsync);
}
}
-
- /**
- * Set permission of a path.
- *
- * @param p
- * the path the permission is set to
- * @param permission
- * the permission that is set to a path.
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future<Void> setPermission(Path p, final FsPermission permission)
- throws IOException {
- dfs.getFsStatistics().incrementWriteOps(1);
- final Path absPath = dfs.fixRelativePart(p);
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().setPermission(dfs.getPathName(absPath), permission);
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
-
- /**
- * Set owner of a path (i.e. a file or a directory). The parameters username
- * and groupname cannot both be null.
- *
- * @param p
- * The path
- * @param username
- * If it is null, the original username remains unchanged.
- * @param groupname
- * If it is null, the original groupname remains unchanged.
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future<Void> setOwner(Path p, String username, String groupname)
- throws IOException {
- if (username == null && groupname == null) {
- throw new IOException("username == null && groupname == null");
- }
-
- dfs.getFsStatistics().incrementWriteOps(1);
- final Path absPath = dfs.fixRelativePart(p);
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname);
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffa85f28/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 94c6c0f..75fba21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -358,30 +358,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setPermission(PBHelperClient.convert(permission))
.build();
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setPermission(null, req);
- setReturnValueCallback();
- } else {
- rpcProxy.setPermission(null, req);
- }
+ rpcProxy.setPermission(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
- private void setReturnValueCallback() {
- final Callable<Message> returnMessageCallback = ProtobufRpcEngine
- .getReturnMessageCallback();
- Callable<Void> callBack = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- returnMessageCallback.call();
- return null;
- }
- };
- RETURN_VALUE_CALLBACK.set(callBack);
- }
-
@Override
public void setOwner(String src, String username, String groupname)
throws IOException {
@@ -392,12 +374,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (groupname != null)
req.setGroupname(groupname);
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setOwner(null, req.build());
- setReturnValueCallback();
- } else {
- rpcProxy.setOwner(null, req.build());
- }
+ rpcProxy.setOwner(null, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -526,7 +503,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
- setReturnValueCallback();
+
+ final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable<Void> callBack = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ returnMessageCallback.call();
+ return null;
+ }
+ };
+ RETURN_VALUE_CALLBACK.set(callBack);
} else {
rpcProxy.rename2(null, req);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffa85f28/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 7539fbd..d129299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -22,11 +22,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -34,30 +31,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestAsyncDFSRename {
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- private final long seed = Time.now();
- private final Random r = new Random(seed);
- private final PermissionGenerator permGenerator = new PermissionGenerator(r);
- private final short replFactor = 2;
- private final long blockSize = 512;
- private long fileLen = blockSize * 3;
/**
* Check the blocks of dst file are cleaned after rename with overwrite
@@ -65,6 +50,8 @@ public class TestAsyncDFSRename {
*/
@Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
replFactor).build();
@@ -73,6 +60,8 @@ public class TestAsyncDFSRename {
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
try {
+
+ long fileLen = blockSize * 3;
String src = "/foo/src";
String dst = "/foo/dst";
String src2 = "/foo/src2";
@@ -126,6 +115,8 @@ public class TestAsyncDFSRename {
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
final Path renameDir = new Path(
"/test/testCallGetReturnValueMultipleTimes/");
final Configuration conf = new HdfsConfiguration();
@@ -136,6 +127,7 @@ public class TestAsyncDFSRename {
final DistributedFileSystem dfs = cluster.getFileSystem();
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
final int count = 100;
+ long fileLen = blockSize * 3;
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
@@ -186,15 +178,15 @@ public class TestAsyncDFSRename {
}
}
- @Test
- public void testConservativeConcurrentAsyncRenameWithOverwrite()
+ @Test(timeout = 120000)
+ public void testAggressiveConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(100,
"testAggressiveConcurrentAsyncRenameWithOverwrite");
}
@Test(timeout = 60000)
- public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+ public void testConservativeConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(10000,
"testConservativeConcurrentAsyncRenameWithOverwrite");
@@ -202,6 +194,8 @@ public class TestAsyncDFSRename {
private void internalTestConcurrentAsyncRenameWithOverwrite(
final int asyncCallLimit, final String basePath) throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
@@ -212,6 +206,7 @@ public class TestAsyncDFSRename {
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
+ long fileLen = blockSize * 3;
int start = 0, end = 0;
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
@@ -279,206 +274,8 @@ public class TestAsyncDFSRename {
}
}
- @Test
- public void testConservativeConcurrentAsyncAPI() throws Exception {
- internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
- }
-
- @Test(timeout = 60000)
- public void testAggressiveConcurrentAsyncAPI() throws Exception {
- internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
- }
-
- private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
- final String basePath) throws Exception {
- Configuration conf = new HdfsConfiguration();
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- int count = 500;
-
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
- // set the limit of max async calls
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- asyncCallLimit);
-
- // create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
- // start mini cluster
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(3).build();
- cluster.waitActive();
- AsyncDistributedFileSystem adfs = cluster.getFileSystem()
- .getAsyncDistributedFileSystem();
-
- // prepare for test
- FileSystem rootFs = FileSystem.get(conf);
- final Path parent = new Path(String.format("/test/%s/", basePath));
- final Path[] srcs = new Path[count];
- final Path[] dsts = new Path[count];
- short[] permissions = new short[count];
- for (int i = 0; i < count; i++) {
- srcs[i] = new Path(parent, "src" + i);
- dsts[i] = new Path(parent, "dst" + i);
- DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
- DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
- assertTrue(rootFs.exists(srcs[i]));
- assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
- assertTrue(rootFs.exists(dsts[i]));
- assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
- permissions[i] = permGenerator.next();
- }
-
- Map<Integer, Future<Void>> renameRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> permRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> ownerRetFutures =
- new HashMap<Integer, Future<Void>>();
- int start = 0, end = 0;
- // test rename
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
- Rename.OVERWRITE);
- renameRetFutures.put(i, returnFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(renameRetFutures, start, end);
- }
- }
- }
-
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- renameRetFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src should not exist, dst should
- for (int i = 0; i < count; i++) {
- assertFalse(rootFs.exists(srcs[i]));
- assertTrue(rootFs.exists(dsts[i]));
- }
-
- // test permissions
- try {
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setPermission(dsts[i],
- new FsPermission(permissions[i]));
- permRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(permRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- permRetFutures.get(i).get();
- }
-
- // Restart NN and check permission then
- cluster.restartNameNodes();
-
- // verify the permission
- for (int i = 0; i < count; i++) {
- assertTrue(rootFs.exists(dsts[i]));
- FsPermission fsPerm = new FsPermission(permissions[i]);
- checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
- fsPerm.getUserAction());
- }
-
- // test setOwner
- start = 0;
- end = 0;
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
- "group2");
- ownerRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(ownerRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- ownerRetFutures.get(i).get();
- }
-
- // Restart NN and check owner then
- cluster.restartNameNodes();
-
- // verify the owner
- for (int i = 0; i < count; i++) {
- assertTrue(rootFs.exists(dsts[i]));
- assertTrue(
- "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
- assertTrue(
- "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
- }
- } catch (AccessControlException ace) {
- throw ace;
- } finally {
- if (rootFs != null) {
- rootFs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- static void checkAccessPermissions(FileStatus stat, FsAction mode)
- throws IOException {
- checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
- }
-
- static void checkAccessPermissions(final UserGroupInformation ugi,
- FileStatus stat, FsAction mode) throws IOException {
- FsPermission perm = stat.getPermission();
- String user = ugi.getShortUserName();
- List<String> groups = Arrays.asList(ugi.getGroupNames());
-
- if (user.equals(stat.getOwner())) {
- if (perm.getUserAction().implies(mode)) {
- return;
- }
- } else if (groups.contains(stat.getGroup())) {
- if (perm.getGroupAction().implies(mode)) {
- return;
- }
- } else {
- if (perm.getOtherAction().implies(mode)) {
- return;
- }
- }
- throw new AccessControlException(String.format(
- "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
- .getPath(), stat.getOwner(), stat.getGroup(),
- stat.isDirectory() ? "d" : "-", perm));
- }
-
@Test(timeout = 60000)
- public void testAsyncAPIWithException() throws Exception {
+ public void testAsyncRenameWithException() throws Exception {
Configuration conf = new HdfsConfiguration();
String group1 = "group1";
String group2 = "group2";
@@ -489,9 +286,9 @@ public class TestAsyncDFSRename {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+ Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+ u2g_map.put(user1, new String[] { group1, group2 });
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
// Initiate all four users
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
@@ -502,7 +299,7 @@ public class TestAsyncDFSRename {
cluster.waitActive();
FileSystem rootFs = FileSystem.get(conf);
- final Path renameDir = new Path("/test/async_api_exception/");
+ final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
@@ -515,33 +312,11 @@ public class TestAsyncDFSRename {
}
});
- Future<Void> retFuture;
- try {
- retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- retFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(src.getParent().toUri().getPath()));
- }
-
- FsPermission fsPerm = new FsPermission(permGenerator.next());
- try {
- retFuture = adfs.setPermission(src, fsPerm);
- retFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the name of the path",
- e.getMessage().contains(src.getName()));
- }
-
try {
- retFuture = adfs.setOwner(src, "user1", "group2");
- retFuture.get();
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFuture.get();
} catch (ExecutionException e) {
checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the name of the path",
- e.getMessage().contains(src.getName()));
} finally {
if (rootFs != null) {
rootFs.close();
@@ -559,5 +334,7 @@ public class TestAsyncDFSRename {
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(user));
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(dir.getParent().toUri().getPath()));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffa85f28/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
index 66a0380..aa204cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -196,35 +196,22 @@ public class TestDFSPermission {
return fs.getFileStatus(path).getPermission().toShort();
}
- private void create(OpType op, Path name) throws IOException {
- create(fs, conf, op, name);
- }
-
/* create a file/directory with the default umask and permission */
- static void create(final FileSystem fs, final Configuration fsConf,
- OpType op, Path name) throws IOException {
- create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission(
- DEFAULT_PERMISSION));
- }
-
- private void create(OpType op, Path name, short umask,
- FsPermission permission)
- throws IOException {
- create(fs, conf, op, name, umask, permission);
+ private void create(OpType op, Path name) throws IOException {
+ create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION));
}
/* create a file/directory with the given umask and permission */
- static void create(final FileSystem fs, final Configuration fsConf,
- OpType op, Path name, short umask, FsPermission permission)
- throws IOException {
+ private void create(OpType op, Path name, short umask,
+ FsPermission permission) throws IOException {
// set umask in configuration, converting to padded octal
- fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
+ conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
// create the file/directory
switch (op) {
case CREATE:
FSDataOutputStream out = fs.create(name, permission, true,
- fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+ conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null);
out.close();
break;
@@ -372,7 +359,7 @@ public class TestDFSPermission {
final static private String DIR_NAME = "dir";
final static private String FILE_DIR_NAME = "filedir";
- enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
+ private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE
};
@@ -628,7 +615,7 @@ public class TestDFSPermission {
/* A random permission generator that guarantees that each permission
* value is generated only once.
*/
- static class PermissionGenerator {
+ static private class PermissionGenerator {
private final Random r;
private final short[] permissions = new short[MAX_PERMISSION + 1];
private int numLeft = MAX_PERMISSION + 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org