You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2016/06/06 08:50:56 UTC
[1/8] hadoop git commit: Revert "Revert "HDFS-10224. Implement
asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou""
Repository: hadoop
Updated Branches:
refs/heads/trunk 106234d87 -> 35f255b03
Revert "Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou""
This reverts commit 106234d873c60fa52cd0d812fb1cdc0c6b998a6d.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eded3d10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eded3d10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eded3d10
Branch: refs/heads/trunk
Commit: eded3d109e4c5225d8c5cd3c2d82e7ac93841263
Parents: 106234d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:28:21 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:28:21 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileSystem.java | 1 -
.../main/java/org/apache/hadoop/ipc/Client.java | 11 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 34 ++-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 2 +-
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 ++++++++
.../hadoop/hdfs/DistributedFileSystem.java | 22 +-
.../ClientNamenodeProtocolTranslatorPB.java | 45 +++-
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 258 +++++++++++++++++++
8 files changed, 463 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0ecd8b7..9e13a7a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1252,7 +1252,6 @@ public abstract class FileSystem extends Configured implements Closeable {
/**
* Renames Path src to Path dst
* <ul>
- * <li
* <li>Fails if src is a file and dst is a directory.
* <li>Fails if src is a directory and dst is a file.
* <li>Fails if the parent of dst does not exist or is a file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index f206861..d59aeb89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,7 +119,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>>
+ RETURN_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -130,8 +131,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getReturnValue() {
- return (Future<T>) returnValue.get();
+ public static <T> Future<T> getReturnRpcResponse() {
+ return (Future<T>) RETURN_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -1396,7 +1397,7 @@ public class Client implements AutoCloseable {
}
};
- returnValue.set(returnFuture);
+ RETURN_RPC_RESPONSE.set(returnFuture);
return null;
} else {
return getRpcResponse(call, connection);
@@ -1410,7 +1411,7 @@ public class Client implements AutoCloseable {
* synchronous mode.
*/
@Unstable
- static boolean isAsynchronousMode() {
+ public static boolean isAsynchronousMode() {
return asynchronousMode.get();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 071e2e8..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,7 +26,9 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
@@ -67,7 +70,9 @@ import com.google.protobuf.TextFormat;
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-
+ private static final ThreadLocal<Callable<?>>
+ RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -76,6 +81,12 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
+ @SuppressWarnings("unchecked")
+ @Unstable
+ public static <T> Callable<T> getReturnMessageCallback() {
+ return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+ }
+
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
@@ -189,7 +200,7 @@ public class ProtobufRpcEngine implements RpcEngine {
* the server.
*/
@Override
- public Object invoke(Object proxy, Method method, Object[] args)
+ public Object invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
@@ -251,6 +262,23 @@ public class ProtobufRpcEngine implements RpcEngine {
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
+ if (Client.isAsynchronousMode()) {
+ final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+ Callable<Message> callback = new Callable<Message>() {
+ @Override
+ public Message call() throws Exception {
+ return getReturnMessage(method, frrw.get());
+ }
+ };
+ RETURN_MESSAGE_CALLBACK.set(callback);
+ return null;
+ } else {
+ return getReturnMessage(method, val);
+ }
+ }
+
+ private Message getReturnMessage(final Method method,
+ final RpcResponseWrapper rrw) throws ServiceException {
Message prototype = null;
try {
prototype = getReturnProtoType(method);
@@ -260,7 +288,7 @@ public class ProtobufRpcEngine implements RpcEngine {
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
- .mergeFrom(val.theResponseRead).build();
+ .mergeFrom(rrw.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index de4395e..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getReturnValue();
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
new file mode 100644
index 0000000..37899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.ipc.Client;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/****************************************************************
+ * Implementation of the asynchronous distributed file system.
+ * This instance of this class is the way end-user code interacts
+ * with a Hadoop DistributedFileSystem in an asynchronous manner.
+ *
+ *****************************************************************/
+@Unstable
+public class AsyncDistributedFileSystem {
+
+ private final DistributedFileSystem dfs;
+
+ AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
+ this.dfs = dfs;
+ }
+
+ static <T> Future<T> getReturnValue() {
+ final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+ .getReturnValueCallback();
+ Future<T> returnFuture = new AbstractFuture<T>() {
+ public T get() throws InterruptedException, ExecutionException {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
+ return super.get();
+ }
+ };
+ return returnFuture;
+ }
+
+ /**
+ * Renames Path src to Path dst
+ * <ul>
+ * <li>Fails if src is a file and dst is a directory.
+ * <li>Fails if src is a directory and dst is a file.
+ * <li>Fails if the parent of dst does not exist or is a file.
+ * </ul>
+ * <p>
+ * If OVERWRITE option is not passed as an argument, rename fails if the dst
+ * already exists.
+ * <p>
+ * If OVERWRITE option is passed as an argument, rename overwrites the dst if
+ * it is a file or an empty directory. Rename fails if dst is a non-empty
+ * directory.
+ * <p>
+ * Note that atomicity of rename is dependent on the file system
+ * implementation. Please refer to the file system documentation for details.
+ * This default implementation is non atomic.
+ *
+ * @param src
+ * path to be renamed
+ * @param dst
+ * new path after rename
+ * @throws IOException
+ * on failure
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future<Void> rename(Path src, Path dst,
+ final Options.Rename... options) throws IOException {
+ dfs.getFsStatistics().incrementWriteOps(1);
+
+ final Path absSrc = dfs.fixRelativePart(src);
+ final Path absDst = dfs.fixRelativePart(dst);
+
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
+ options);
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 5e54edd..0ae4d70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
@@ -204,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
* @return path component of {file}
* @throws IllegalArgumentException if URI does not belong to this DFS
*/
- private String getPathName(Path file) {
+ String getPathName(Path file) {
checkPath(file);
String result = file.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
@@ -2509,4 +2510,23 @@ public class DistributedFileSystem extends FileSystem {
}
return ret;
}
+
+ private final AsyncDistributedFileSystem adfs =
+ new AsyncDistributedFileSystem(this);
+
+ /** @return an {@link AsyncDistributedFileSystem} object. */
+ @Unstable
+ public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
+ return adfs;
+ }
+
+ @Override
+ protected Path fixRelativePart(Path p) {
+ return super.fixRelativePart(p);
+ }
+
+ Statistics getFsStatistics() {
+ return statistics;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 513a5e3..f4074b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,11 +24,14 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
+import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -135,7 +139,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -153,13 +156,15 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -177,8 +182,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -190,12 +196,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
- .EncryptionZoneProto;
-
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -206,6 +209,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
+ private static final ThreadLocal<Callable<?>>
+ RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -239,6 +244,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
+ @SuppressWarnings("unchecked")
+ @Unstable
+ public static <T> Callable<T> getReturnValueCallback() {
+ return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+ }
+
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -475,6 +486,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
+
try {
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
@@ -499,7 +511,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
setDst(dst).setOverwriteDest(overwrite).
build();
try {
- rpcProxy.rename2(null, req);
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.rename2(null, req);
+
+ final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable<Void> callBack = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ returnMessageCallback.call();
+ return null;
+ }
+ };
+ RETURN_VALUE_CALLBACK.set(callBack);
+ } else {
+ rpcProxy.rename2(null, req);
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
new file mode 100644
index 0000000..9322e1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAsyncDFSRename {
+ final Path asyncRenameDir = new Path("/test/async_rename/");
+ public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+ final private static Configuration CONF = new HdfsConfiguration();
+
+ final private static String GROUP1_NAME = "group1";
+ final private static String GROUP2_NAME = "group2";
+ final private static String USER1_NAME = "user1";
+ private static final UserGroupInformation USER1;
+
+ private MiniDFSCluster gCluster;
+
+ static {
+ // explicitly turn on permission checking
+ CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+ u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+ DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+ // Initiate all four users
+ USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+ GROUP1_NAME, GROUP2_NAME });
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+ gCluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (gCluster != null) {
+ gCluster.shutdown();
+ gCluster = null;
+ }
+ }
+
+ static int countLease(MiniDFSCluster cluster) {
+ return TestDFSRename.countLease(cluster);
+ }
+
+ void list(DistributedFileSystem dfs, String name) throws IOException {
+ FileSystem.LOG.info("\n\n" + name);
+ for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+ FileSystem.LOG.info("" + s.getPath());
+ }
+ }
+
+ static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+ DataOutputStream a_out = dfs.create(f);
+ a_out.writeBytes("something");
+ a_out.close();
+ }
+
+ /**
+ * Check the blocks of dst file are cleaned after rename with overwrite
+ * Restart NN to check the rename successfully
+ */
+ @Test
+ public void testAsyncRenameWithOverwrite() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ replFactor).build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+
+ try {
+
+ long fileLen = blockSize * 3;
+ String src = "/foo/src";
+ String dst = "/foo/dst";
+ String src2 = "/foo/src2";
+ String dst2 = "/foo/dst2";
+ Path srcPath = new Path(src);
+ Path dstPath = new Path(dst);
+ Path srcPath2 = new Path(src2);
+ Path dstPath2 = new Path(dst2);
+
+ DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+ LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst, 0, fileLen);
+ LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst2, 0, fileLen);
+ BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+ .getBlockManager();
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+
+ Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+ Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+ retVal1.get();
+ retVal2.get();
+
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+ assertFalse(dfs.exists(srcPath));
+ assertTrue(dfs.exists(dstPath));
+ assertFalse(dfs.exists(srcPath2));
+ assertTrue(dfs.exists(dstPath2));
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ final Path renameDir = new Path(
+ "/test/concurrent_reanme_with_overwrite_dir/");
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ int count = 1000;
+
+ try {
+ long fileLen = blockSize * 3;
+ assertTrue(dfs.mkdirs(renameDir));
+
+ Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+ // concurrently invoking many rename
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ }
+
+ // wait for completing the calls
+ for (int i = 0; i < count; i++) {
+ returnFutures.get(i).get();
+ }
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+
+ // very the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
+ }
+ } finally {
+ dfs.delete(renameDir, true);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testAsyncRenameWithException() throws Exception {
+ FileSystem rootFs = FileSystem.get(CONF);
+ final Path renameDir = new Path("/test/async_rename_exception/");
+ final Path src = new Path(renameDir, "src");
+ final Path dst = new Path(renameDir, "dst");
+ rootFs.mkdirs(src);
+
+ AsyncDistributedFileSystem adfs = USER1
+ .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+ @Override
+ public AsyncDistributedFileSystem run() throws Exception {
+ return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+ }
+ });
+
+ try {
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src);
+ }
+ }
+
+ private void checkPermissionDenied(final Exception e, final Path dir) {
+ assertTrue(e.getCause() instanceof ExecutionException);
+ assertTrue("Permission denied messages must carry AccessControlException",
+ e.getMessage().contains("AccessControlException"));
+ assertTrue("Permission denied messages must carry the username", e
+ .getMessage().contains(USER1_NAME));
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(dir.getParent().toUri().getPath()));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[5/8] hadoop git commit: Revert "Revert "HDFS-10390. Implement
asynchronous setAcl/getAclStatus for DistributedFileSystem. Contributed by
Xiaobing Zhou""
Posted by sz...@apache.org.
Revert "Revert "HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem. Contributed by Xiaobing Zhou""
This reverts commit b82c74b9102ba95eae776501ed4484be9edd8c96.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3d81f38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3d81f38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3d81f38
Branch: refs/heads/trunk
Commit: b3d81f38da5d3d913e7b7ed498198c899c1e68b7
Parents: 574dcd3
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:31:30 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:31:30 2016 +0800
----------------------------------------------------------------------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 59 ++++
.../hadoop/hdfs/DistributedFileSystem.java | 3 +
.../ClientNamenodeProtocolTranslatorPB.java | 30 +-
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 310 +++++++++++++++++++
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 15 +-
.../hdfs/server/namenode/FSAclBaseTest.java | 12 +-
6 files changed, 411 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 6bfd71d..29bac2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,12 +19,16 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
@@ -83,6 +87,7 @@ public class AsyncDistributedFileSystem {
public Future<Void> rename(Path src, Path dst,
final Options.Rename... options) throws IOException {
dfs.getFsStatistics().incrementWriteOps(1);
+ dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME);
final Path absSrc = dfs.fixRelativePart(src);
final Path absDst = dfs.fixRelativePart(dst);
@@ -111,6 +116,7 @@ public class AsyncDistributedFileSystem {
public Future<Void> setPermission(Path p, final FsPermission permission)
throws IOException {
dfs.getFsStatistics().incrementWriteOps(1);
+ dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION);
final Path absPath = dfs.fixRelativePart(p);
final boolean isAsync = Client.isAsynchronousMode();
Client.setAsynchronousMode(true);
@@ -142,6 +148,7 @@ public class AsyncDistributedFileSystem {
}
dfs.getFsStatistics().incrementWriteOps(1);
+ dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER);
final Path absPath = dfs.fixRelativePart(p);
final boolean isAsync = Client.isAsynchronousMode();
Client.setAsynchronousMode(true);
@@ -152,4 +159,56 @@ public class AsyncDistributedFileSystem {
Client.setAsynchronousMode(isAsync);
}
}
+
+ /**
+ * Fully replaces ACL of files and directories, discarding all existing
+ * entries.
+ *
+ * @param p
+ * Path to modify
+ * @param aclSpec
+ * List<AclEntry> describing modifications, must include entries for
+ * user, group, and others for compatibility with permission bits.
+ * @throws IOException
+ * if an ACL could not be modified
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future<Void> setAcl(Path p, final List<AclEntry> aclSpec)
+ throws IOException {
+ dfs.getFsStatistics().incrementWriteOps(1);
+ dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL);
+ final Path absPath = dfs.fixRelativePart(p);
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec);
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
+
+ /**
+ * Gets the ACL of a file or directory.
+ *
+ * @param p
+ * Path to get
+ * @return AclStatus describing the ACL of the file or directory
+ * @throws IOException
+ * if an ACL could not be read
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future<AclStatus> getAclStatus(Path p) throws IOException {
+ final Path absPath = dfs.fixRelativePart(p);
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().getAclStatus(dfs.getPathName(absPath));
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 0ae4d70..66ee42f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2529,4 +2529,7 @@ public class DistributedFileSystem extends FileSystem {
return statistics;
}
+ DFSOpsCountStatistics getDFSOpsCountStatistics() {
+ return storageStatistics;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 939c1ac..2373da7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
@@ -163,7 +164,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
@@ -1346,7 +1347,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
.addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec))
.build();
try {
- rpcProxy.setAcl(null, req);
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.setAcl(null, req);
+ setAsyncReturnValue();
+ } else {
+ rpcProxy.setAcl(null, req);
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -1357,7 +1363,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder()
.setSrc(src).build();
try {
- return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.getAclStatus(null, req);
+ final AsyncGet<Message, Exception> asyncReturnMessage
+ = ProtobufRpcEngine.getAsyncReturnMessage();
+ final AsyncGet<AclStatus, Exception> asyncGet =
+ new AsyncGet<AclStatus, Exception>() {
+ @Override
+ public AclStatus get(long timeout, TimeUnit unit)
+ throws Exception {
+ return PBHelperClient
+ .convert((GetAclStatusResponseProto) asyncReturnMessage
+ .get(timeout, unit));
+ }
+ };
+ ASYNC_RETURN_VALUE.set(asyncGet);
+ return null;
+ } else {
+ return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
new file mode 100644
index 0000000..67262dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.fs.permission.FsAction.NONE;
+import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
+import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for asynchronous distributed filesystem.
+ * */
+public class TestAsyncDFS {
+ public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
+ private static final int NUM_TESTS = 1000;
+ private static final int NUM_NN_HANDLER = 10;
+ private static final int ASYNC_CALL_LIMIT = 100;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private FileSystem fs;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+ // explicitly turn on acl
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+ // explicitly turn on ACL
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+ // set the limit of max async calls
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ ASYNC_CALL_LIMIT);
+ // set server handlers
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+ fs = FileSystem.get(conf);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ static class AclQueueEntry {
+ private final Object future;
+ private final Path path;
+ private final Boolean isSetAcl;
+
+ AclQueueEntry(final Object future, final Path path,
+ final Boolean isSetAcl) {
+ this.future = future;
+ this.path = path;
+ this.isSetAcl = isSetAcl;
+ }
+
+ public final Object getFuture() {
+ return future;
+ }
+
+ public final Path getPath() {
+ return path;
+ }
+
+ public final Boolean isSetAcl() {
+ return this.isSetAcl;
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testBatchAsyncAcl() throws Exception {
+ final String basePath = "testBatchAsyncAcl";
+ final Path parent = new Path(String.format("/test/%s/", basePath));
+
+ AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+ .getAsyncDistributedFileSystem();
+
+ // prepare test
+ int count = NUM_TESTS;
+ final Path[] paths = new Path[count];
+ for (int i = 0; i < count; i++) {
+ paths[i] = new Path(parent, "acl" + i);
+ FileSystem.mkdirs(fs, paths[i],
+ FsPermission.createImmutable((short) 0750));
+ assertTrue(fs.exists(paths[i]));
+ assertTrue(fs.getFileStatus(paths[i]).isDirectory());
+ }
+
+ final List<AclEntry> aclSpec = getAclSpec();
+ final AclEntry[] expectedAclSpec = getExpectedAclSpec();
+ Map<Integer, Future<Void>> setAclRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<AclStatus>> getAclRetFutures =
+ new HashMap<Integer, Future<AclStatus>>();
+ int start = 0, end = 0;
+ try {
+ // test setAcl
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
+ setAclRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForAclReturnValues(setAclRetFutures, start, end);
+ }
+ }
+ }
+ waitForAclReturnValues(setAclRetFutures, end, count);
+
+ // test getAclStatus
+ start = 0;
+ end = 0;
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
+ getAclRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForAclReturnValues(getAclRetFutures, start, end, paths,
+ expectedAclSpec);
+ }
+ }
+ }
+ waitForAclReturnValues(getAclRetFutures, end, count, paths,
+ expectedAclSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ private void waitForAclReturnValues(
+ final Map<Integer, Future<Void>> aclRetFutures, final int start,
+ final int end) throws InterruptedException, ExecutionException {
+ for (int i = start; i < end; i++) {
+ aclRetFutures.get(i).get();
+ }
+ }
+
+ private void waitForAclReturnValues(
+ final Map<Integer, Future<AclStatus>> aclRetFutures, final int start,
+ final int end, final Path[] paths, final AclEntry[] expectedAclSpec)
+ throws InterruptedException, ExecutionException, IOException {
+ for (int i = start; i < end; i++) {
+ AclStatus aclStatus = aclRetFutures.get(i).get();
+ verifyGetAcl(aclStatus, expectedAclSpec, paths[i]);
+ }
+ }
+
+ private void verifyGetAcl(final AclStatus aclStatus,
+ final AclEntry[] expectedAclSpec, final Path path) throws IOException {
+ if (aclStatus == null) {
+ return;
+ }
+
+ // verify permission and acl
+ AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]);
+ assertArrayEquals(expectedAclSpec, returned);
+ assertPermission(path, (short) 010770);
+ FSAclBaseTest.assertAclFeature(cluster, path, true);
+ }
+
+ private List<AclEntry> getAclSpec() {
+ return Lists.newArrayList(
+ aclEntry(ACCESS, USER, ALL),
+ aclEntry(ACCESS, USER, "foo", ALL),
+ aclEntry(ACCESS, GROUP, READ_EXECUTE),
+ aclEntry(ACCESS, OTHER, NONE),
+ aclEntry(DEFAULT, USER, "foo", ALL));
+ }
+
+ private AclEntry[] getExpectedAclSpec() {
+ return new AclEntry[] {
+ aclEntry(ACCESS, USER, "foo", ALL),
+ aclEntry(ACCESS, GROUP, READ_EXECUTE),
+ aclEntry(DEFAULT, USER, ALL),
+ aclEntry(DEFAULT, USER, "foo", ALL),
+ aclEntry(DEFAULT, GROUP, READ_EXECUTE),
+ aclEntry(DEFAULT, MASK, ALL),
+ aclEntry(DEFAULT, OTHER, NONE) };
+ }
+
+ private void assertPermission(final Path pathToCheck, final short perm)
+ throws IOException {
+ AclTestHelpers.assertPermission(fs, pathToCheck, perm);
+ }
+
+ @Test(timeout=60000)
+ public void testAsyncAPIWithException() throws Exception {
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ UserGroupInformation ugi1;
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+ // Initiate all four users
+ ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+ group1, group2 });
+
+ final Path parent = new Path("/test/async_api_exception/");
+ final Path aclDir = new Path(parent, "aclDir");
+ fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
+
+ AsyncDistributedFileSystem adfs = ugi1
+ .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+ @Override
+ public AsyncDistributedFileSystem run() throws Exception {
+ return cluster.getFileSystem().getAsyncDistributedFileSystem();
+ }
+ });
+
+ Future<Void> retFuture;
+ // test setAcl
+ try {
+ retFuture = adfs.setAcl(aclDir,
+ Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
+ retFuture.get();
+ fail("setAcl should fail with permission denied");
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, aclDir, user1);
+ }
+
+ // test getAclStatus
+ try {
+ Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
+ aclRetFuture.get();
+ fail("getAclStatus should fail with permission denied");
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, aclDir, user1);
+ }
+ }
+
+ public static void checkPermissionDenied(final Exception e, final Path dir,
+ final String user) {
+ assertTrue(e.getCause() instanceof ExecutionException);
+ assertTrue("Permission denied messages must carry AccessControlException",
+ e.getMessage().contains("AccessControlException"));
+ assertTrue("Permission denied messages must carry the username", e
+ .getMessage().contains(user));
+ assertTrue("Permission denied messages must carry the name of the path",
+ e.getMessage().contains(dir.getName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 7539fbd..03c8151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -520,7 +520,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
retFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(src.getParent().toUri().getPath()));
}
@@ -530,7 +530,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.setPermission(src, fsPerm);
retFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(src.getName()));
}
@@ -539,7 +539,7 @@ public class TestAsyncDFSRename {
retFuture = adfs.setOwner(src, "user1", "group2");
retFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
+ TestAsyncDFS.checkPermissionDenied(e, src, user1);
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(src.getName()));
} finally {
@@ -551,13 +551,4 @@ public class TestAsyncDFSRename {
}
}
}
-
- private void checkPermissionDenied(final Exception e, final Path dir,
- final String user) {
- assertTrue(e.getCause() instanceof ExecutionException);
- assertTrue("Permission denied messages must carry AccessControlException",
- e.getMessage().contains("AccessControlException"));
- assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(user));
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3d81f38/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 002f7c0..216147a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -1637,17 +1637,23 @@ public abstract class FSAclBaseTest {
assertAclFeature(path, expectAclFeature);
}
+ private static void assertAclFeature(Path pathToCheck,
+ boolean expectAclFeature) throws IOException {
+ assertAclFeature(cluster, pathToCheck, expectAclFeature);
+ }
+
/**
* Asserts whether or not the inode for a specific path has an AclFeature.
*
+ * @param miniCluster the cluster into which the path resides
* @param pathToCheck Path inode to check
* @param expectAclFeature boolean true if an AclFeature must be present,
* false if an AclFeature must not be present
* @throws IOException thrown if there is an I/O error
*/
- private static void assertAclFeature(Path pathToCheck,
- boolean expectAclFeature) throws IOException {
- AclFeature aclFeature = getAclFeature(pathToCheck, cluster);
+ public static void assertAclFeature(final MiniDFSCluster miniCluster,
+ Path pathToCheck, boolean expectAclFeature) throws IOException {
+ AclFeature aclFeature = getAclFeature(pathToCheck, miniCluster);
if (expectAclFeature) {
assertNotNull(aclFeature);
// Intentionally capturing a reference to the entries, not using nested
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[8/8] hadoop git commit: Revert "Revert "HADOOP-13226 Support async
call retry and failover.""
Posted by sz...@apache.org.
Revert "Revert "HADOOP-13226 Support async call retry and failover.""
This reverts commit 5360da8bd9f720384860f411bee081aef13b4bd4.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/35f255b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/35f255b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/35f255b0
Branch: refs/heads/trunk
Commit: 35f255b03b1bb5c94063ec1818af1d253ceee991
Parents: 7e7b1ae
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:31:43 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:31:43 2016 +0800
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 8 +-
.../hadoop/io/retry/AsyncCallHandler.java | 321 +++++++++++++++++++
.../org/apache/hadoop/io/retry/CallReturn.java | 75 +++++
.../hadoop/io/retry/RetryInvocationHandler.java | 134 ++++++--
.../apache/hadoop/io/retry/RetryPolicies.java | 4 +-
.../main/java/org/apache/hadoop/ipc/Client.java | 25 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 13 +-
.../apache/hadoop/util/concurrent/AsyncGet.java | 17 +-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 10 +-
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 7 +-
.../ClientNamenodeProtocolTranslatorPB.java | 42 +--
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 43 +--
.../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 181 +++++++++++
.../hdfs/server/namenode/ha/HATestUtil.java | 9 +-
14 files changed, 775 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ab8673b..a644aa5 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -345,7 +345,13 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
- <!-- Synchronization performed on util.concurrent instance. -->
+ <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
+ <Match>
+ <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
+ <Method name="wait" />
+ <Bug pattern="WA_NOT_IN_LOOP" />
+ </Match>
+
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
new file mode 100644
index 0000000..5a03b03
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Handle async calls. */
+@InterfaceAudience.Private
+public class AsyncCallHandler {
+ static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
+
+ private static final ThreadLocal<AsyncGet<?, Exception>>
+ LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<Object, Throwable>>
+ ASYNC_RETURN = new ThreadLocal<>();
+
+ /** @return the async return value from {@link AsyncCallHandler}. */
+ @InterfaceStability.Unstable
+ @SuppressWarnings("unchecked")
+ public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
+ final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
+ if (asyncGet != null) {
+ ASYNC_RETURN.set(null);
+ return asyncGet;
+ } else {
+ return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
+ }
+ }
+
+ /** For the lower rpc layers to set the async return value. */
+ @InterfaceStability.Unstable
+ public static void setLowerLayerAsyncReturn(
+ AsyncGet<?, Exception> asyncReturn) {
+ LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
+ }
+
+ private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
+ final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
+ Preconditions.checkNotNull(asyncGet);
+ LOWER_LAYER_ASYNC_RETURN.set(null);
+ return asyncGet;
+ }
+
+ /** A simple concurrent queue which keeping track the empty start time. */
+ static class ConcurrentQueue<T> {
+ private final Queue<T> queue = new LinkedList<>();
+ private long emptyStartTime = Time.monotonicNow();
+
+ synchronized int size() {
+ return queue.size();
+ }
+
+ /** Is the queue empty for more than the given time in millisecond? */
+ synchronized boolean isEmpty(long time) {
+ return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
+ }
+
+ synchronized void offer(T c) {
+ final boolean added = queue.offer(c);
+ Preconditions.checkState(added);
+ }
+
+ synchronized T poll() {
+ Preconditions.checkState(!queue.isEmpty());
+ final T t = queue.poll();
+ if (queue.isEmpty()) {
+ emptyStartTime = Time.monotonicNow();
+ }
+ return t;
+ }
+ }
+
+ /** A queue for handling async calls. */
+ static class AsyncCallQueue {
+ private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
+ private final Processor processor = new Processor();
+
+ void addCall(AsyncCall call) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add " + call);
+ }
+ queue.offer(call);
+ processor.tryStart();
+ }
+
+ void checkCalls() {
+ final int size = queue.size();
+ for (int i = 0; i < size; i++) {
+ final AsyncCall c = queue.poll();
+ if (!c.isDone()) {
+ queue.offer(c); // the call is not done yet, add it back.
+ }
+ }
+ }
+
+ /** Process the async calls in the queue. */
+ private class Processor {
+ static final long GRACE_PERIOD = 10*1000L;
+ static final long SLEEP_PERIOD = 100L;
+
+ private final AtomicReference<Thread> running = new AtomicReference<>();
+
+ boolean isRunning(Daemon d) {
+ return d == running.get();
+ }
+
+ void tryStart() {
+ final Thread current = Thread.currentThread();
+ if (running.compareAndSet(null, current)) {
+ final Daemon daemon = new Daemon() {
+ @Override
+ public void run() {
+ for (; isRunning(this);) {
+ try {
+ Thread.sleep(SLEEP_PERIOD);
+ } catch (InterruptedException e) {
+ kill(this);
+ return;
+ }
+
+ checkCalls();
+ tryStop(this);
+ }
+ }
+ };
+
+ final boolean set = running.compareAndSet(current, daemon);
+ Preconditions.checkState(set);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
+ }
+ daemon.start();
+ }
+ }
+
+ void tryStop(Daemon d) {
+ if (queue.isEmpty(GRACE_PERIOD)) {
+ kill(d);
+ }
+ }
+
+ void kill(Daemon d) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Killing " + d);
+ }
+ final boolean set = running.compareAndSet(d, null);
+ Preconditions.checkState(set);
+ }
+ }
+ }
+
+ static class AsyncValue<V> {
+ private V value;
+
+ synchronized V waitAsyncValue(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (value != null) {
+ return value;
+ }
+ AsyncGet.Util.wait(this, timeout, unit);
+ if (value != null) {
+ return value;
+ }
+
+ throw new TimeoutException("waitCallReturn timed out "
+ + timeout + " " + unit);
+ }
+
+ synchronized void set(V v) {
+ Preconditions.checkNotNull(v);
+ Preconditions.checkState(value == null);
+ value = v;
+ notify();
+ }
+
+ synchronized boolean isDone() {
+ return value != null;
+ }
+ }
+
+ static class AsyncCall extends RetryInvocationHandler.Call {
+ private final AsyncCallHandler asyncCallHandler;
+
+ private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
+ private AsyncGet<?, Exception> lowerLayerAsyncGet;
+
+ AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
+ RetryInvocationHandler.Counters counters,
+ RetryInvocationHandler<?> retryInvocationHandler,
+ AsyncCallHandler asyncCallHandler) {
+ super(method, args, isRpc, callId, counters, retryInvocationHandler);
+
+ this.asyncCallHandler = asyncCallHandler;
+ }
+
+ /** @return true if the call is done; otherwise, return false. */
+ boolean isDone() {
+ final CallReturn r = invokeOnce();
+ switch (r.getState()) {
+ case RETURNED:
+ case EXCEPTION:
+ asyncCallReturn.set(r); // the async call is done
+ return true;
+ case RETRY:
+ invokeOnce();
+ break;
+ case ASYNC_CALL_IN_PROGRESS:
+ case ASYNC_INVOKED:
+ // nothing to do
+ break;
+ default:
+ Preconditions.checkState(false);
+ }
+ return false;
+ }
+
+ @Override
+ CallReturn invoke() throws Throwable {
+ LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
+ if (lowerLayerAsyncGet != null) {
+ // async call was submitted early, check the lower level async call
+ final boolean isDone = lowerLayerAsyncGet.isDone();
+ LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
+ if (!isDone) {
+ return CallReturn.ASYNC_CALL_IN_PROGRESS;
+ }
+ try {
+ return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
+ } finally {
+ lowerLayerAsyncGet = null;
+ }
+ }
+
+ // submit a new async call
+ LOG.trace("invoke: ASYNC_INVOKED");
+ final boolean mode = Client.isAsynchronousMode();
+ try {
+ Client.setAsynchronousMode(true);
+ final Object r = invokeMethod();
+ // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
+ Preconditions.checkState(r == null);
+ lowerLayerAsyncGet = getLowerLayerAsyncReturn();
+
+ if (counters.isZeros()) {
+ // first async attempt, initialize
+ LOG.trace("invoke: initAsyncCall");
+ asyncCallHandler.initAsyncCall(this, asyncCallReturn);
+ }
+ return CallReturn.ASYNC_INVOKED;
+ } finally {
+ Client.setAsynchronousMode(mode);
+ }
+ }
+ }
+
+ private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
+ private volatile boolean hasSuccessfulCall = false;
+
+ AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
+ int callId, RetryInvocationHandler.Counters counters,
+ RetryInvocationHandler<?> retryInvocationHandler) {
+ return new AsyncCall(method, args, isRpc, callId, counters,
+ retryInvocationHandler, this);
+ }
+
+ boolean hasSuccessfulCall() {
+ return hasSuccessfulCall;
+ }
+
+ private void initAsyncCall(final AsyncCall asyncCall,
+ final AsyncValue<CallReturn> asyncCallReturn) {
+ asyncCalls.addCall(asyncCall);
+
+ final AsyncGet<Object, Throwable> asyncGet
+ = new AsyncGet<Object, Throwable>() {
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws Throwable {
+ final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
+ final Object r = c.getReturnValue();
+ hasSuccessfulCall = true;
+ return r;
+ }
+
+ @Override
+ public boolean isDone() {
+ return asyncCallReturn.isDone();
+ }
+ };
+ ASYNC_RETURN.set(asyncGet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
new file mode 100644
index 0000000..943725c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import com.google.common.base.Preconditions;
+
+/** The call return from a method invocation. */
+class CallReturn {
+ /** The return state. */
+ enum State {
+ /** Call is returned successfully. */
+ RETURNED,
+ /** Call throws an exception. */
+ EXCEPTION,
+ /** Call should be retried according to the {@link RetryPolicy}. */
+ RETRY,
+ /** Call, which is async, is still in progress. */
+ ASYNC_CALL_IN_PROGRESS,
+ /** Call, which is async, just has been invoked. */
+ ASYNC_INVOKED
+ }
+
+ static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
+ State.ASYNC_CALL_IN_PROGRESS);
+ static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
+ static final CallReturn RETRY = new CallReturn(State.RETRY);
+
+ private final Object returnValue;
+ private final Throwable thrown;
+ private final State state;
+
+ CallReturn(Object r) {
+ this(r, null, State.RETURNED);
+ }
+ CallReturn(Throwable t) {
+ this(null, t, State.EXCEPTION);
+ Preconditions.checkNotNull(t);
+ }
+ private CallReturn(State s) {
+ this(null, null, s);
+ }
+ private CallReturn(Object r, Throwable t, State s) {
+ Preconditions.checkArgument(r == null || t == null);
+ returnValue = r;
+ thrown = t;
+ state = s;
+ }
+
+ State getState() {
+ return state;
+ }
+
+ Object getReturnValue() throws Throwable {
+ if (state == State.EXCEPTION) {
+ throw thrown;
+ }
+ Preconditions.checkState(state == State.RETURNED, "state == %s", state);
+ return returnValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 300d0c2..f2b2c99 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,11 +42,83 @@ import java.util.Map;
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
- private static class Counters {
+ static class Call {
+ private final Method method;
+ private final Object[] args;
+ private final boolean isRpc;
+ private final int callId;
+ final Counters counters;
+
+ private final RetryPolicy retryPolicy;
+ private final RetryInvocationHandler<?> retryInvocationHandler;
+
+ Call(Method method, Object[] args, boolean isRpc, int callId,
+ Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
+ this.method = method;
+ this.args = args;
+ this.isRpc = isRpc;
+ this.callId = callId;
+ this.counters = counters;
+
+ this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
+ this.retryInvocationHandler = retryInvocationHandler;
+ }
+
+ /** Invoke the call once without retrying. */
+ synchronized CallReturn invokeOnce() {
+ try {
+ // The number of times this invocation handler has ever been failed over
+ // before this method invocation attempt. Used to prevent concurrent
+ // failed method invocations from triggering multiple failover attempts.
+ final long failoverCount = retryInvocationHandler.getFailoverCount();
+ try {
+ return invoke();
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this, e);
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ // If interrupted, do not retry.
+ throw e;
+ }
+ retryInvocationHandler.handleException(
+ method, retryPolicy, failoverCount, counters, e);
+ return CallReturn.RETRY;
+ }
+ } catch(Throwable t) {
+ return new CallReturn(t);
+ }
+ }
+
+ CallReturn invoke() throws Throwable {
+ return new CallReturn(invokeMethod());
+ }
+
+ Object invokeMethod() throws Throwable {
+ if (isRpc) {
+ Client.setCallIdAndRetryCount(callId, counters.retries);
+ }
+ return retryInvocationHandler.invokeMethod(method, args);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "#" + callId + ": "
+ + method.getDeclaringClass().getSimpleName() + "." + method.getName()
+ + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
+ + ")";
+ }
+ }
+
+ static class Counters {
/** Counter for retries. */
private int retries;
/** Counter for method invocation has been failed over. */
private int failovers;
+
+ boolean isZeros() {
+ return retries == 0 && failovers == 0;
+ }
}
private static class ProxyDescriptor<T> {
@@ -144,11 +216,13 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
private final ProxyDescriptor<T> proxyDescriptor;
- private volatile boolean hasMadeASuccessfulCall = false;
-
+ private volatile boolean hasSuccessfulCall = false;
+
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
+ private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
+
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) {
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -167,38 +241,35 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return policy != null? policy: defaultPolicy;
}
+ private long getFailoverCount() {
+ return proxyDescriptor.getFailoverCount();
+ }
+
+ private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
+ Counters counters) {
+ if (Client.isAsynchronousMode()) {
+ return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
+ counters, this);
+ } else {
+ return new Call(method, args, isRpc, callId, counters, this);
+ }
+ }
+
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
- return invoke(method, args, isRpc, callId, new Counters());
- }
-
- private Object invoke(final Method method, final Object[] args,
- final boolean isRpc, final int callId, final Counters counters)
- throws Throwable {
- final RetryPolicy policy = getRetryPolicy(method);
+ final Counters counters = new Counters();
+ final Call call = newCall(method, args, isRpc, callId, counters);
while (true) {
- // The number of times this invocation handler has ever been failed over,
- // before this method invocation attempt. Used to prevent concurrent
- // failed method invocations from triggering multiple failover attempts.
- final long failoverCount = proxyDescriptor.getFailoverCount();
-
- if (isRpc) {
- Client.setCallIdAndRetryCount(callId, counters.retries);
- }
- try {
- final Object ret = invokeMethod(method, args);
- hasMadeASuccessfulCall = true;
- return ret;
- } catch (Exception ex) {
- if (Thread.currentThread().isInterrupted()) {
- // If interrupted, do not retry.
- throw ex;
- }
- handleException(method, policy, failoverCount, counters, ex);
+ final CallReturn c = call.invokeOnce();
+ final CallReturn.State state = c.getState();
+ if (state == CallReturn.State.ASYNC_INVOKED) {
+ return null; // return null for async calls
+ } else if (c.getState() != CallReturn.State.RETRY) {
+ return c.getReturnValue();
}
}
}
@@ -239,7 +310,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
final int failovers, final long delay, final Exception ex) {
// log info if this has made some successful calls or
// this is not the first failover
- final boolean info = hasMadeASuccessfulCall || failovers != 0;
+ final boolean info = hasSuccessfulCall || failovers != 0
+ || asyncCallHandler.hasSuccessfulCall();
if (!info && !LOG.isDebugEnabled()) {
return;
}
@@ -265,7 +337,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (!method.isAccessible()) {
method.setAccessible(true);
}
- return method.invoke(proxyDescriptor.getProxy(), args);
+ final Object r = method.invoke(proxyDescriptor.getProxy(), args);
+ hasSuccessfulCall = true;
+ return r;
} catch (InvocationTargetException e) {
throw e.getCause();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index 131aa8f..c0a14b7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.retry;
+import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
@@ -647,8 +648,9 @@ public class RetryPolicies {
return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
+ retries + ") exceeded maximum allowed (" + maxRetries + ")");
}
-
+
if (e instanceof ConnectException ||
+ e instanceof EOFException ||
e instanceof NoRouteToHostException ||
e instanceof UnknownHostException ||
e instanceof StandbyException ||
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d1d5b17..ed8d905 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
@@ -94,8 +93,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
- = new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
+ ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -106,8 +105,9 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getAsyncRpcResponse() {
- return (Future<T>) ASYNC_RPC_RESPONSE.get();
+ public static <T extends Writable> AsyncGet<T, IOException>
+ getAsyncRpcResponse() {
+ return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -1413,9 +1413,16 @@ public class Client implements AutoCloseable {
}
}
}
+
+ @Override
+ public boolean isDone() {
+ synchronized (call) {
+ return call.done;
+ }
+ }
};
- ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+ ASYNC_RPC_RESPONSE.set(asyncGet);
return null;
} else {
return getRpcResponse(call, connection, -1, null);
@@ -1460,10 +1467,8 @@ public class Client implements AutoCloseable {
synchronized (call) {
while (!call.done) {
try {
- final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
- timeout, unit);
- call.wait(waitTimeout); // wait for the result
- if (waitTimeout > 0 && !call.done) {
+ AsyncGet.Util.wait(call, timeout, unit);
+ if (timeout >= 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0f43fc6..315ec67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,7 +54,6 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -256,14 +255,18 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
+ final AsyncGet<RpcResponseWrapper, IOException> arr
+ = Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
- final RpcResponseWrapper rrw = timeout < 0?
- frrw.get(): frrw.get(timeout, unit);
- return getReturnMessage(method, rrw);
+ return getReturnMessage(method, arr.get(timeout, unit));
+ }
+
+ @Override
+ public boolean isDone() {
+ return arr.isDone();
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index 5eac869..f124890 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,14 +47,19 @@ public interface AsyncGet<R, E extends Throwable> {
R get(long timeout, TimeUnit unit)
throws E, TimeoutException, InterruptedException;
+ /** @return true if the underlying computation is done; false, otherwise. */
+ boolean isDone();
+
/** Utility */
class Util {
- /**
- * @return {@link Object#wait(long)} timeout converted
- * from {@link #get(long, TimeUnit)} timeout.
- */
- public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
- return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
+ /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
+ public static void wait(Object obj, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (timeout < 0) {
+ obj.wait();
+ } else if (timeout > 0) {
+ obj.wait(unit.toMillis(timeout));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 0ad191b..4450c0c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +51,11 @@ public class TestAsyncIPC {
private static Configuration conf;
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
+ static <T extends Writable> AsyncGetFuture<T, IOException>
+ getAsyncRpcResponseFuture() {
+ return new AsyncGetFuture<>(Client.getAsyncRpcResponse());
+ }
+
@Before
public void setupConf() {
conf = new Configuration();
@@ -84,7 +90,7 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- returnFutures.put(i, Client.getAsyncRpcResponse());
+ returnFutures.put(i, getAsyncRpcResponseFuture());
expectedValues.put(i, param);
} catch (Exception e) {
failed = true;
@@ -204,7 +210,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- returnFutures.put(idx, Client.getAsyncRpcResponse());
+ returnFutures.put(idx, getAsyncRpcResponseFuture());
expectedValues.put(idx, param);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 29bac2a..824336a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
@@ -51,9 +51,8 @@ public class AsyncDistributedFileSystem {
this.dfs = dfs;
}
- static <T> Future<T> getReturnValue() {
- return new AsyncGetFuture<>(
- ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
+ private static <T> Future<T> getReturnValue() {
+ return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 2373da7..bcf5269 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -184,6 +183,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -212,8 +212,6 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal<AsyncGet<?, Exception>>
- ASYNC_RETURN_VALUE = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -247,12 +245,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
- @SuppressWarnings("unchecked")
- @Unstable
- public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
- return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
- }
-
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -391,8 +383,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
asyncReturnMessage.get(timeout, unit);
return null;
}
+
+ @Override
+ public boolean isDone() {
+ return asyncReturnMessage.isDone();
+ }
};
- ASYNC_RETURN_VALUE.set(asyncGet);
+ AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
}
@Override
@@ -1367,17 +1364,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy.getAclStatus(null, req);
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<AclStatus, Exception> asyncGet =
- new AsyncGet<AclStatus, Exception>() {
- @Override
- public AclStatus get(long timeout, TimeUnit unit)
- throws Exception {
- return PBHelperClient
- .convert((GetAclStatusResponseProto) asyncReturnMessage
- .get(timeout, unit));
- }
- };
- ASYNC_RETURN_VALUE.set(asyncGet);
+ final AsyncGet<AclStatus, Exception> asyncGet
+ = new AsyncGet<AclStatus, Exception>() {
+ @Override
+ public AclStatus get(long timeout, TimeUnit unit) throws Exception {
+ return PBHelperClient.convert((GetAclStatusResponseProto)
+ asyncReturnMessage.get(timeout, unit));
+ }
+
+ @Override
+ public boolean isDone() {
+ return asyncReturnMessage.isDone();
+ }
+ };
+ AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
return null;
} else {
return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index c7615a9..6a60290 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -70,7 +71,7 @@ public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
private final short replFactor = 1;
private final long blockSize = 512;
- private long fileLen = blockSize * 3;
+ private long fileLen = 0;
private final long seed = Time.now();
private final Random r = new Random(seed);
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -80,7 +81,7 @@ public class TestAsyncDFS {
private Configuration conf;
private MiniDFSCluster cluster;
- private FileSystem fs;
+ private DistributedFileSystem fs;
private AsyncDistributedFileSystem adfs;
@Before
@@ -95,10 +96,10 @@ public class TestAsyncDFS {
ASYNC_CALL_LIMIT);
// set server handlers
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
- fs = FileSystem.get(conf);
- adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
+ fs = cluster.getFileSystem();
+ adfs = fs.getAsyncDistributedFileSystem();
}
@After
@@ -113,31 +114,6 @@ public class TestAsyncDFS {
}
}
- static class AclQueueEntry {
- private final Object future;
- private final Path path;
- private final Boolean isSetAcl;
-
- AclQueueEntry(final Object future, final Path path,
- final Boolean isSetAcl) {
- this.future = future;
- this.path = path;
- this.isSetAcl = isSetAcl;
- }
-
- public final Object getFuture() {
- return future;
- }
-
- public final Path getPath() {
- return path;
- }
-
- public final Boolean isSetAcl() {
- return this.isSetAcl;
- }
- }
-
@Test(timeout=60000)
public void testBatchAsyncAcl() throws Exception {
final String basePath = "testBatchAsyncAcl";
@@ -348,7 +324,7 @@ public class TestAsyncDFS {
public static void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
- assertTrue(e.getCause() instanceof ExecutionException);
+ assertTrue(e.getCause() instanceof RemoteException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
@@ -470,4 +446,9 @@ public class TestAsyncDFS {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
+
+ @Test
+ public void testAsyncWithoutRetry() throws Exception {
+ TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
new file mode 100644
index 0000000..9ade8ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/** Test async methods with HA setup. */
+public class TestAsyncHDFSWithHA {
+ static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
+ static {
+ GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
+ }
+
+ private static <T> Future<T> getReturnValue() {
+ return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+ }
+
+ static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
+ Path[] dsts) throws IOException {
+ for (int i = 0; i < srcs.length; i++) {
+ srcs[i] = new Path(dir, "src" + i);
+ dsts[i] = new Path(dir, "dst" + i);
+ dfs.mkdirs(srcs[i]);
+ }
+ }
+
+ static void runTestAsyncWithoutRetry(Configuration conf,
+ MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
+ final int num = 5;
+
+ final String renameDir = "/testAsyncWithoutRetry/";
+ final Path[] srcs = new Path[num + 1];
+ final Path[] dsts = new Path[num + 1];
+ mkdirs(dfs, renameDir, srcs, dsts);
+
+ // create a proxy without retry.
+ final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
+ = NameNodeProxies.createNonHAProxy(conf,
+ cluster.getNameNode(0).getNameNodeAddress(),
+ ClientProtocol.class, UserGroupInformation.getCurrentUser(),
+ false);
+ final ClientProtocol cp = proxyInfo.getProxy();
+
+ // submit async calls
+ Client.setAsynchronousMode(true);
+ final List<Future<Void>> results = new ArrayList<>();
+ for (int i = 0; i < num; i++) {
+ final String src = srcs[i].toString();
+ final String dst = dsts[i].toString();
+ LOG.info(i + ") rename " + src + " -> " + dst);
+ cp.rename2(src, dst);
+ results.add(getReturnValue());
+ }
+ Client.setAsynchronousMode(false);
+
+ // wait for the async calls
+ for (Future<Void> f : results) {
+ f.get();
+ }
+
+ //check results
+ for (int i = 0; i < num; i++) {
+ Assert.assertEquals(false, dfs.exists(srcs[i]));
+ Assert.assertEquals(true, dfs.exists(dsts[i]));
+ }
+ }
+
+ /** Testing HDFS async methods with HA setup. */
+ @Test(timeout = 120000)
+ public void testAsyncWithHAFailover() throws Exception {
+ final int num = 10;
+
+ final Configuration conf = new HdfsConfiguration();
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(0).build();
+
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+
+ final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
+ cluster, conf);
+ runTestAsyncWithoutRetry(conf, cluster, dfs);
+
+ final String renameDir = "/testAsyncWithHAFailover/";
+ final Path[] srcs = new Path[num + 1];
+ final Path[] dsts = new Path[num + 1];
+ mkdirs(dfs, renameDir, srcs, dsts);
+
+ // submit async calls and trigger failover in the middle.
+ final AsyncDistributedFileSystem adfs
+ = dfs.getAsyncDistributedFileSystem();
+ final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
+
+ final List<Future<Void>> results = new ArrayList<>();
+ final List<IOException> exceptions = new ArrayList<>();
+ final List<Future<?>> futures = new ArrayList<>();
+ final int half = num/2;
+ for(int i = 0; i <= num; i++) {
+ final int id = i;
+ futures.add(executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (id == half) {
+ // failover
+ cluster.shutdownNameNode(0);
+ cluster.transitionToActive(1);
+ } else {
+ // rename
+ results.add(adfs.rename(srcs[id], dsts[id]));
+ }
+ } catch (IOException e) {
+ exceptions.add(e);
+ }
+ }
+ }));
+ }
+
+ // wait for the tasks
+ Assert.assertEquals(num + 1, futures.size());
+ for(int i = 0; i <= num; i++) {
+ futures.get(i).get();
+ }
+ // wait for the async calls
+ Assert.assertEquals(num, results.size());
+ Assert.assertTrue(exceptions.isEmpty());
+ for(Future<Void> r : results) {
+ r.get();
+ }
+
+ // check results
+ for(int i = 0; i <= num; i++) {
+ final boolean renamed = i != half;
+ Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
+ Assert.assertEquals(renamed, dfs.exists(dsts[i]));
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35f255b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 42cf3d4..169bbee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -135,7 +136,8 @@ public abstract class HATestUtil {
}
/** Gets the filesystem instance by setting the failover configurations */
- public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
+ public static DistributedFileSystem configureFailoverFs(
+ MiniDFSCluster cluster, Configuration conf)
throws IOException, URISyntaxException {
return configureFailoverFs(cluster, conf, 0);
}
@@ -147,13 +149,14 @@ public abstract class HATestUtil {
* @param nsIndex namespace index starting with zero
* @throws IOException if an error occurs rolling the edit log
*/
- public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
+ public static DistributedFileSystem configureFailoverFs(
+ MiniDFSCluster cluster, Configuration conf,
int nsIndex) throws IOException, URISyntaxException {
conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
- return fs;
+ return (DistributedFileSystem)fs;
}
public static void setFailoverConfigurations(MiniDFSCluster cluster,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/8] hadoop git commit: Revert "Revert "HDFS-10346. Implement
asynchronous setPermission/setOwner for DistributedFileSystem. Contributed by
Xiaobing Zhou""
Posted by sz...@apache.org.
Revert "Revert "HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem. Contributed by Xiaobing Zhou""
This reverts commit f23d5dfc60a017187ae57f3667ac0e688877c2dd.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cba9a018
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cba9a018
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cba9a018
Branch: refs/heads/trunk
Commit: cba9a0188970cb33dcb95e9c49168ac4a83446d9
Parents: aa20fa1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:29:38 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:29:38 2016 +0800
----------------------------------------------------------------------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 59 ++++
.../ClientNamenodeProtocolTranslatorPB.java | 39 ++-
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 267 +++++++++++++++++--
.../apache/hadoop/hdfs/TestDFSPermission.java | 29 +-
4 files changed, 351 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba9a018/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3f..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.ipc.Client;
@@ -37,6 +38,9 @@ import com.google.common.util.concurrent.AbstractFuture;
* This instance of this class is the way end-user code interacts
* with a Hadoop DistributedFileSystem in an asynchronous manner.
*
+ * This class is unstable, so no guarantee is provided as to reliability,
+ * stability or compatibility across any level of release granularity.
+ *
*****************************************************************/
@Unstable
public class AsyncDistributedFileSystem {
@@ -111,4 +115,59 @@ public class AsyncDistributedFileSystem {
Client.setAsynchronousMode(isAsync);
}
}
+
+ /**
+ * Set permission of a path.
+ *
+ * @param p
+ * the path the permission is set to
+ * @param permission
+ * the permission that is set to a path.
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future<Void> setPermission(Path p, final FsPermission permission)
+ throws IOException {
+ dfs.getFsStatistics().incrementWriteOps(1);
+ final Path absPath = dfs.fixRelativePart(p);
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().setPermission(dfs.getPathName(absPath), permission);
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
+
+ /**
+ * Set owner of a path (i.e. a file or a directory). The parameters username
+ * and groupname cannot both be null.
+ *
+ * @param p
+ * The path
+ * @param username
+ * If it is null, the original username remains unchanged.
+ * @param groupname
+ * If it is null, the original groupname remains unchanged.
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future<Void> setOwner(Path p, String username, String groupname)
+ throws IOException {
+ if (username == null && groupname == null) {
+ throw new IOException("username == null && groupname == null");
+ }
+
+ dfs.getFsStatistics().incrementWriteOps(1);
+ final Path absPath = dfs.fixRelativePart(p);
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname);
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba9a018/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index f4074b6..faa925c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -367,12 +367,30 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setPermission(PBHelperClient.convert(permission))
.build();
try {
- rpcProxy.setPermission(null, req);
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.setPermission(null, req);
+ setReturnValueCallback();
+ } else {
+ rpcProxy.setPermission(null, req);
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
+ private void setReturnValueCallback() {
+ final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable<Void> callBack = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ returnMessageCallback.call();
+ return null;
+ }
+ };
+ RETURN_VALUE_CALLBACK.set(callBack);
+ }
+
@Override
public void setOwner(String src, String username, String groupname)
throws IOException {
@@ -383,7 +401,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (groupname != null)
req.setGroupname(groupname);
try {
- rpcProxy.setOwner(null, req.build());
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.setOwner(null, req.build());
+ setReturnValueCallback();
+ } else {
+ rpcProxy.setOwner(null, req.build());
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -513,17 +536,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
-
- final Callable<Message> returnMessageCallback = ProtobufRpcEngine
- .getReturnMessageCallback();
- Callable<Void> callBack = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- returnMessageCallback.call();
- return null;
- }
- };
- RETURN_VALUE_CALLBACK.set(callBack);
+ setReturnValueCallback();
} else {
rpcProxy.rename2(null, req);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba9a018/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299..7539fbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -22,8 +22,11 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -31,18 +34,30 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestAsyncDFSRename {
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+ private final long seed = Time.now();
+ private final Random r = new Random(seed);
+ private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+ private final short replFactor = 2;
+ private final long blockSize = 512;
+ private long fileLen = blockSize * 3;
/**
* Check the blocks of dst file are cleaned after rename with overwrite
@@ -50,8 +65,6 @@ public class TestAsyncDFSRename {
*/
@Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
replFactor).build();
@@ -60,8 +73,6 @@ public class TestAsyncDFSRename {
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
try {
-
- long fileLen = blockSize * 3;
String src = "/foo/src";
String dst = "/foo/dst";
String src2 = "/foo/src2";
@@ -115,8 +126,6 @@ public class TestAsyncDFSRename {
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
final Path renameDir = new Path(
"/test/testCallGetReturnValueMultipleTimes/");
final Configuration conf = new HdfsConfiguration();
@@ -127,7 +136,6 @@ public class TestAsyncDFSRename {
final DistributedFileSystem dfs = cluster.getFileSystem();
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
final int count = 100;
- long fileLen = blockSize * 3;
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
@@ -178,15 +186,15 @@ public class TestAsyncDFSRename {
}
}
- @Test(timeout = 120000)
- public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+ @Test
+ public void testConservativeConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(100,
"testAggressiveConcurrentAsyncRenameWithOverwrite");
}
@Test(timeout = 60000)
- public void testConservativeConcurrentAsyncRenameWithOverwrite()
+ public void testAggressiveConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(10000,
"testConservativeConcurrentAsyncRenameWithOverwrite");
@@ -194,8 +202,6 @@ public class TestAsyncDFSRename {
private void internalTestConcurrentAsyncRenameWithOverwrite(
final int asyncCallLimit, final String basePath) throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
@@ -206,7 +212,6 @@ public class TestAsyncDFSRename {
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
- long fileLen = blockSize * 3;
int start = 0, end = 0;
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
@@ -274,8 +279,206 @@ public class TestAsyncDFSRename {
}
}
+ @Test
+ public void testConservativeConcurrentAsyncAPI() throws Exception {
+ internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
+ }
+
+ @Test(timeout = 60000)
+ public void testAggressiveConcurrentAsyncAPI() throws Exception {
+ internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
+ }
+
+ private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
+ final String basePath) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ int count = 500;
+
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+ // set the limit of max async calls
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ asyncCallLimit);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+ // start mini cluster
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3).build();
+ cluster.waitActive();
+ AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+ .getAsyncDistributedFileSystem();
+
+ // prepare for test
+ FileSystem rootFs = FileSystem.get(conf);
+ final Path parent = new Path(String.format("/test/%s/", basePath));
+ final Path[] srcs = new Path[count];
+ final Path[] dsts = new Path[count];
+ short[] permissions = new short[count];
+ for (int i = 0; i < count; i++) {
+ srcs[i] = new Path(parent, "src" + i);
+ dsts[i] = new Path(parent, "dst" + i);
+ DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
+ assertTrue(rootFs.exists(srcs[i]));
+ assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
+ assertTrue(rootFs.exists(dsts[i]));
+ assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
+ permissions[i] = permGenerator.next();
+ }
+
+ Map<Integer, Future<Void>> renameRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> permRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> ownerRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ int start = 0, end = 0;
+ // test rename
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+ Rename.OVERWRITE);
+ renameRetFutures.put(i, returnFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(renameRetFutures, start, end);
+ }
+ }
+ }
+
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ renameRetFutures.get(i).get();
+ }
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+
+ // very the src should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ assertFalse(rootFs.exists(srcs[i]));
+ assertTrue(rootFs.exists(dsts[i]));
+ }
+
+ // test permissions
+ try {
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setPermission(dsts[i],
+ new FsPermission(permissions[i]));
+ permRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(permRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ permRetFutures.get(i).get();
+ }
+
+ // Restart NN and check permission then
+ cluster.restartNameNodes();
+
+ // verify the permission
+ for (int i = 0; i < count; i++) {
+ assertTrue(rootFs.exists(dsts[i]));
+ FsPermission fsPerm = new FsPermission(permissions[i]);
+ checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
+ fsPerm.getUserAction());
+ }
+
+ // test setOwner
+ start = 0;
+ end = 0;
+ for (int i = 0; i < count; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
+ "group2");
+ ownerRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(ownerRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
+ ownerRetFutures.get(i).get();
+ }
+
+ // Restart NN and check owner then
+ cluster.restartNameNodes();
+
+ // verify the owner
+ for (int i = 0; i < count; i++) {
+ assertTrue(rootFs.exists(dsts[i]));
+ assertTrue(
+ "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
+ assertTrue(
+ "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
+ }
+ } catch (AccessControlException ace) {
+ throw ace;
+ } finally {
+ if (rootFs != null) {
+ rootFs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ static void checkAccessPermissions(FileStatus stat, FsAction mode)
+ throws IOException {
+ checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+ }
+
+ static void checkAccessPermissions(final UserGroupInformation ugi,
+ FileStatus stat, FsAction mode) throws IOException {
+ FsPermission perm = stat.getPermission();
+ String user = ugi.getShortUserName();
+ List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+ if (user.equals(stat.getOwner())) {
+ if (perm.getUserAction().implies(mode)) {
+ return;
+ }
+ } else if (groups.contains(stat.getGroup())) {
+ if (perm.getGroupAction().implies(mode)) {
+ return;
+ }
+ } else {
+ if (perm.getOtherAction().implies(mode)) {
+ return;
+ }
+ }
+ throw new AccessControlException(String.format(
+ "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+ .getPath(), stat.getOwner(), stat.getGroup(),
+ stat.isDirectory() ? "d" : "-", perm));
+ }
+
@Test(timeout = 60000)
- public void testAsyncRenameWithException() throws Exception {
+ public void testAsyncAPIWithException() throws Exception {
Configuration conf = new HdfsConfiguration();
String group1 = "group1";
String group2 = "group2";
@@ -286,9 +489,9 @@ public class TestAsyncDFSRename {
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
- Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
- u2g_map.put(user1, new String[] { group1, group2 });
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
// Initiate all four users
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
@@ -299,7 +502,7 @@ public class TestAsyncDFSRename {
cluster.waitActive();
FileSystem rootFs = FileSystem.get(conf);
- final Path renameDir = new Path("/test/async_rename_exception/");
+ final Path renameDir = new Path("/test/async_api_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
@@ -312,11 +515,33 @@ public class TestAsyncDFSRename {
}
});
+ Future<Void> retFuture;
+ try {
+ retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(src.getParent().toUri().getPath()));
+ }
+
+ FsPermission fsPerm = new FsPermission(permGenerator.next());
+ try {
+ retFuture = adfs.setPermission(src, fsPerm);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the name of the path",
+ e.getMessage().contains(src.getName()));
+ }
+
try {
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFuture.get();
+ retFuture = adfs.setOwner(src, "user1", "group2");
+ retFuture.get();
} catch (ExecutionException e) {
checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the name of the path",
+ e.getMessage().contains(src.getName()));
} finally {
if (rootFs != null) {
rootFs.close();
@@ -334,7 +559,5 @@ public class TestAsyncDFSRename {
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(user));
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(dir.getParent().toUri().getPath()));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba9a018/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
index aa204cd..66a0380 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -196,22 +196,35 @@ public class TestDFSPermission {
return fs.getFileStatus(path).getPermission().toShort();
}
- /* create a file/directory with the default umask and permission */
private void create(OpType op, Path name) throws IOException {
- create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION));
+ create(fs, conf, op, name);
+ }
+
+ /* create a file/directory with the default umask and permission */
+ static void create(final FileSystem fs, final Configuration fsConf,
+ OpType op, Path name) throws IOException {
+ create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission(
+ DEFAULT_PERMISSION));
+ }
+
+ private void create(OpType op, Path name, short umask,
+ FsPermission permission)
+ throws IOException {
+ create(fs, conf, op, name, umask, permission);
}
/* create a file/directory with the given umask and permission */
- private void create(OpType op, Path name, short umask,
- FsPermission permission) throws IOException {
+ static void create(final FileSystem fs, final Configuration fsConf,
+ OpType op, Path name, short umask, FsPermission permission)
+ throws IOException {
// set umask in configuration, converting to padded octal
- conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
+ fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
// create the file/directory
switch (op) {
case CREATE:
FSDataOutputStream out = fs.create(name, permission, true,
- conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+ fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null);
out.close();
break;
@@ -359,7 +372,7 @@ public class TestDFSPermission {
final static private String DIR_NAME = "dir";
final static private String FILE_DIR_NAME = "filedir";
- private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
+ enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE
};
@@ -615,7 +628,7 @@ public class TestDFSPermission {
/* A random permission generator that guarantees that each permission
* value is generated only once.
*/
- static private class PermissionGenerator {
+ static class PermissionGenerator {
private final Random r;
private final short[] permissions = new short[MAX_PERMISSION + 1];
private int numLeft = MAX_PERMISSION + 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/8] hadoop git commit: Revert "Revert "HADOOP-12957. Limit the
number of outstanding async calls. Contributed by Xiaobing Zhou""
Posted by sz...@apache.org.
Revert "Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou""
This reverts commit 4d36b221a24e3b626bb91093b0bb0fd377061cae.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa20fa15
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa20fa15
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa20fa15
Branch: refs/heads/trunk
Commit: aa20fa150d522b9fe469dd99a8e24d7e27d888ea
Parents: eded3d1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:28:47 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:28:47 2016 +0800
----------------------------------------------------------------------
.../hadoop/fs/CommonConfigurationKeys.java | 3 +
.../ipc/AsyncCallLimitExceededException.java | 36 +++
.../main/java/org/apache/hadoop/ipc/Client.java | 66 ++++-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 199 ++++++++++++++--
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 12 +-
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 238 +++++++++++++------
6 files changed, 445 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 86e1b43..06614db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,6 +324,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
+ public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
+ "ipc.client.async.calls.max";
+ public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
new file mode 100644
index 0000000..db97b6c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+/**
+ * Signals that an AsyncCallLimitExceededException has occurred. This class is
+ * used to make application code using async RPC aware that limit of max async
+ * calls is reached, application code need to retrieve results from response of
+ * established async calls to avoid buffer overflow in order for follow-on async
+ * calls going correctly.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public AsyncCallLimitExceededException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb89..9be4649 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,7 +159,9 @@ public class Client implements AutoCloseable {
private final boolean fallbackAllowed;
private final byte[] clientId;
-
+ private final int maxAsyncCalls;
+ private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
+
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
+ this.maxAsyncCalls = conf.getInt(
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
fallbackToSimpleAuth);
}
+ private void checkAsyncCall() throws IOException {
+ if (isAsynchronousMode()) {
+ if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+ asyncCallCounter.decrementAndGet();
+ String errMsg = String.format(
+ "Exceeded limit of max asynchronous calls: %d, " +
+ "please configure %s to adjust it.",
+ maxAsyncCalls,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+ throw new AsyncCallLimitExceededException(errMsg);
+ }
+ }
+ }
+
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@@ -1374,24 +1393,38 @@ public class Client implements AutoCloseable {
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
+
try {
- connection.sendRpcRequest(call); // send the rpc request
- } catch (RejectedExecutionException e) {
- throw new IOException("connection has been closed", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("interrupted waiting to send rpc request to server", e);
- throw new IOException(e);
+ checkAsyncCall();
+ try {
+ connection.sendRpcRequest(call); // send the rpc request
+ } catch (RejectedExecutionException e) {
+ throw new IOException("connection has been closed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("interrupted waiting to send rpc request to server", e);
+ throw new IOException(e);
+ }
+ } catch(Exception e) {
+ if (isAsynchronousMode()) {
+ releaseAsyncCall();
+ }
+ throw e;
}
if (isAsynchronousMode()) {
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+ private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
- try {
- set(getRpcResponse(call, connection));
- } catch (IOException ie) {
- setException(ie);
+ if (callled.compareAndSet(false, true)) {
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
+ } finally {
+ releaseAsyncCall();
+ }
}
return super.get();
}
@@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
asynchronousMode.set(async);
}
+ private void releaseAsyncCall() {
+ asyncCallCounter.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ int getAsyncCallCount() {
+ return asyncCallCounter.get();
+ }
+
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -54,12 +56,13 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
- protected static class SerialCaller extends Thread {
+ static class AsyncCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@@ -68,11 +71,11 @@ public class TestAsyncIPC {
new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
- public SerialCaller(Client client, InetSocketAddress server, int count) {
+ public AsyncCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
- // set asynchronous mode, since SerialCaller extends Thread
+ // set asynchronous mode, since AsyncCaller extends Thread
Client.setAsynchronousMode(true);
}
@@ -107,14 +110,111 @@ public class TestAsyncIPC {
}
}
- @Test
- public void testSerial() throws IOException, InterruptedException,
+ static class AsyncLimitlCaller extends Thread {
+ private Client client;
+ private InetSocketAddress server;
+ private int count;
+ private boolean failed;
+ Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
+ Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
+ int start = 0, end = 0;
+
+ int getStart() {
+ return start;
+ }
+
+ int getEnd() {
+ return end;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
+ this(0, client, server, count);
+ }
+
+ final int callerId;
+
+ public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
+ int count) {
+ this.client = client;
+ this.server = server;
+ this.count = count;
+ // set asynchronous mode, since AsyncLimitlCaller extends Thread
+ Client.setAsynchronousMode(true);
+ this.callerId = callerId;
+ }
+
+ @Override
+ public void run() {
+ // in case Thread#Start is called, which will spawn new thread
+ Client.setAsynchronousMode(true);
+ for (int i = 0; i < count; i++) {
+ try {
+ final long param = TestIPC.RANDOM.nextLong();
+ runCall(i, param);
+ } catch (Exception e) {
+ LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
+ StringUtils.stringifyException(e)));
+ failed = true;
+ }
+ }
+ }
+
+ private void runCall(final int idx, final long param)
+ throws InterruptedException, ExecutionException, IOException {
+ for (;;) {
+ try {
+ doCall(idx, param);
+ return;
+ } catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async calls
+ * to let follow-on calls go
+ */
+ start = end;
+ end = idx;
+ waitForReturnValues(start, end);
+ }
+ }
+ }
+
+ private void doCall(final int idx, final long param) throws IOException {
+ TestIPC.call(client, param, server, conf);
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+ returnFutures.put(idx, returnFuture);
+ expectedValues.put(idx, param);
+ }
+
+ private void waitForReturnValues(final int start, final int end)
+ throws InterruptedException, ExecutionException {
+ for (int i = start; i < end; i++) {
+ LongWritable value = returnFutures.get(i).get();
+ if (expectedValues.get(i) != value.get()) {
+ LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
+ failed = true;
+ break;
+ }
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testAsyncCall() throws IOException, InterruptedException,
ExecutionException {
- internalTestSerial(3, false, 2, 5, 100);
- internalTestSerial(3, true, 2, 5, 10);
+ internalTestAsyncCall(3, false, 2, 5, 100);
+ internalTestAsyncCall(3, true, 2, 5, 10);
}
- public void internalTestSerial(int handlerCount, boolean handlerSleep,
+ @Test(timeout = 60000)
+ public void testAsyncCallLimit() throws IOException,
+ InterruptedException, ExecutionException {
+ internalTestAsyncCallLimit(100, false, 5, 10, 500);
+ }
+
+ public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -126,9 +226,9 @@ public class TestAsyncIPC {
clients[i] = new Client(LongWritable.class, conf);
}
- SerialCaller[] callers = new SerialCaller[callerCount];
+ AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
- callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+ callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@@ -144,6 +244,75 @@ public class TestAsyncIPC {
server.stop();
}
+ @Test(timeout = 60000)
+ public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
+ InterruptedException, ExecutionException {
+ int handlerCount = 10, callCount = 100;
+ Server server = new TestIPC.TestServer(handlerCount, false, conf);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ final Client client = new Client(LongWritable.class, conf);
+
+ int asyncCallCount = client.getAsyncCallCount();
+
+ try {
+ AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+ caller.run();
+
+ caller.waitForReturnValues();
+ String msg = String.format(
+ "First time, expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
+
+ caller.waitForReturnValues();
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ msg = String.format("Second time, expected not failed for caller: %s.",
+ caller);
+ assertFalse(msg, caller.failed);
+
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
+
+ public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
+ int clientCount, int callerCount, int callCount) throws IOException,
+ InterruptedException, ExecutionException {
+ Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
+ Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
+
+ Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ Client[] clients = new Client[clientCount];
+ for (int i = 0; i < clientCount; i++) {
+ clients[i] = new Client(LongWritable.class, conf);
+ }
+
+ AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
+ for (int i = 0; i < callerCount; i++) {
+ callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
+ callCount);
+ callers[i].start();
+ }
+ for (int i = 0; i < callerCount; i++) {
+ callers[i].join();
+ callers[i].waitForReturnValues(callers[i].getStart(),
+ callers[i].getCount());
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
+ }
+ for (int i = 0; i < clientCount; i++) {
+ clients[i].stop();
+ }
+ server.stop();
+ }
+
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -196,7 +365,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 4);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -235,7 +404,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 10);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -272,7 +441,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 10);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -313,9 +482,9 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- SerialCaller[] callers = new SerialCaller[callerCount];
+ AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
- callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+ callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 37899aa..356ae3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@@ -50,11 +51,14 @@ public class AsyncDistributedFileSystem {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() {
+ private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
+ if (called.compareAndSet(false, true)) {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
}
return super.get();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa20fa15/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 9322e1a..d129299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -31,80 +30,25 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
- final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- final private static Configuration CONF = new HdfsConfiguration();
-
- final private static String GROUP1_NAME = "group1";
- final private static String GROUP2_NAME = "group2";
- final private static String USER1_NAME = "user1";
- private static final UserGroupInformation USER1;
-
- private MiniDFSCluster gCluster;
-
- static {
- // explicitly turn on permission checking
- CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
- u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
- DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
- // Initiate all four users
- USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
- GROUP1_NAME, GROUP2_NAME });
- }
-
- @Before
- public void setUp() throws IOException {
- gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
- gCluster.waitActive();
- }
-
- @After
- public void tearDown() throws IOException {
- if (gCluster != null) {
- gCluster.shutdown();
- gCluster = null;
- }
- }
-
- static int countLease(MiniDFSCluster cluster) {
- return TestDFSRename.countLease(cluster);
- }
-
- void list(DistributedFileSystem dfs, String name) throws IOException {
- FileSystem.LOG.info("\n\n" + name);
- for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
- FileSystem.LOG.info("" + s.getPath());
- }
- }
-
- static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
- DataOutputStream a_out = dfs.create(f);
- a_out.writeBytes("something");
- a_out.close();
- }
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
- @Test
+ @Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@@ -169,38 +113,134 @@ public class TestAsyncDFSRename {
}
}
- @Test
- public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+ @Test(timeout = 60000)
+ public void testCallGetReturnValueMultipleTimes() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
- "/test/concurrent_reanme_with_overwrite_dir/");
+ "/test/testCallGetReturnValueMultipleTimes/");
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ final int count = 100;
+ long fileLen = blockSize * 3;
+ final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+ assertTrue(dfs.mkdirs(renameDir));
+
+ try {
+ // concurrently invoking many rename
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+ renameDir, dfs);
+ }
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void verifyCallGetReturnValueMultipleTimes(
+ Map<Integer, Future<Void>> returnFutures, int count,
+ MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+ throws InterruptedException, ExecutionException, IOException {
+ // wait for completing the calls
+ for (int i = 0; i < count; i++) {
+ returnFutures.get(i).get();
+ }
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+
+ // very the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(100,
+ "testAggressiveConcurrentAsyncRenameWithOverwrite");
+ }
+
+ @Test(timeout = 60000)
+ public void testConservativeConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(10000,
+ "testConservativeConcurrentAsyncRenameWithOverwrite");
+ }
+
+ private void internalTestConcurrentAsyncRenameWithOverwrite(
+ final int asyncCallLimit, final String basePath) throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ asyncCallLimit);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
+ long fileLen = blockSize * 3;
+ int start = 0, end = 0;
+ Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
- try {
- long fileLen = blockSize * 3;
- assertTrue(dfs.mkdirs(renameDir));
-
- Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+ assertTrue(dfs.mkdirs(renameDir));
+ try {
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
+ for (;;) {
+ try {
+ LOG.info("rename #" + i);
+ Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async
+ * calls to let follow-on calls go
+ */
+ LOG.error(e);
+ start = end;
+ end = i;
+ LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+ waitForReturnValues(returnFutures, start, end);
+ }
+ }
}
// wait for completing the calls
- for (int i = 0; i < count; i++) {
+ for (int i = start; i < count; i++) {
returnFutures.get(i).get();
}
@@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
assertTrue(dfs.exists(dst));
}
} finally {
- dfs.delete(renameDir, true);
+ if (dfs != null) {
+ dfs.close();
+ }
if (cluster != null) {
cluster.shutdown();
}
}
}
- @Test
+ private void waitForReturnValues(
+ final Map<Integer, Future<Void>> returnFutures, final int start,
+ final int end) throws InterruptedException, ExecutionException {
+ LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+ for (int i = start; i < end; i++) {
+ LOG.info("calling Future#get #" + i);
+ returnFutures.get(i).get();
+ }
+ }
+
+ @Test(timeout = 60000)
public void testAsyncRenameWithException() throws Exception {
- FileSystem rootFs = FileSystem.get(CONF);
+ Configuration conf = new HdfsConfiguration();
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ UserGroupInformation ugi1;
+
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+ u2g_map.put(user1, new String[] { group1, group2 });
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
+ // Initiate all four users
+ ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+ group1, group2 });
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3).build();
+ cluster.waitActive();
+
+ FileSystem rootFs = FileSystem.get(conf);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
- AsyncDistributedFileSystem adfs = USER1
+ AsyncDistributedFileSystem adfs = ugi1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
- return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+ return cluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src);
+ checkPermissionDenied(e, src, user1);
+ } finally {
+ if (rootFs != null) {
+ rootFs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
- private void checkPermissionDenied(final Exception e, final Path dir) {
+ private void checkPermissionDenied(final Exception e, final Path dir,
+ final String user) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(USER1_NAME));
+ .getMessage().contains(user));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[6/8] hadoop git commit: Revert "Revert "HDFS-10431 Refactor and
speedup TestAsyncDFSRename. Contributed by Xiaobing Zhou""
Posted by sz...@apache.org.
Revert "Revert "HDFS-10431 Refactor and speedup TestAsyncDFSRename. Contributed by Xiaobing Zhou""
This reverts commit 5ee5912ebd541d5b4c33ecd46dfdebe1e23b56c3.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db41e6d2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db41e6d2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db41e6d2
Branch: refs/heads/trunk
Commit: db41e6d285a3b425ffd7c11c7baa8253c7929439
Parents: b3d81f3
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:31:34 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:31:34 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 233 +++++++-
.../apache/hadoop/hdfs/TestAsyncDFSRename.java | 563 ++++---------------
2 files changed, 313 insertions(+), 483 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db41e6d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 67262dd..ddcf492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -29,13 +29,16 @@ import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -43,15 +46,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -63,21 +72,28 @@ import com.google.common.collect.Lists;
* */
public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
- private static final int NUM_TESTS = 1000;
+ private final short replFactor = 1;
+ private final long blockSize = 512;
+ private long fileLen = blockSize * 3;
+ private final long seed = Time.now();
+ private final Random r = new Random(seed);
+ private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+ private static final int NUM_TESTS = 50;
private static final int NUM_NN_HANDLER = 10;
- private static final int ASYNC_CALL_LIMIT = 100;
+ private static final int ASYNC_CALL_LIMIT = 1000;
private Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
+ private AsyncDistributedFileSystem adfs;
@Before
public void setup() throws IOException {
conf = new HdfsConfiguration();
// explicitly turn on acl
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
- // explicitly turn on ACL
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// set the limit of max async calls
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
ASYNC_CALL_LIMIT);
@@ -86,6 +102,7 @@ public class TestAsyncDFS {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
fs = FileSystem.get(conf);
+ adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
}
@After
@@ -130,13 +147,9 @@ public class TestAsyncDFS {
final String basePath = "testBatchAsyncAcl";
final Path parent = new Path(String.format("/test/%s/", basePath));
- AsyncDistributedFileSystem adfs = cluster.getFileSystem()
- .getAsyncDistributedFileSystem();
-
// prepare test
- int count = NUM_TESTS;
- final Path[] paths = new Path[count];
- for (int i = 0; i < count; i++) {
+ final Path[] paths = new Path[NUM_TESTS];
+ for (int i = 0; i < NUM_TESTS; i++) {
paths[i] = new Path(parent, "acl" + i);
FileSystem.mkdirs(fs, paths[i],
FsPermission.createImmutable((short) 0750));
@@ -153,7 +166,7 @@ public class TestAsyncDFS {
int start = 0, end = 0;
try {
// test setAcl
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < NUM_TESTS; i++) {
for (;;) {
try {
Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
@@ -166,12 +179,12 @@ public class TestAsyncDFS {
}
}
}
- waitForAclReturnValues(setAclRetFutures, end, count);
+ waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS);
// test getAclStatus
start = 0;
end = 0;
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < NUM_TESTS; i++) {
for (;;) {
try {
Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
@@ -185,13 +198,23 @@ public class TestAsyncDFS {
}
}
}
- waitForAclReturnValues(getAclRetFutures, end, count, paths,
+ waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths,
expectedAclSpec);
} catch (Exception e) {
throw e;
}
}
+ static void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
+ final int start, final int end)
+ throws InterruptedException, ExecutionException {
+ LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+ for (int i = start; i < end; i++) {
+ LOG.info("calling Future#get #" + i);
+ retFutures.get(i).get();
+ }
+ }
+
private void waitForAclReturnValues(
final Map<Integer, Future<Void>> aclRetFutures, final int start,
final int end) throws InterruptedException, ExecutionException {
@@ -266,9 +289,12 @@ public class TestAsyncDFS {
final Path parent = new Path("/test/async_api_exception/");
final Path aclDir = new Path(parent, "aclDir");
- fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
+ final Path src = new Path(parent, "src");
+ final Path dst = new Path(parent, "dst");
+ fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700));
+ fs.mkdirs(src);
- AsyncDistributedFileSystem adfs = ugi1
+ AsyncDistributedFileSystem adfs1 = ugi1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
@@ -277,9 +303,36 @@ public class TestAsyncDFS {
});
Future<Void> retFuture;
+ // test rename
+ try {
+ retFuture = adfs1.rename(src, dst, Rename.OVERWRITE);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src, user1);
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(src.getParent().toUri().getPath()));
+ }
+
+ // test setPermission
+ FsPermission fsPerm = new FsPermission(permGenerator.next());
+ try {
+ retFuture = adfs1.setPermission(src, fsPerm);
+ retFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src, user1);
+ }
+
+ // test setOwner
+ try {
+ retFuture = adfs1.setOwner(src, "user1", "group2");
+ retFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src, user1);
+ }
+
// test setAcl
try {
- retFuture = adfs.setAcl(aclDir,
+ retFuture = adfs1.setAcl(aclDir,
Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
retFuture.get();
fail("setAcl should fail with permission denied");
@@ -289,7 +342,7 @@ public class TestAsyncDFS {
// test getAclStatus
try {
- Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
+ Future<AclStatus> aclRetFuture = adfs1.getAclStatus(aclDir);
aclRetFuture.get();
fail("getAclStatus should fail with permission denied");
} catch (ExecutionException e) {
@@ -307,4 +360,148 @@ public class TestAsyncDFS {
assertTrue("Permission denied messages must carry the name of the path",
e.getMessage().contains(dir.getName()));
}
+
+
+ @Test(timeout = 120000)
+ public void testConcurrentAsyncAPI() throws Exception {
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+
+ // create fake mapping for the groups
+ Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+ u2gMap.put(user1, new String[] {group1, group2});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+ // prepare for test
+ final Path parent = new Path(
+ String.format("/test/%s/", "testConcurrentAsyncAPI"));
+ final Path[] srcs = new Path[NUM_TESTS];
+ final Path[] dsts = new Path[NUM_TESTS];
+ short[] permissions = new short[NUM_TESTS];
+ for (int i = 0; i < NUM_TESTS; i++) {
+ srcs[i] = new Path(parent, "src" + i);
+ dsts[i] = new Path(parent, "dst" + i);
+ DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+ assertTrue(fs.exists(srcs[i]));
+ assertTrue(fs.getFileStatus(srcs[i]).isFile());
+ assertTrue(fs.exists(dsts[i]));
+ assertTrue(fs.getFileStatus(dsts[i]).isFile());
+ permissions[i] = permGenerator.next();
+ }
+
+ Map<Integer, Future<Void>> renameRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> permRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ Map<Integer, Future<Void>> ownerRetFutures =
+ new HashMap<Integer, Future<Void>>();
+ int start = 0, end = 0;
+ // test rename
+ for (int i = 0; i < NUM_TESTS; i++) {
+ for (;;) {
+ try {
+ Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+ Rename.OVERWRITE);
+ renameRetFutures.put(i, returnFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(renameRetFutures, start, end);
+ }
+ }
+ }
+
+ // wait for completing the calls
+ waitForAclReturnValues(renameRetFutures, end, NUM_TESTS);
+
+ // verify the src should not exist, dst should
+ for (int i = 0; i < NUM_TESTS; i++) {
+ assertFalse(fs.exists(srcs[i]));
+ assertTrue(fs.exists(dsts[i]));
+ }
+
+ // test permissions
+ for (int i = 0; i < NUM_TESTS; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setPermission(dsts[i],
+ new FsPermission(permissions[i]));
+ permRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(permRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ waitForAclReturnValues(permRetFutures, end, NUM_TESTS);
+
+ // verify the permission
+ for (int i = 0; i < NUM_TESTS; i++) {
+ assertTrue(fs.exists(dsts[i]));
+ FsPermission fsPerm = new FsPermission(permissions[i]);
+ checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
+ }
+
+ // test setOwner
+ start = 0;
+ end = 0;
+ for (int i = 0; i < NUM_TESTS; i++) {
+ for (;;) {
+ try {
+ Future<Void> retFuture = adfs.setOwner(dsts[i], "user1", "group2");
+ ownerRetFutures.put(i, retFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ start = end;
+ end = i;
+ waitForReturnValues(ownerRetFutures, start, end);
+ }
+ }
+ }
+ // wait for completing the calls
+ waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS);
+
+ // verify the owner
+ for (int i = 0; i < NUM_TESTS; i++) {
+ assertTrue(fs.exists(dsts[i]));
+ assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner()));
+ assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
+ }
+ }
+
+ static void checkAccessPermissions(FileStatus stat, FsAction mode)
+ throws IOException {
+ checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+ }
+
+ static void checkAccessPermissions(final UserGroupInformation ugi,
+ FileStatus stat, FsAction mode) throws IOException {
+ FsPermission perm = stat.getPermission();
+ String user = ugi.getShortUserName();
+ List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+ if (user.equals(stat.getOwner())) {
+ if (perm.getUserAction().implies(mode)) {
+ return;
+ }
+ } else if (groups.contains(stat.getGroup())) {
+ if (perm.getGroupAction().implies(mode)) {
+ return;
+ }
+ } else {
+ if (perm.getOtherAction().implies(mode)) {
+ return;
+ }
+ }
+ throw new AccessControlException(String.format(
+ "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+ .getPath(), stat.getOwner(), stat.getGroup(),
+ stat.isDirectory() ? "d" : "-", perm));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db41e6d2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 03c8151..8d3e509 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -19,14 +19,11 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -34,521 +31,157 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- private final long seed = Time.now();
- private final Random r = new Random(seed);
- private final PermissionGenerator permGenerator = new PermissionGenerator(r);
- private final short replFactor = 2;
+ private final short replFactor = 1;
private final long blockSize = 512;
private long fileLen = blockSize * 3;
-
- /**
- * Check the blocks of dst file are cleaned after rename with overwrite
- * Restart NN to check the rename successfully
- */
- @Test(timeout = 60000)
- public void testAsyncRenameWithOverwrite() throws Exception {
- Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
- replFactor).build();
+ private static final int NUM_TESTS = 50;
+ private static final int NUM_NN_HANDLER = 10;
+ private static final int ASYNC_CALL_LIMIT = 1000;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private FileSystem fs;
+ private AsyncDistributedFileSystem adfs;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+ // set the limit of max async calls
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ ASYNC_CALL_LIMIT);
+ // set server handlers
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
- try {
- String src = "/foo/src";
- String dst = "/foo/dst";
- String src2 = "/foo/src2";
- String dst2 = "/foo/dst2";
- Path srcPath = new Path(src);
- Path dstPath = new Path(dst);
- Path srcPath2 = new Path(src2);
- Path dstPath2 = new Path(dst2);
-
- DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
- LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst, 0, fileLen);
- LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst2, 0, fileLen);
- BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
- .getBlockManager();
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
-
- Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
- Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
- retVal1.get();
- retVal2.get();
-
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
+ fs = FileSystem.get(conf);
+ adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
+ }
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
- assertFalse(dfs.exists(srcPath));
- assertTrue(dfs.exists(dstPath));
- assertFalse(dfs.exists(srcPath2));
- assertTrue(dfs.exists(dstPath2));
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
+ @After
+ public void tearDown() throws IOException {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
}
}
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
- final Path renameDir = new Path(
- "/test/testCallGetReturnValueMultipleTimes/");
- final Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
- cluster.waitActive();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- final int count = 100;
- final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+ final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/");
+ assertTrue(fs.mkdirs(parent));
- assertTrue(dfs.mkdirs(renameDir));
-
- try {
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- }
-
- for (int i = 0; i < 5; i++) {
- verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
- renameDir, dfs);
- }
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
+ // prepare test
+ final Path[] srcs = new Path[NUM_TESTS];
+ final Path[] dsts = new Path[NUM_TESTS];
+ for (int i = 0; i < NUM_TESTS; i++) {
+ srcs[i] = new Path(parent, "src" + i);
+ dsts[i] = new Path(parent, "dst" + i);
+ DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
}
- }
- private void verifyCallGetReturnValueMultipleTimes(
- Map<Integer, Future<Void>> returnFutures, int count,
- MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
- throws InterruptedException, ExecutionException, IOException {
- // wait for completing the calls
- for (int i = 0; i < count; i++) {
- returnFutures.get(i).get();
+ // concurrently invoking many rename
+ final Map<Integer, Future<Void>> reFutures =
+ new HashMap<Integer, Future<Void>>();
+ for (int i = 0; i < NUM_TESTS; i++) {
+ Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
+ Rename.OVERWRITE);
+ reFutures.put(i, retFuture);
}
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
+ assertEquals(NUM_TESTS, reFutures.size());
- // very the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
+ for (int i = 0; i < 5; i++) {
+ verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts);
}
}
- @Test
- public void testConservativeConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(100,
- "testAggressiveConcurrentAsyncRenameWithOverwrite");
- }
-
- @Test(timeout = 60000)
- public void testAggressiveConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(10000,
- "testConservativeConcurrentAsyncRenameWithOverwrite");
- }
-
- private void internalTestConcurrentAsyncRenameWithOverwrite(
- final int asyncCallLimit, final String basePath) throws Exception {
- final Path renameDir = new Path(String.format("/test/%s/", basePath));
- Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- asyncCallLimit);
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- int count = 1000;
- int start = 0, end = 0;
- Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
- assertTrue(dfs.mkdirs(renameDir));
-
- try {
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- for (;;) {
- try {
- LOG.info("rename #" + i);
- Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async
- * calls to let follow-on calls go
- */
- LOG.error(e);
- start = end;
- end = i;
- LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
- waitForReturnValues(returnFutures, start, end);
- }
- }
- }
-
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- returnFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
- }
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
+ private void verifyCallGetReturnValueMultipleTimes(
+ final Map<Integer, Future<Void>> reFutures, final Path[] srcs,
+ final Path[] dsts)
+ throws InterruptedException, ExecutionException, IOException {
- private void waitForReturnValues(
- final Map<Integer, Future<Void>> returnFutures, final int start,
- final int end) throws InterruptedException, ExecutionException {
- LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
- for (int i = start; i < end; i++) {
- LOG.info("calling Future#get #" + i);
- returnFutures.get(i).get();
- }
- }
+ // wait for completing the calls
+ waitForReturnValues(reFutures, 0, NUM_TESTS);
- @Test
- public void testConservativeConcurrentAsyncAPI() throws Exception {
- internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
+ // verify the src dir should not exist, dst should
+ verifyRenames(srcs, dsts);
}
@Test(timeout = 60000)
- public void testAggressiveConcurrentAsyncAPI() throws Exception {
- internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
- }
-
- private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
- final String basePath) throws Exception {
- Configuration conf = new HdfsConfiguration();
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- int count = 500;
-
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
- // set the limit of max async calls
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- asyncCallLimit);
-
- // create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
- // start mini cluster
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(3).build();
- cluster.waitActive();
- AsyncDistributedFileSystem adfs = cluster.getFileSystem()
- .getAsyncDistributedFileSystem();
-
- // prepare for test
- FileSystem rootFs = FileSystem.get(conf);
- final Path parent = new Path(String.format("/test/%s/", basePath));
- final Path[] srcs = new Path[count];
- final Path[] dsts = new Path[count];
- short[] permissions = new short[count];
- for (int i = 0; i < count; i++) {
+ public void testConcurrentAsyncRename() throws Exception {
+ final Path parent = new Path(
+ String.format("/test/%s/", "testConcurrentAsyncRename"));
+ assertTrue(fs.mkdirs(parent));
+
+ // prepare test
+ final Path[] srcs = new Path[NUM_TESTS];
+ final Path[] dsts = new Path[NUM_TESTS];
+ for (int i = 0; i < NUM_TESTS; i++) {
srcs[i] = new Path(parent, "src" + i);
dsts[i] = new Path(parent, "dst" + i);
- DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
- DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
- assertTrue(rootFs.exists(srcs[i]));
- assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
- assertTrue(rootFs.exists(dsts[i]));
- assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
- permissions[i] = permGenerator.next();
+ DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+ DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
}
- Map<Integer, Future<Void>> renameRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> permRetFutures =
- new HashMap<Integer, Future<Void>>();
- Map<Integer, Future<Void>> ownerRetFutures =
- new HashMap<Integer, Future<Void>>();
+ // concurrently invoking many rename
int start = 0, end = 0;
- // test rename
- for (int i = 0; i < count; i++) {
+ Map<Integer, Future<Void>> retFutures =
+ new HashMap<Integer, Future<Void>>();
+ for (int i = 0; i < NUM_TESTS; i++) {
for (;;) {
try {
- Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+ LOG.info("rename #" + i);
+ Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
Rename.OVERWRITE);
- renameRetFutures.put(i, returnFuture);
+ retFutures.put(i, retFuture);
break;
} catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async calls
+ * to let follow-on calls go
+ */
+ LOG.error(e);
start = end;
end = i;
- waitForReturnValues(renameRetFutures, start, end);
+ LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+ waitForReturnValues(retFutures, start, end);
}
}
}
// wait for completing the calls
- for (int i = start; i < count; i++) {
- renameRetFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src should not exist, dst should
- for (int i = 0; i < count; i++) {
- assertFalse(rootFs.exists(srcs[i]));
- assertTrue(rootFs.exists(dsts[i]));
- }
-
- // test permissions
- try {
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setPermission(dsts[i],
- new FsPermission(permissions[i]));
- permRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(permRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- permRetFutures.get(i).get();
- }
-
- // Restart NN and check permission then
- cluster.restartNameNodes();
-
- // verify the permission
- for (int i = 0; i < count; i++) {
- assertTrue(rootFs.exists(dsts[i]));
- FsPermission fsPerm = new FsPermission(permissions[i]);
- checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
- fsPerm.getUserAction());
- }
-
- // test setOwner
- start = 0;
- end = 0;
- for (int i = 0; i < count; i++) {
- for (;;) {
- try {
- Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
- "group2");
- ownerRetFutures.put(i, retFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- start = end;
- end = i;
- waitForReturnValues(ownerRetFutures, start, end);
- }
- }
- }
- // wait for completing the calls
- for (int i = start; i < count; i++) {
- ownerRetFutures.get(i).get();
- }
+ waitForReturnValues(retFutures, end, NUM_TESTS);
- // Restart NN and check owner then
- cluster.restartNameNodes();
-
- // verify the owner
- for (int i = 0; i < count; i++) {
- assertTrue(rootFs.exists(dsts[i]));
- assertTrue(
- "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
- assertTrue(
- "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
- }
- } catch (AccessControlException ace) {
- throw ace;
- } finally {
- if (rootFs != null) {
- rootFs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ // verify the src dir should not exist, dst should
+ verifyRenames(srcs, dsts);
}
- static void checkAccessPermissions(FileStatus stat, FsAction mode)
+ private void verifyRenames(final Path[] srcs, final Path[] dsts)
throws IOException {
- checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
- }
-
- static void checkAccessPermissions(final UserGroupInformation ugi,
- FileStatus stat, FsAction mode) throws IOException {
- FsPermission perm = stat.getPermission();
- String user = ugi.getShortUserName();
- List<String> groups = Arrays.asList(ugi.getGroupNames());
-
- if (user.equals(stat.getOwner())) {
- if (perm.getUserAction().implies(mode)) {
- return;
- }
- } else if (groups.contains(stat.getGroup())) {
- if (perm.getGroupAction().implies(mode)) {
- return;
- }
- } else {
- if (perm.getOtherAction().implies(mode)) {
- return;
- }
+ for (int i = 0; i < NUM_TESTS; i++) {
+ assertFalse(fs.exists(srcs[i]));
+ assertTrue(fs.exists(dsts[i]));
}
- throw new AccessControlException(String.format(
- "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
- .getPath(), stat.getOwner(), stat.getGroup(),
- stat.isDirectory() ? "d" : "-", perm));
}
- @Test(timeout = 60000)
- public void testAsyncAPIWithException() throws Exception {
- Configuration conf = new HdfsConfiguration();
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- UserGroupInformation ugi1;
-
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
- u2gMap.put(user1, new String[] {group1, group2});
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
- // Initiate all four users
- ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
- group1, group2 });
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(3).build();
- cluster.waitActive();
-
- FileSystem rootFs = FileSystem.get(conf);
- final Path renameDir = new Path("/test/async_api_exception/");
- final Path src = new Path(renameDir, "src");
- final Path dst = new Path(renameDir, "dst");
- rootFs.mkdirs(src);
-
- AsyncDistributedFileSystem adfs = ugi1
- .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
- @Override
- public AsyncDistributedFileSystem run() throws Exception {
- return cluster.getFileSystem().getAsyncDistributedFileSystem();
- }
- });
-
- Future<Void> retFuture;
- try {
- retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- retFuture.get();
- } catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(src.getParent().toUri().getPath()));
- }
-
- FsPermission fsPerm = new FsPermission(permGenerator.next());
- try {
- retFuture = adfs.setPermission(src, fsPerm);
- retFuture.get();
- } catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the name of the path",
- e.getMessage().contains(src.getName()));
- }
-
- try {
- retFuture = adfs.setOwner(src, "user1", "group2");
- retFuture.get();
- } catch (ExecutionException e) {
- TestAsyncDFS.checkPermissionDenied(e, src, user1);
- assertTrue("Permission denied messages must carry the name of the path",
- e.getMessage().contains(src.getName()));
- } finally {
- if (rootFs != null) {
- rootFs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
+ final int start, final int end)
+ throws InterruptedException, ExecutionException {
+ TestAsyncDFS.waitForReturnValues(retFutures, start, end);
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[7/8] hadoop git commit: Revert "Revert "HDFS-10430. Reuse
FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou.""
Posted by sz...@apache.org.
Revert "Revert "HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou.""
This reverts commit 8cf47d8589badfc07ef4bca3328a420c7c68abbd.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7e7b1ae0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7e7b1ae0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7e7b1ae0
Branch: refs/heads/trunk
Commit: 7e7b1ae03759da0becfef677e1d5f7a2ed9041c3
Parents: db41e6d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:31:38 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:31:38 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 36 +-------------------
1 file changed, 1 insertion(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e7b1ae0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index ddcf492..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -34,7 +34,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,19 +45,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -445,7 +441,7 @@ public class TestAsyncDFS {
for (int i = 0; i < NUM_TESTS; i++) {
assertTrue(fs.exists(dsts[i]));
FsPermission fsPerm = new FsPermission(permissions[i]);
- checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
+ fs.access(dsts[i], fsPerm.getUserAction());
}
// test setOwner
@@ -474,34 +470,4 @@ public class TestAsyncDFS {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
-
- static void checkAccessPermissions(FileStatus stat, FsAction mode)
- throws IOException {
- checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
- }
-
- static void checkAccessPermissions(final UserGroupInformation ugi,
- FileStatus stat, FsAction mode) throws IOException {
- FsPermission perm = stat.getPermission();
- String user = ugi.getShortUserName();
- List<String> groups = Arrays.asList(ugi.getGroupNames());
-
- if (user.equals(stat.getOwner())) {
- if (perm.getUserAction().implies(mode)) {
- return;
- }
- } else if (groups.contains(stat.getGroup())) {
- if (perm.getGroupAction().implies(mode)) {
- return;
- }
- } else {
- if (perm.getOtherAction().implies(mode)) {
- return;
- }
- }
- throw new AccessControlException(String.format(
- "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
- .getPath(), stat.getOwner(), stat.getGroup(),
- stat.isDirectory() ? "d" : "-", perm));
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[4/8] hadoop git commit: Revert "Revert "HADOOP-13168. Support
Future.get with timeout in ipc async calls.""
Posted by sz...@apache.org.
Revert "Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls.""
This reverts commit e4450d47f19131818e1c040b6bd8d85ae8250475.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/574dcd34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/574dcd34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/574dcd34
Branch: refs/heads/trunk
Commit: 574dcd34c0da1903d25e37dc5757642a584dc3d0
Parents: cba9a01
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Jun 6 16:31:23 2016 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Jun 6 16:31:23 2016 +0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ipc/Client.java | 119 ++++++++----------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 62 +++++-----
.../apache/hadoop/util/concurrent/AsyncGet.java | 60 +++++++++
.../hadoop/util/concurrent/AsyncGetFuture.java | 73 +++++++++++
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 124 +++++++++++--------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 24 +---
.../ClientNamenodeProtocolTranslatorPB.java | 33 ++---
7 files changed, 310 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 9be4649..d1d5b17 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,46 +18,10 @@
package org.apache.hadoop.ipc;
-import static org.apache.hadoop.ipc.RpcConstants.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -93,14 +57,25 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+import java.io.*;
+import java.net.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -119,8 +94,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>>
- RETURN_RPC_RESPONSE = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+ = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -131,8 +106,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getReturnRpcResponse() {
- return (Future<T>) RETURN_RPC_RESPONSE.get();
+ public static <T> Future<T> getAsyncRpcResponse() {
+ return (Future<T>) ASYNC_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -379,6 +354,11 @@ public class Client implements AutoCloseable {
}
}
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + id;
+ }
+
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
@@ -1413,27 +1393,32 @@ public class Client implements AutoCloseable {
}
if (isAsynchronousMode()) {
- Future<Writable> returnFuture = new AbstractFuture<Writable>() {
- private final AtomicBoolean callled = new AtomicBoolean(false);
+ final AsyncGet<Writable, IOException> asyncGet
+ = new AsyncGet<Writable, IOException>() {
@Override
- public Writable get() throws InterruptedException, ExecutionException {
- if (callled.compareAndSet(false, true)) {
- try {
- set(getRpcResponse(call, connection));
- } catch (IOException ie) {
- setException(ie);
- } finally {
+ public Writable get(long timeout, TimeUnit unit)
+ throws IOException, TimeoutException{
+ boolean done = true;
+ try {
+ final Writable w = getRpcResponse(call, connection, timeout, unit);
+ if (w == null) {
+ done = false;
+ throw new TimeoutException(call + " timed out "
+ + timeout + " " + unit);
+ }
+ return w;
+ } finally {
+ if (done) {
releaseAsyncCall();
}
}
- return super.get();
}
};
- RETURN_RPC_RESPONSE.set(returnFuture);
+ ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
return null;
} else {
- return getRpcResponse(call, connection);
+ return getRpcResponse(call, connection, -1, null);
}
}
@@ -1469,12 +1454,18 @@ public class Client implements AutoCloseable {
return asyncCallCounter.get();
}
- private Writable getRpcResponse(final Call call, final Connection connection)
- throws IOException {
+ /** @return the rpc response or, in case of timeout, null. */
+ private Writable getRpcResponse(final Call call, final Connection connection,
+ final long timeout, final TimeUnit unit) throws IOException {
synchronized (call) {
while (!call.done) {
try {
- call.wait(); // wait for the result
+ final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+ timeout, unit);
+ call.wait(waitTimeout); // wait for the result
+ if (waitTimeout > 0 && !call.done) {
+ return null;
+ }
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 8fcdb78..0f43fc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,21 +18,9 @@
package org.apache.hadoop.ipc;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.SocketFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -52,17 +40,23 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* RPC Engine for for protobuf based RPCs.
@@ -70,8 +64,8 @@ import com.google.protobuf.TextFormat;
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
- private static final ThreadLocal<Callable<?>>
- RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<Message, Exception>>
+ ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -81,10 +75,9 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
- @SuppressWarnings("unchecked")
@Unstable
- public static <T> Callable<T> getReturnMessageCallback() {
- return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
+ return ASYNC_RETURN_MESSAGE.get();
}
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -263,14 +256,17 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
- Callable<Message> callback = new Callable<Message>() {
+ final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
+ final AsyncGet<Message, Exception> asyncGet
+ = new AsyncGet<Message, Exception>() {
@Override
- public Message call() throws Exception {
- return getReturnMessage(method, frrw.get());
+ public Message get(long timeout, TimeUnit unit) throws Exception {
+ final RpcResponseWrapper rrw = timeout < 0?
+ frrw.get(): frrw.get(timeout, unit);
+ return getReturnMessage(method, rrw);
}
};
- RETURN_MESSAGE_CALLBACK.set(callback);
+ ASYNC_RETURN_MESSAGE.set(asyncGet);
return null;
} else {
return getReturnMessage(method, val);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
new file mode 100644
index 0000000..5eac869
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
+ *
+ * When the return value is still being computed, invoking
+ * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
+ * The method should be invoked again and again
+ * until the underlying computation is completed.
+ *
+ * @param <R> The type of the return value.
+ * @param <E> The exception type that the underlying implementation may throw.
+ */
+public interface AsyncGet<R, E extends Throwable> {
+ /**
+ * Get the result.
+ *
+ * @param timeout The maximum time period to wait.
+ * When timeout == 0, it does not wait at all.
+ * When timeout < 0, it waits indefinitely.
+ * @param unit The unit of the timeout value
+ * @return the result, which is possibly null.
+ * @throws E an exception thrown by the underlying implementation.
+ * @throws TimeoutException if it cannot return after the given time period.
+ * @throws InterruptedException if the thread is interrupted.
+ */
+ R get(long timeout, TimeUnit unit)
+ throws E, TimeoutException, InterruptedException;
+
+ /** Utility */
+ class Util {
+ /**
+ * @return {@link Object#wait(long)} timeout converted
+ * from {@link #get(long, TimeUnit)} timeout.
+ */
+ public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+ return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
new file mode 100644
index 0000000..d687867
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** A {@link Future} implemented using an {@link AsyncGet} object. */
+public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
+ public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
+
+ private final AtomicBoolean called = new AtomicBoolean(false);
+ private final AsyncGet<T, E> asyncGet;
+
+ public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
+ this.asyncGet = asyncGet;
+ }
+
+ private void callAsyncGet(long timeout, TimeUnit unit) {
+ if (!isCancelled() && called.compareAndSet(false, true)) {
+ try {
+ set(asyncGet.get(timeout, unit));
+ } catch (TimeoutException te) {
+ LOG.trace("TRACE", te);
+ called.compareAndSet(true, false);
+ } catch (Throwable e) {
+ LOG.trace("TRACE", e);
+ setException(e);
+ }
+ }
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ callAsyncGet(-1, TimeUnit.MILLISECONDS);
+ return super.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException, ExecutionException {
+ callAsyncGet(timeout, unit);
+ return super.get(0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isDone() {
+ callAsyncGet(0, TimeUnit.MILLISECONDS);
+ return super.isDone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..0ad191b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,20 +18,6 @@
package org.apache.hadoop.ipc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +34,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
public class TestAsyncIPC {
private static Configuration conf;
@@ -87,26 +84,50 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
- returnFutures.put(i, returnFuture);
+ returnFutures.put(i, Client.getAsyncRpcResponse());
expectedValues.put(i, param);
} catch (Exception e) {
- LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
+ throw new RuntimeException(e);
}
}
}
- public void waitForReturnValues() throws InterruptedException,
- ExecutionException {
+ void assertReturnValues() throws InterruptedException, ExecutionException {
for (int i = 0; i < count; i++) {
LongWritable value = returnFutures.get(i).get();
- if (expectedValues.get(i) != value.get()) {
- LOG.fatal(String.format("Call-%d failed!", i));
- failed = true;
- break;
+ Assert.assertEquals("call" + i + " failed.",
+ expectedValues.get(i).longValue(), value.get());
+ }
+ Assert.assertFalse(failed);
+ }
+
+ void assertReturnValues(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException {
+ final boolean[] checked = new boolean[count];
+ for(boolean done = false; !done;) {
+ done = true;
+ for (int i = 0; i < count; i++) {
+ if (checked[i]) {
+ continue;
+ } else {
+ done = false;
+ }
+
+ final LongWritable value;
+ try {
+ value = returnFutures.get(i).get(timeout, unit);
+ } catch (TimeoutException e) {
+ LOG.info("call" + i + " caught ", e);
+ continue;
+ }
+
+ Assert.assertEquals("call" + i + " failed.",
+ expectedValues.get(i).longValue(), value.get());
+ checked[i] = true;
}
}
+ Assert.assertFalse(failed);
}
}
@@ -183,8 +204,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
- returnFutures.put(idx, returnFuture);
+ returnFutures.put(idx, Client.getAsyncRpcResponse());
expectedValues.put(idx, param);
}
@@ -233,10 +253,7 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
- callers[i].waitForReturnValues();
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
- callers[i]);
- assertFalse(msg, callers[i].failed);
+ callers[i].assertReturnValues();
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
@@ -258,25 +275,37 @@ public class TestAsyncIPC {
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
+ caller.assertReturnValues();
+ caller.assertReturnValues();
+ caller.assertReturnValues();
+ Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
- caller.waitForReturnValues();
- String msg = String.format(
- "First time, expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
+ @Test(timeout = 60000)
+ public void testFutureGetWithTimeout() throws IOException,
+ InterruptedException, ExecutionException {
+// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
+ final Server server = new TestIPC.TestServer(10, true, conf);
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
- caller.waitForReturnValues();
- assertTrue(asyncCallCount == client.getAsyncCallCount());
- msg = String.format("Second time, expected not failed for caller: %s.",
- caller);
- assertFalse(msg, caller.failed);
+ final Client client = new Client(LongWritable.class, conf);
- assertTrue(asyncCallCount == client.getAsyncCallCount());
+ try {
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+ caller.run();
+ caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
} finally {
client.stop();
server.stop();
}
}
+
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
@@ -367,9 +396,7 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
- caller.waitForReturnValues();
- String msg = String.format("Expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
+ caller.assertReturnValues();
} finally {
client.stop();
server.stop();
@@ -406,9 +433,7 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.waitForReturnValues();
- String msg = String.format("Expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
+ caller.assertReturnValues();
} finally {
client.stop();
server.stop();
@@ -443,9 +468,7 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.waitForReturnValues();
- String msg = String.format("Expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
+ caller.assertReturnValues();
} finally {
client.stop();
server.stop();
@@ -489,10 +512,7 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; ++i) {
callers[i].join();
- callers[i].waitForReturnValues();
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
- callers[i]);
- assertFalse(msg, callers[i].failed);
+ callers[i].assertReturnValues();
}
} finally {
client.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 4fe0861..6bfd71d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,20 +19,16 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
-import com.google.common.util.concurrent.AbstractFuture;
-
/****************************************************************
* Implementation of the asynchronous distributed file system.
* This instance of this class is the way end-user code interacts
@@ -52,22 +48,8 @@ public class AsyncDistributedFileSystem {
}
static <T> Future<T> getReturnValue() {
- final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
- .getReturnValueCallback();
- Future<T> returnFuture = new AbstractFuture<T>() {
- private final AtomicBoolean called = new AtomicBoolean(false);
- public T get() throws InterruptedException, ExecutionException {
- if (called.compareAndSet(false, true)) {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
- }
- }
- return super.get();
- }
- };
- return returnFuture;
+ return new AsyncGetFuture<>(
+ ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/574dcd34/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index faa925c..939c1ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,7 +24,8 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
+
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -198,6 +199,7 @@ import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+import org.apache.hadoop.util.concurrent.AsyncGet;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -209,8 +211,8 @@ import com.google.protobuf.ServiceException;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal<Callable<?>>
- RETURN_VALUE_CALLBACK = new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<?, Exception>>
+ ASYNC_RETURN_VALUE = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -246,8 +248,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Callable<T> getReturnValueCallback() {
- return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+ public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+ return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
}
@Override
@@ -369,7 +371,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setPermission(null, req);
- setReturnValueCallback();
+ setAsyncReturnValue();
} else {
rpcProxy.setPermission(null, req);
}
@@ -378,17 +380,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
- private void setReturnValueCallback() {
- final Callable<Message> returnMessageCallback = ProtobufRpcEngine
- .getReturnMessageCallback();
- Callable<Void> callBack = new Callable<Void>() {
+ private void setAsyncReturnValue() {
+ final AsyncGet<Message, Exception> asyncReturnMessage
+ = ProtobufRpcEngine.getAsyncReturnMessage();
+ final AsyncGet<Void, Exception> asyncGet
+ = new AsyncGet<Void, Exception>() {
@Override
- public Void call() throws Exception {
- returnMessageCallback.call();
+ public Void get(long timeout, TimeUnit unit) throws Exception {
+ asyncReturnMessage.get(timeout, unit);
return null;
}
};
- RETURN_VALUE_CALLBACK.set(callBack);
+ ASYNC_RETURN_VALUE.set(asyncGet);
}
@Override
@@ -403,7 +406,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setOwner(null, req.build());
- setReturnValueCallback();
+ setAsyncReturnValue();
} else {
rpcProxy.setOwner(null, req.build());
}
@@ -536,7 +539,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
- setReturnValueCallback();
+ setAsyncReturnValue();
} else {
rpcProxy.rename2(null, req);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org