Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [23/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@@ -49,8 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -58,49 +62,28 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
-import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
-import org.apache.hadoop.hdfs.web.resources.DestinationParam;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.GroupParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.OwnerParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
-import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
-import org.apache.hadoop.hdfs.web.resources.RenewerParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
-import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.util.Progressable;
import org.mortbay.util.ajax.JSON;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/** A FileSystem for HDFS over the web. */
@@ -119,7 +102,7 @@ public class WebHdfsFileSystem extends F
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
- protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
+ private boolean canRefreshDelegationToken;
private UserGroupInformation ugi;
private URI uri;
@@ -148,13 +131,8 @@ public class WebHdfsFileSystem extends F
return "http";
}
- /**
- * Initialize tokenAspect. This function is intended to
- * be overridden by SWebHdfsFileSystem.
- */
- protected synchronized void initializeTokenAspect() {
- tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
- TOKEN_KIND);
+ protected Text getTokenKind() {
+ return TOKEN_KIND;
}
@Override
@@ -175,12 +153,14 @@ public class WebHdfsFileSystem extends F
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.nnAddrs = resolveNNAddr();
- boolean isHA = HAUtil.isLogicalUri(conf, this.uri);
- // In non-HA case, the code needs to call getCanonicalUri() in order to
- // handle the case where no port is specified in the URI
- this.tokenServiceName = isHA ? HAUtil.buildTokenServiceForLogicalUri(uri)
+ boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri);
+ boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri);
+ // In non-HA or non-logical URI case, the code needs to call
+ // getCanonicalUri() in order to handle the case where no port is
+ // specified in the URI
+ this.tokenServiceName = isLogicalUri ?
+ HAUtil.buildTokenServiceForLogicalUri(uri, getScheme())
: SecurityUtil.buildTokenService(getCanonicalUri());
- initializeTokenAspect();
if (!isHA) {
this.retryPolicy =
@@ -213,10 +193,8 @@ public class WebHdfsFileSystem extends F
}
this.workingDir = getHomeDirectory();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- tokenAspect.initDelegationToken(ugi);
- }
+ this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+ this.delegationToken = null;
}
@Override
@@ -231,11 +209,46 @@ public class WebHdfsFileSystem extends F
return b;
}
+ TokenSelector<DelegationTokenIdentifier> tokenSelector =
+ new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
+
+ // the first getAuthParams() for a non-token op will either get the
+ // internal token from the ugi or lazy fetch one
protected synchronized Token<?> getDelegationToken() throws IOException {
- tokenAspect.ensureTokenInitialized();
+ if (canRefreshDelegationToken && delegationToken == null) {
+ Token<?> token = tokenSelector.selectToken(
+ new Text(getCanonicalServiceName()), ugi.getTokens());
+ // ugi tokens are usually indicative of a task which can't
+ // refetch tokens. even if ugi has credentials, don't attempt
+ // to get another token to match hdfs/rpc behavior
+ if (token != null) {
+ LOG.debug("Using UGI token: " + token);
+ canRefreshDelegationToken = false;
+ } else {
+ token = getDelegationToken(null);
+ if (token != null) {
+ LOG.debug("Fetched new token: " + token);
+ } else { // security is disabled
+ canRefreshDelegationToken = false;
+ }
+ }
+ setDelegationToken(token);
+ }
return delegationToken;
}
+ @VisibleForTesting
+ synchronized boolean replaceExpiredDelegationToken() throws IOException {
+ boolean replaced = false;
+ if (canRefreshDelegationToken) {
+ Token<?> token = getDelegationToken(null);
+ LOG.debug("Replaced expired token: " + token);
+ setDelegationToken(token);
+ replaced = (token != null);
+ }
+ return replaced;
+ }
+
@Override
protected int getDefaultPort() {
return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
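
For context, the lazy token lookup above can be pictured with a small, self-contained sketch. It is not part of this commit; the service name passed in is an assumption, and only the selector call mirrors the code above. getDelegationToken() treats a null result as the cue to fetch a fresh token over HTTP, and marks canRefreshDelegationToken false when a UGI token is found so that tasks running with pre-shipped credentials never try to acquire new ones, matching hdfs/rpc behavior.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenSelector;
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;

    class WebHdfsTokenLookupSketch {
      // Same kind string as WebHdfsFileSystem.TOKEN_KIND.
      private static final Text KIND = new Text("WEBHDFS delegation");
      private static final TokenSelector<DelegationTokenIdentifier> SELECTOR =
          new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(KIND) {};

      /** @param service the canonical service name, e.g. "host:port" (assumed). */
      static Token<?> fromUgi(Text service) throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // null means the UGI carries no matching token, so a lazy fetch is needed.
        return SELECTOR.selectToken(service, ugi.getTokens());
      }
    }
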
@@ -306,8 +319,8 @@ public class WebHdfsFileSystem extends F
final int code = conn.getResponseCode();
// server is demanding an authentication we don't support
if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
- throw new IOException(
- new AuthenticationException(conn.getResponseMessage()));
+ // match hdfs/rpc exception
+ throw new AccessControlException(conn.getResponseMessage());
}
if (code != op.getExpectedHttpResponseCode()) {
final Map<?, ?> m;
@@ -327,7 +340,15 @@ public class WebHdfsFileSystem extends F
return m;
}
- final RemoteException re = JsonUtil.toRemoteException(m);
+ IOException re = JsonUtil.toRemoteException(m);
+ // extract UGI-related exceptions and unwrap InvalidToken
+ // the NN mangles these exceptions but the DN does not; the client may need
+ // to re-fetch a token if either reports that the token is expired
+ if (re.getMessage().startsWith("Failed to obtain user group information:")) {
+ String[] parts = re.getMessage().split(":\\s+", 3);
+ re = new RemoteException(parts[1], parts[2]);
+ re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
+ }
throw unwrapException? toIOException(re): re;
}
return null;
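
The message re-parsing above can be illustrated with a standalone sketch. The sample message below is made up; only the "Failed to obtain user group information:" prefix comes from the code above. The DN-side prefix is stripped, the wrapped class/message pair is turned back into a RemoteException, and InvalidToken is unwrapped so the retry loop can react to it as a typed exception.

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.security.token.SecretManager.InvalidToken;

    class TokenErrorUnwrapSketch {
      static IOException unwrap(IOException re) {
        String msg = re.getMessage();
        if (msg != null && msg.startsWith("Failed to obtain user group information:")) {
          // parts = [prefix, wrapped exception class name, wrapped message]
          String[] parts = msg.split(":\\s+", 3);
          re = new RemoteException(parts[1], parts[2]);
          re = ((RemoteException) re).unwrapRemoteException(InvalidToken.class);
        }
        return re;
      }

      public static void main(String[] args) {
        IOException in = new IOException("Failed to obtain user group information: "
            + InvalidToken.class.getName() + ": token is expired");
        System.out.println(unwrap(in).getClass().getSimpleName()); // InvalidToken
      }
    }
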
@@ -362,8 +383,6 @@ public class WebHdfsFileSystem extends F
*/
private synchronized void resetStateToFailOver() {
currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
- delegationToken = null;
- tokenAspect.reset();
}
/**
@@ -389,7 +408,7 @@ public class WebHdfsFileSystem extends F
// Skip adding delegation token for token operations because these
// operations require authentication.
Token<?> token = null;
- if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) {
+ if (!op.getRequireAuth()) {
token = getDelegationToken();
}
if (token != null) {
@@ -422,40 +441,24 @@ public class WebHdfsFileSystem extends F
}
/**
- * Run a http operation.
- * Connect to the http server, validate response, and obtain the JSON output.
- *
- * @param op http operation
- * @param fspath file system path
- * @param parameters parameters for the operation
- * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
- * @throws IOException
- */
- private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
- final Param<?,?>... parameters) throws IOException {
- return new FsPathRunner(op, fspath, parameters).run().json;
- }
-
- /**
* This class is for initializing an HTTP connection, connecting to the server,
* obtaining a response, and also handling retry on failures.
*/
- abstract class AbstractRunner {
+ abstract class AbstractRunner<T> {
abstract protected URL getUrl() throws IOException;
protected final HttpOpParam.Op op;
private final boolean redirected;
+ protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
private boolean checkRetry;
- protected HttpURLConnection conn = null;
- private Map<?, ?> json = null;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}
- AbstractRunner run() throws IOException {
+ T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
@@ -467,9 +470,9 @@ public class WebHdfsFileSystem extends F
// the entire lifecycle of the connection must be run inside the
// doAs to ensure authentication is performed correctly
return connectUgi.doAs(
- new PrivilegedExceptionAction<AbstractRunner>() {
+ new PrivilegedExceptionAction<T>() {
@Override
- public AbstractRunner run() throws IOException {
+ public T run() throws IOException {
return runWithRetry();
}
});
@@ -477,32 +480,97 @@ public class WebHdfsFileSystem extends F
throw new IOException(e);
}
}
-
- private void init() throws IOException {
- checkRetry = !redirected;
- URL url = getUrl();
- conn = (HttpURLConnection) connectionFactory.openConnection(url);
- }
-
- private void connect() throws IOException {
- connect(op.getDoOutput());
+
+ /**
+ * Two-step requests redirected to a DN
+ *
+ * Create/Append:
+ * Step 1) Submit an HTTP request with neither auto-redirect nor data.
+ * Step 2) Submit another HTTP request with data to the URL from the Location header.
+ *
+ * The reason for the two-step create/append is to prevent clients from
+ * sending out the data before the redirect. This issue is addressed by the
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+ * Unfortunately, there are software library bugs (e.g. the Jetty 6 HTTP server
+ * and the Java 6 HTTP client) which do not correctly implement "Expect:
+ * 100-continue". The two-step create/append is a temporary workaround for
+ * these library bugs.
+ *
+ * Open/Checksum:
+ * Two-step connects are also used for other operations redirected to
+ * a DN, such as open and checksum.
+ */
+ private HttpURLConnection connect(URL url) throws IOException {
+ //redirect hostname and port
+ String redirectHost = null;
+
+
+ // resolve redirects for a DN operation unless already resolved
+ if (op.getRedirect() && !redirected) {
+ final HttpOpParam.Op redirectOp =
+ HttpOpParam.TemporaryRedirectOp.valueOf(op);
+ final HttpURLConnection conn = connect(redirectOp, url);
+ // application level proxy like httpfs might not issue a redirect
+ if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+ return conn;
+ }
+ try {
+ validateResponse(redirectOp, conn, false);
+ url = new URL(conn.getHeaderField("Location"));
+ redirectHost = url.getHost() + ":" + url.getPort();
+ } finally {
+ conn.disconnect();
+ }
+ }
+ try {
+ return connect(op, url);
+ } catch (IOException ioe) {
+ if (redirectHost != null) {
+ if (excludeDatanodes.getValue() != null) {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ + excludeDatanodes.getValue());
+ } else {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+ }
+ }
+ throw ioe;
+ }
}
- private void connect(boolean doOutput) throws IOException {
+ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+ throws IOException {
+ final HttpURLConnection conn =
+ (HttpURLConnection)connectionFactory.openConnection(url);
+ final boolean doOutput = op.getDoOutput();
conn.setRequestMethod(op.getType().toString());
- conn.setDoOutput(doOutput);
conn.setInstanceFollowRedirects(false);
- conn.connect();
- }
-
- private void disconnect() {
- if (conn != null) {
- conn.disconnect();
- conn = null;
+ switch (op.getType()) {
+ // if not sending a message body for a POST or PUT operation, need
+ // to ensure the server/proxy knows this
+ case POST:
+ case PUT: {
+ conn.setDoOutput(true);
+ if (!doOutput) {
+ // explicitly setting content-length to 0 won't do spnego!!
+ // opening and closing the stream will send "Content-Length: 0"
+ conn.getOutputStream().close();
+ } else {
+ conn.setRequestProperty("Content-Type",
+ MediaType.APPLICATION_OCTET_STREAM);
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+ }
+ break;
+ }
+ default: {
+ conn.setDoOutput(doOutput);
+ break;
+ }
}
+ conn.connect();
+ return conn;
}
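
As a rough illustration of the two-step write described in the javadoc above, the same exchange can be driven by hand with HttpURLConnection. This is a sketch, not the committed code: the NameNode address, the path, and the omission of authentication parameters are all assumptions.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    class TwoStepCreateSketch {
      static void create(String nnHttpAddr, String path, byte[] data) throws Exception {
        // Step 1: ask the NameNode where to write. Send no body and do not
        // follow the 307 automatically, so no data goes out before the redirect.
        URL nnUrl = new URL("http://" + nnHttpAddr + "/webhdfs/v1" + path + "?op=CREATE");
        HttpURLConnection nn = (HttpURLConnection) nnUrl.openConnection();
        nn.setRequestMethod("PUT");
        nn.setInstanceFollowRedirects(false);
        nn.setDoOutput(true);
        nn.getOutputStream().close();                     // sends "Content-Length: 0"
        String location = nn.getHeaderField("Location");  // DataNode URL
        nn.disconnect();

        // Step 2: stream the data to the DataNode named in the Location header.
        HttpURLConnection dn = (HttpURLConnection) new URL(location).openConnection();
        dn.setRequestMethod("PUT");
        dn.setDoOutput(true);
        dn.setChunkedStreamingMode(32 << 10);             // 32kB chunks, as above
        try (OutputStream out = dn.getOutputStream()) {
          out.write(data);
        }
        if (dn.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
          throw new java.io.IOException("create failed: " + dn.getResponseCode());
        }
        dn.disconnect();
      }
    }
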
- private AbstractRunner runWithRetry() throws IOException {
+ private T runWithRetry() throws IOException {
/**
* Do the real work.
*
@@ -520,19 +588,26 @@ public class WebHdfsFileSystem extends F
* examines the exception and swallows it if it decides to rerun the work.
*/
for(int retry = 0; ; retry++) {
+ checkRetry = !redirected;
+ final URL url = getUrl();
try {
- init();
- if (op.getDoOutput()) {
- twoStepWrite();
- } else {
- getResponse(op != GetOpParam.Op.OPEN);
+ final HttpURLConnection conn = connect(url);
+ // output streams will validate on close
+ if (!op.getDoOutput()) {
+ validateResponse(op, conn, false);
}
- return this;
- } catch(IOException ioe) {
- Throwable cause = ioe.getCause();
- if (cause != null && cause instanceof AuthenticationException) {
- throw ioe; // no retries for auth failures
+ return getResponse(conn);
+ } catch (AccessControlException ace) {
+ // no retries for auth failures
+ throw ace;
+ } catch (InvalidToken it) {
+ // try to replace the expired token with a new one. the attempt
+ // to acquire a new token must be outside this operation's retry
+ // so if it fails after its own retries, this operation fails too.
+ if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
+ throw it;
}
+ } catch (IOException ioe) {
shouldRetry(ioe, retry);
}
}
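
The catch blocks above amount to a small retry policy. A hedged, abstracted sketch of that control flow (not the committed class): auth failures never retry, an expired token is replaced at most once per failure outside the operation's own retry loop, and all other IOExceptions go through shouldRetry().

    import java.io.IOException;
    import org.apache.hadoop.security.AccessControlException;
    import org.apache.hadoop.security.token.SecretManager.InvalidToken;

    abstract class RetrySketch<T> {
      abstract T attempt() throws IOException;                   // one HTTP attempt
      abstract boolean replaceExpiredDelegationToken() throws IOException;
      abstract void shouldRetry(IOException ioe, int retry) throws IOException;

      T runWithRetry(boolean opRequiresAuth) throws IOException {
        for (int retry = 0; ; retry++) {
          try {
            return attempt();
          } catch (AccessControlException ace) {
            throw ace;                              // never retry auth failures
          } catch (InvalidToken it) {
            // Token acquisition has its own retries, so if replacing the
            // expired token still fails, this operation fails too.
            if (opRequiresAuth || !replaceExpiredDelegationToken()) {
              throw it;
            }
          } catch (IOException ioe) {
            shouldRetry(ioe, retry);                // throws once retries run out
          }
        }
      }
    }
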
@@ -570,87 +645,159 @@ public class WebHdfsFileSystem extends F
throw toIOException(ioe);
}
- /**
- * Two-step Create/Append:
- * Step 1) Submit a Http request with neither auto-redirect nor data.
- * Step 2) Submit another Http request with the URL from the Location header with data.
- *
- * The reason of having two-step create/append is for preventing clients to
- * send out the data before the redirect. This issue is addressed by the
- * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
- * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
- * and Java 6 http client), which do not correctly implement "Expect:
- * 100-continue". The two-step create/append is a temporary workaround for
- * the software library bugs.
- */
- HttpURLConnection twoStepWrite() throws IOException {
- //Step 1) Submit a Http request with neither auto-redirect nor data.
- connect(false);
- validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
- final String redirect = conn.getHeaderField("Location");
- disconnect();
- checkRetry = false;
-
- //Step 2) Submit another Http request with the URL from the Location header with data.
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- conn.setRequestProperty("Content-Type",
- MediaType.APPLICATION_OCTET_STREAM);
- conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
- connect();
- return conn;
+ abstract T getResponse(HttpURLConnection conn) throws IOException;
+ }
+
+ /**
+ * Abstract base class to handle path-based operations with params
+ */
+ abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+ private final Path fspath;
+ private final Param<?,?>[] parameters;
+
+ AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, false);
+ this.fspath = fspath;
+ this.parameters = parameters;
+ }
+
+ AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+ final Path fspath) {
+ super(op, false);
+ this.fspath = fspath;
+ this.parameters = parameters;
+ }
+
+ @Override
+ protected URL getUrl() throws IOException {
+ if (excludeDatanodes.getValue() != null) {
+ Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = excludeDatanodes;
+ return toUrl(op, fspath, tmpParam);
+ } else {
+ return toUrl(op, fspath, parameters);
+ }
}
+ }
- FSDataOutputStream write(final int bufferSize) throws IOException {
- return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+ /**
+ * Default path-based implementation expects no json response
+ */
+ class FsPathRunner extends AbstractFsPathRunner<Void> {
+ FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ @Override
+ Void getResponse(HttpURLConnection conn) throws IOException {
+ return null;
}
+ }
- void getResponse(boolean getJsonAndDisconnect) throws IOException {
+ /**
+ * Handle path-based operations with a json response
+ */
+ abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+ FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+ final Path fspath) {
+ super(op, parameters, fspath);
+ }
+
+ @Override
+ final T getResponse(HttpURLConnection conn) throws IOException {
try {
- connect();
- final int code = conn.getResponseCode();
- if (!redirected && op.getRedirect()
- && code != op.getExpectedHttpResponseCode()) {
- final String redirect = conn.getHeaderField("Location");
- json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
- conn, false);
- disconnect();
-
- checkRetry = false;
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- connect();
+ final Map<?,?> json = jsonParse(conn, false);
+ if (json == null) {
+ // match exception class thrown by parser
+ throw new IllegalStateException("Missing response");
}
-
- json = validateResponse(op, conn, false);
- if (json == null && getJsonAndDisconnect) {
- json = jsonParse(conn, false);
+ return decodeResponse(json);
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception e) { // catch json parser errors
+ final IOException ioe =
+ new IOException("Response decoding failure: "+e.toString(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(ioe);
}
+ throw ioe;
} finally {
- if (getJsonAndDisconnect) {
- disconnect();
- }
+ conn.disconnect();
}
}
+
+ abstract T decodeResponse(Map<?,?> json) throws IOException;
}
- final class FsPathRunner extends AbstractRunner {
- private final Path fspath;
- private final Param<?, ?>[] parameters;
-
- FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
- super(op, false);
- this.fspath = fspath;
- this.parameters = parameters;
+ /**
+ * Handle path-based operations with json boolean response
+ */
+ class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+ FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ @Override
+ Boolean decodeResponse(Map<?,?> json) throws IOException {
+ return (Boolean)json.get("boolean");
}
+ }
+ /**
+ * Handle create/append output streams
+ */
+ class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+ private final int bufferSize;
+
+ FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ this.bufferSize = bufferSize;
+ }
+
@Override
- protected URL getUrl() throws IOException {
- return toUrl(op, fspath, parameters);
+ FSDataOutputStream getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(
+ conn.getOutputStream(), bufferSize), statistics) {
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ try {
+ validateResponse(op, conn, true);
+ } finally {
+ conn.disconnect();
+ }
+ }
+ }
+ };
}
}
- final class URLRunner extends AbstractRunner {
+ class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
+ FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+ @Override
+ HttpURLConnection getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return conn;
+ }
+ }
+
+ /**
+ * Used by open() which tracks the resolved url itself
+ */
+ final class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() {
@@ -661,6 +808,11 @@ public class WebHdfsFileSystem extends F
super(op, redirected);
this.url = url;
}
+
+ @Override
+ HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+ return conn;
+ }
}
private FsPermission applyUMask(FsPermission permission) {
@@ -672,8 +824,12 @@ public class WebHdfsFileSystem extends F
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
- final Map<?, ?> json = run(op, f);
- final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+ HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+ @Override
+ HdfsFileStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toFileStatus(json, true);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -697,8 +853,12 @@ public class WebHdfsFileSystem extends F
@Override
public AclStatus getAclStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
- final Map<?, ?> json = run(op, f);
- AclStatus status = JsonUtil.toAclStatus(json);
+ AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+ @Override
+ AclStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toAclStatus(json);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -709,9 +869,9 @@ public class WebHdfsFileSystem extends F
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
- final Map<?, ?> json = run(op, f,
- new PermissionParam(applyUMask(permission)));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new PermissionParam(applyUMask(permission))
+ ).run();
}
/**
@@ -722,17 +882,19 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
- run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
- new CreateParentParam(createParent));
+ new FsPathRunner(op, f,
+ new DestinationParam(makeQualified(destination).toUri().getPath()),
+ new CreateParentParam(createParent)
+ ).run();
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- final Map<?, ?> json = run(op, src,
- new DestinationParam(makeQualified(dst).toUri().getPath()));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath())
+ ).run();
}
@SuppressWarnings("deprecation")
@@ -741,8 +903,87 @@ public class WebHdfsFileSystem extends F
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
- new RenameOptionSetParam(options));
+ new FsPathRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath()),
+ new RenameOptionSetParam(options)
+ ).run();
+ }
+
+ @Override
+ public void setXAttr(Path p, String name, byte[] value,
+ EnumSet<XAttrSetFlag> flag) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
+ if (value != null) {
+ new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
+ XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
+ new XAttrSetFlagParam(flag)).run();
+ } else {
+ new FsPathRunner(op, p, new XAttrNameParam(name),
+ new XAttrSetFlagParam(flag)).run();
+ }
+ }
+
+ @Override
+ public byte[] getXAttr(Path p, final String name) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
+ new XAttrEncodingParam(XAttrCodec.HEX)) {
+ @Override
+ byte[] decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtil.getXAttr(json, name);
+ }
+ }.run();
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path p) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
+ new XAttrEncodingParam(XAttrCodec.HEX)) {
+ @Override
+ Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtil.toXAttrs(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
+ throws IOException {
+ Preconditions.checkArgument(names != null && !names.isEmpty(),
+ "XAttr names cannot be null or empty.");
+ Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
+ for (int i = 0; i < parameters.length - 1; i++) {
+ parameters[i] = new XAttrNameParam(names.get(i));
+ }
+ parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
+
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
+ @Override
+ Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtil.toXAttrs(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public List<String> listXAttrs(Path p) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
+ return new FsPathResponseRunner<List<String>>(op, p) {
+ @Override
+ List<String> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtil.toXAttrNames(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public void removeXAttr(Path p, String name) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
+ new FsPathRunner(op, p, new XAttrNameParam(name)).run();
}
@Override
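
A possible usage sketch of the new xattr operations through the public FileSystem API. The webhdfs:// URI, path, and xattr name are hypothetical, and the cluster is assumed to have xattrs enabled; each call maps to one of the GETXATTRS/LISTXATTRS/SETXATTR/REMOVEXATTR ops added in this change.

    import java.net.URI;
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.XAttrSetFlag;

    public class XAttrUsageSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode:50070"), new Configuration());
        Path p = new Path("/tmp/example.txt");
        fs.setXAttr(p, "user.origin", "webhdfs".getBytes("UTF-8"),
            EnumSet.of(XAttrSetFlag.CREATE));                 // PUT ...op=SETXATTR
        byte[] v = fs.getXAttr(p, "user.origin");             // GET ...op=GETXATTRS
        System.out.println(new String(v, "UTF-8"));
        System.out.println(fs.listXAttrs(p));                 // GET ...op=LISTXATTRS
        fs.removeXAttr(p, "user.origin");                     // PUT ...op=REMOVEXATTR
      }
    }
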
@@ -754,7 +995,9 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
- run(op, p, new OwnerParam(owner), new GroupParam(group));
+ new FsPathRunner(op, p,
+ new OwnerParam(owner), new GroupParam(group)
+ ).run();
}
@Override
@@ -762,7 +1005,7 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
- run(op, p, new PermissionParam(permission));
+ new FsPathRunner(op, p,new PermissionParam(permission)).run();
}
@Override
@@ -770,7 +1013,7 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
@@ -778,21 +1021,21 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
@@ -800,7 +1043,39 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
- run(op, p, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
+ }
+
+ @Override
+ public Path createSnapshot(final Path path, final String snapshotName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
+ Path spath = new FsPathResponseRunner<Path>(op, path,
+ new SnapshotNameParam(snapshotName)) {
+ @Override
+ Path decodeResponse(Map<?,?> json) {
+ return new Path((String) json.get(Path.class.getSimpleName()));
+ }
+ }.run();
+ return spath;
+ }
+
+ @Override
+ public void deleteSnapshot(final Path path, final String snapshotName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
+ new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
+ }
+
+ @Override
+ public void renameSnapshot(final Path path, final String snapshotOldName,
+ final String snapshotNewName) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
+ new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
+ new SnapshotNameParam(snapshotNewName)).run();
}
@Override
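
Likewise, a usage sketch for the snapshot operations added above. The path and snapshot names are hypothetical, and the target directory is assumed to have been made snapshottable by an admin (e.g. with "hdfs dfsadmin -allowSnapshot /data").

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SnapshotUsageSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode:50070"), new Configuration());
        Path dir = new Path("/data");                    // assumed snapshottable
        Path snap = fs.createSnapshot(dir, "s1");        // PUT ...op=CREATESNAPSHOT
        System.out.println("created " + snap);           // e.g. /data/.snapshot/s1
        fs.renameSnapshot(dir, "s1", "s2");              // PUT ...op=RENAMESNAPSHOT
        fs.deleteSnapshot(dir, "s2");                    // DELETE ...op=DELETESNAPSHOT
      }
    }
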
@@ -808,8 +1083,9 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
- final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, p,
+ new ReplicationParam(replication)
+ ).run();
}
@Override
@@ -817,7 +1093,10 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
- run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+ new FsPathRunner(op, p,
+ new ModificationTimeParam(mtime),
+ new AccessTimeParam(atime)
+ ).run();
}
@Override
@@ -832,32 +1111,11 @@ public class WebHdfsFileSystem extends F
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
}
- FSDataOutputStream write(final HttpOpParam.Op op,
- final HttpURLConnection conn, final int bufferSize) throws IOException {
- return new FSDataOutputStream(new BufferedOutputStream(
- conn.getOutputStream(), bufferSize), statistics) {
- @Override
- public void close() throws IOException {
- try {
- super.close();
- } finally {
- try {
- validateResponse(op, conn, true);
- } finally {
- conn.disconnect();
- }
- }
- }
- };
- }
-
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-
- ConcatSourcesParam param = new ConcatSourcesParam(srcs);
- run(op, trg, param);
+ new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@Override
@@ -867,14 +1125,13 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
- return new FsPathRunner(op, f,
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(applyUMask(permission)),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
- new BlockSizeParam(blockSize))
- .run()
- .write(bufferSize);
+ new BlockSizeParam(blockSize)
+ ).run();
}
@Override
@@ -883,16 +1140,17 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
- return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
- .run()
- .write(bufferSize);
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
+ new BufferSizeParam(bufferSize)
+ ).run();
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
- final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new RecursiveParam(recursive)
+ ).run();
}
@Override
@@ -900,16 +1158,41 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
- final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
+ // use a runner so the open can recover from an invalid token
+ FsPathConnectionRunner runner =
+ new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
return new FSDataInputStream(new OffsetUrlInputStream(
- new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
+ new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
}
@Override
- public void close() throws IOException {
- super.close();
- synchronized (this) {
- tokenAspect.removeRenewAction();
+ public synchronized void close() throws IOException {
+ try {
+ if (canRefreshDelegationToken && delegationToken != null) {
+ cancelDelegationToken(delegationToken);
+ }
+ } catch (IOException ioe) {
+ LOG.debug("Token cancel failed: "+ioe);
+ } finally {
+ super.close();
+ }
+ }
+
+ // use FsPathConnectionRunner to ensure retries for InvalidTokens
+ class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
+ private final FsPathConnectionRunner runner;
+ UnresolvedUrlOpener(FsPathConnectionRunner runner) {
+ super(null);
+ this.runner = runner;
+ }
+
+ @Override
+ protected HttpURLConnection connect(long offset, boolean resolved)
+ throws IOException {
+ assert offset == 0;
+ HttpURLConnection conn = runner.run();
+ setURL(conn.getURL());
+ return conn;
}
}
@@ -924,7 +1207,7 @@ public class WebHdfsFileSystem extends F
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
- return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+ return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
}
}
@@ -962,7 +1245,7 @@ public class WebHdfsFileSystem extends F
}
static class OffsetUrlInputStream extends ByteRangeInputStream {
- OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r)
+ OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
throws IOException {
super(o, r);
}
@@ -980,25 +1263,36 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
- final Map<?, ?> json = run(op, f);
- final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
- final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
-
- //convert FileStatus
- final FileStatus[] statuses = new FileStatus[array.length];
- for(int i = 0; i < array.length; i++) {
- final Map<?, ?> m = (Map<?, ?>)array[i];
- statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
- }
- return statuses;
+ return new FsPathResponseRunner<FileStatus[]>(op, f) {
+ @Override
+ FileStatus[] decodeResponse(Map<?,?> json) {
+ final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+ final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
+
+ //convert FileStatus
+ final FileStatus[] statuses = new FileStatus[array.length];
+ for (int i = 0; i < array.length; i++) {
+ final Map<?, ?> m = (Map<?, ?>)array[i];
+ statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
+ }
+ return statuses;
+ }
+ }.run();
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
- final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
- final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+ Token<DelegationTokenIdentifier> token =
+ new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+ op, null, new RenewerParam(renewer)) {
+ @Override
+ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+ throws IOException {
+ return JsonUtil.toDelegationToken(json);
+ }
+ }.run();
token.setService(tokenServiceName);
return token;
}
@@ -1020,19 +1314,22 @@ public class WebHdfsFileSystem extends F
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- final Map<?, ?> m = run(op, null, dtargParam);
- return (Long) m.get("long");
+ return new FsPathResponseRunner<Long>(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())) {
+ @Override
+ Long decodeResponse(Map<?,?> json) throws IOException {
+ return (Long) json.get("long");
+ }
+ }.run();
}
@Override
public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- run(op, null, dtargParam);
+ new FsPathRunner(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())
+ ).run();
}
@Override
@@ -1050,9 +1347,20 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
- final Map<?, ?> m = run(op, p, new OffsetParam(offset),
- new LengthParam(length));
- return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+ return new FsPathResponseRunner<BlockLocation[]>(op, p,
+ new OffsetParam(offset), new LengthParam(length)) {
+ @Override
+ BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+ return DFSUtil.locatedBlocks2Locations(
+ JsonUtil.toLocatedBlocks(json));
+ }
+ }.run();
+ }
+
+ @Override
+ public void access(final Path path, final FsAction mode) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
+ new FsPathRunner(op, path, new FsActionParam(mode)).run();
}
@Override
@@ -1060,8 +1368,12 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toContentSummary(m);
+ return new FsPathResponseRunner<ContentSummary>(op, p) {
+ @Override
+ ContentSummary decodeResponse(Map<?,?> json) {
+ return JsonUtil.toContentSummary(json);
+ }
+ }.run();
}
@Override
@@ -1070,15 +1382,19 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+ return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+ @Override
+ MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+ return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+ }
+ }.run();
}
/**
* Resolve an HDFS URL into real INetSocketAddress. It works like a DNS
resolver when the URL points to a non-HA cluster. When the URL points to
- * an HA cluster, the resolver further resolves the logical name (i.e., the
- * authority in the URL) into real namenode addresses.
+ * an HA cluster with its logical name, the resolver further resolves the
+ * logical name (i.e., the authority in the URL) into real namenode addresses.
*/
private InetSocketAddress[] resolveNNAddr() throws IOException {
Configuration conf = getConf();
@@ -1095,10 +1411,10 @@ public class WebHdfsFileSystem extends F
Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
.getHaNnWebHdfsAddresses(conf, scheme);
- for (Map<String, InetSocketAddress> addrs : addresses.values()) {
- for (InetSocketAddress addr : addrs.values()) {
- ret.add(addr);
- }
+ // Extract the entry corresponding to the logical name.
+ Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
+ for (InetSocketAddress addr : addrs.values()) {
+ ret.add(addr);
}
}
@@ -1111,4 +1427,9 @@ public class WebHdfsFileSystem extends F
return tokenServiceName == null ? super.getCanonicalServiceName()
: tokenServiceName.toString();
}
+
+ @VisibleForTesting
+ InetSocketAddress[] getResolvedNNAddr() {
+ return nnAddrs;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java Tue Aug 19 23:49:39 2014
@@ -60,10 +60,7 @@ public class AclPermissionParam extends
}
/**
- * Parse the list of AclEntry and returns aclspec.
- *
- * @param List <AclEntry>
- * @return String
+ * @return the aclspec string built from {@code aclEntry}
*/
private static String parseAclSpec(List<AclEntry> aclEntry) {
return StringUtils.join(aclEntry, ",");
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ public class DeleteOpParam extends HttpO
/** Delete operations. */
public static enum Op implements HttpOpParam.Op {
DELETE(HttpURLConnection.HTTP_OK),
+ DELETESNAPSHOT(HttpURLConnection.HTTP_OK),
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java Tue Aug 19 23:49:39 2014
@@ -79,8 +79,8 @@ abstract class EnumSetParam<E extends En
final EnumSet<E> set = EnumSet.noneOf(enumClass);
if (!str.isEmpty()) {
for(int i, j = 0; j >= 0; ) {
- i = j;
- j = str.indexOf(',', i+1);
+ i = j > 0 ? j + 1 : 0;
+ j = str.indexOf(',', i);
final String sub = j >= 0? str.substring(i, j): str.substring(i);
set.add(Enum.valueOf(enumClass, sub.trim().toUpperCase()));
}
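
The index change above fixes the comma handling when splitting the parameter string: the old loop restarted on the comma itself, so every element after the first kept a leading "," and failed Enum.valueOf. A standalone sketch of the corrected loop, using a made-up enum:

    import java.util.EnumSet;

    class EnumSetParseSketch {
      enum Flag { CREATE, REPLACE }

      static EnumSet<Flag> parse(String str) {
        final EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
        if (!str.isEmpty()) {
          for (int i, j = 0; j >= 0; ) {
            i = j > 0 ? j + 1 : 0;                  // start just past the last comma
            j = str.indexOf(',', i);
            final String sub = j >= 0 ? str.substring(i, j) : str.substring(i);
            set.add(Enum.valueOf(Flag.class, sub.trim().toUpperCase()));
          }
        }
        return set;
      }

      public static void main(String[] args) {
        System.out.println(parse("CREATE,REPLACE"));   // [CREATE, REPLACE]
      }
    }
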
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Tue Aug 19 23:49:39 2014
@@ -31,8 +31,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.ParamException;
import com.sun.jersey.api.container.ContainerException;
@@ -42,9 +45,22 @@ public class ExceptionHandler implements
public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
private static Exception toCause(Exception e) {
- final Throwable t = e.getCause();
- if (t != null && t instanceof Exception) {
- e = (Exception)e.getCause();
+ final Throwable t = e.getCause();
+ if (e instanceof SecurityException) {
+ // For the issue reported in HDFS-6475, if SecurityException's cause
+ // is InvalidToken, and the InvalidToken's cause is StandbyException,
+ // return StandbyException; Otherwise, leave the exception as is,
+ // since they are handled elsewhere. See HDFS-6588.
+ if (t != null && t instanceof InvalidToken) {
+ final Throwable t1 = t.getCause();
+ if (t1 != null && t1 instanceof StandbyException) {
+ e = (StandbyException)t1;
+ }
+ }
+ } else {
+ if (t != null && t instanceof Exception) {
+ e = (Exception)t;
+ }
}
return e;
}
@@ -74,6 +90,10 @@ public class ExceptionHandler implements
e = ((RemoteException)e).unwrapRemoteException();
}
+ if (e instanceof SecurityException) {
+ e = toCause(e);
+ }
+
//Map response status
final Response.Status s;
if (e instanceof SecurityException) {
@@ -96,4 +116,9 @@ public class ExceptionHandler implements
final String js = JsonUtil.toJsonString(e);
return Response.status(s).type(MediaType.APPLICATION_JSON).entity(js).build();
}
+
+ @VisibleForTesting
+ public void initResponse(HttpServletResponse response) {
+ this.response = response;
+ }
}
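
A small demonstration (not part of the commit) of the unwrapping rule described in the comment above: a SecurityException wrapping an InvalidToken that wraps a StandbyException reduces to the StandbyException, so it can be mapped to the appropriate HTTP status instead of being reported as an authorization failure.

    import org.apache.hadoop.ipc.StandbyException;
    import org.apache.hadoop.security.token.SecretManager.InvalidToken;

    class ToCauseSketch {
      static Exception toCause(Exception e) {
        final Throwable t = e.getCause();
        if (e instanceof SecurityException) {
          if (t instanceof InvalidToken && t.getCause() instanceof StandbyException) {
            return (StandbyException) t.getCause();
          }
          return e;                                  // handled elsewhere, leave as is
        }
        return (t instanceof Exception) ? (Exception) t : e;
      }

      public static void main(String[] args) {
        StandbyException sbe = new StandbyException("namenode is standby");
        InvalidToken it = new InvalidToken("token check failed");
        it.initCause(sbe);
        SecurityException se = new SecurityException(it);
        System.out.println(toCause(se).getClass().getSimpleName()); // StandbyException
      }
    }
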
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Tue Aug 19 23:49:39 2014
@@ -36,8 +36,12 @@ public class GetOpParam extends HttpOpPa
/** GET_BLOCK_LOCATIONS is a private unstable op. */
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
+ GETXATTRS(false, HttpURLConnection.HTTP_OK),
+ LISTXATTRS(false, HttpURLConnection.HTTP_OK),
- NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+ NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
+
+ CHECKACCESS(false, HttpURLConnection.HTTP_OK);
final boolean redirect;
final int expectedHttpResponseCode;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Tue Aug 19 23:49:39 2014
@@ -71,8 +71,8 @@ public abstract class HttpOpParam<E exte
GetOpParam.Op.GETFILECHECKSUM);
static final List<TemporaryRedirectOp> values
- = Collections.unmodifiableList(Arrays.asList(
- new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
+ = Collections.unmodifiableList(Arrays.asList(CREATE, APPEND, OPEN,
+ GETFILECHECKSUM));
/** Get an object for the given op. */
public static TemporaryRedirectOp valueOf(final Op op) {
@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E exte
@Override
public boolean getDoOutput() {
- return op.getDoOutput();
+ return false;
}
@Override
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java Tue Aug 19 23:49:39 2014
@@ -43,6 +43,12 @@ public class PutOpParam extends HttpOpPa
REMOVEACL(false, HttpURLConnection.HTTP_OK),
SETACL(false, HttpURLConnection.HTTP_OK),
+ SETXATTR(false, HttpURLConnection.HTTP_OK),
+ REMOVEXATTR(false, HttpURLConnection.HTTP_OK),
+
+ CREATESNAPSHOT(false, HttpURLConnection.HTTP_OK),
+ RENAMESNAPSHOT(false, HttpURLConnection.HTTP_OK),
+
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
final boolean doOutputAndRedirect;
Propchange: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1588992-1596568
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1582150-1619000
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt Tue Aug 19 23:49:39 2014
@@ -37,6 +37,10 @@ ELSE (${CMAKE_SYSTEM_NAME} MATCHES "Linu
ENDIF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
IF(FUSE_FOUND)
+ add_library(posix_util
+ ../util/posix_util.c
+ )
+
add_executable(fuse_dfs
fuse_dfs.c
fuse_options.c
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c Tue Aug 19 23:49:39 2014
@@ -16,111 +16,228 @@
* limitations under the License.
*/
-
#include <hdfs.h>
+#include <inttypes.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
#include <strings.h>
+#include "fuse_context_handle.h"
#include "fuse_dfs.h"
#include "fuse_trash.h"
-#include "fuse_context_handle.h"
-
-
-const char *const TrashPrefixDir = "/user/root/.Trash";
-const char *const TrashDir = "/user/root/.Trash/Current";
+#include "fuse_users.h"
#define TRASH_RENAME_TRIES 100
+#define ALREADY_IN_TRASH_ERR 9000
-//
-// NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
-//
-
-int move_to_trash(const char *item, hdfsFS userFS) {
-
- // retrieve dfs specific data
- dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
-
- // check params and the context var
- assert(item);
- assert(dfs);
- assert('/' == *item);
- assert(rindex(item,'/') >= 0);
-
-
- char fname[4096]; // or last element of the directory path
- char parent_dir[4096]; // the directory the fname resides in
-
- if (strlen(item) > sizeof(fname) - strlen(TrashDir)) {
- ERROR("Buffer too small to accomodate path of len %d", (int)strlen(item));
- return -EIO;
+/**
+ * Split a path into a parent directory and a base path component.
+ *
+ * @param abs_path The absolute path.
+ * @param pcomp (out param) Will be set to the last path component.
+ * Malloced.
+ * @param parent_dir (out param) Will be set to the parent directory.
+ * Malloced.
+ *
+ * @return 0 on success.
+ * On success, both *pcomp and *parent_dir will contain
+ * malloc'ed strings.
+ * EINVAL if the path wasn't absolute.
+ * EINVAL if there is no parent directory (i.e. abs_path=/)
+ * ENOMEM if we ran out of memory.
+ */
+static int get_parent_dir(const char *abs_path, char **pcomp,
+ char **parent_dir)
+{
+ int ret;
+ char *pdir = NULL, *pc = NULL, *last_slash;
+
+ pdir = strdup(abs_path);
+ if (!pdir) {
+ ret = ENOMEM;
+ goto done;
+ }
+ last_slash = rindex(pdir, '/');
+ if (!last_slash) {
+ ERROR("get_parent_dir(%s): expected absolute path.\n", abs_path);
+ ret = EINVAL;
+ goto done;
+ }
+ if (last_slash[1] == '\0') {
+ *last_slash = '\0';
+ last_slash = rindex(pdir, '/');
+ if (!last_slash) {
+ ERROR("get_parent_dir(%s): there is no parent dir.\n", abs_path);
+ ret = EINVAL;
+ goto done;
+ }
}
-
- // separate the file name and the parent directory of the item to be deleted
- {
- int length_of_parent_dir = rindex(item, '/') - item ;
- int length_of_fname = strlen(item) - length_of_parent_dir - 1; // the '/'
-
- // note - the below strncpys should be safe from overflow because of the check on item's string length above.
- strncpy(parent_dir, item, length_of_parent_dir);
- parent_dir[length_of_parent_dir ] = 0;
- strncpy(fname, item + length_of_parent_dir + 1, strlen(item));
- fname[length_of_fname + 1] = 0;
+ pc = strdup(last_slash + 1);
+ if (!pc) {
+ ret = ENOMEM;
+ goto done;
+ }
+ *last_slash = '\0';
+ ret = 0;
+done:
+ if (ret) {
+ free(pdir);
+ free(pc);
+ return ret;
}
+ *pcomp = pc;
+ *parent_dir = pdir;
+ return 0;
+}
- // create the target trash directory
- char trash_dir[4096];
- if (snprintf(trash_dir, sizeof(trash_dir), "%s%s", TrashDir, parent_dir)
- >= sizeof trash_dir) {
- ERROR("Move to trash error target not big enough for %s", item);
- return -EIO;
+/**
+ * Get the base path to the trash. This will depend on the user ID.
+ * For example, a user whose ID maps to 'foo' will get back the path
+ * "/user/foo/.Trash/Current".
+ *
+ * @param trash_base (out param) the base path to the trash.
+ * Malloced.
+ *
+ * @return 0 on success; error code otherwise.
+ */
+static int get_trash_base(char **trash_base)
+{
+ const char * const PREFIX = "/user/";
+ const char * const SUFFIX = "/.Trash/Current";
+ char *user_name = NULL, *base = NULL;
+ uid_t uid = fuse_get_context()->uid;
+ int ret;
+
+ user_name = getUsername(uid);
+ if (!user_name) {
+ ERROR("get_trash_base(): failed to get username for uid %"PRId64"\n",
+ (uint64_t)uid);
+ ret = EIO;
+ goto done;
+ }
+ if (asprintf(&base, "%s%s%s", PREFIX, user_name, SUFFIX) < 0) {
+ base = NULL;
+ ret = ENOMEM;
+ goto done;
+ }
+ ret = 0;
+done:
+ free(user_name);
+ if (ret) {
+ free(base);
+ return ret;
}
+ *trash_base = base;
+ return 0;
+}
+//
+// NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
+//
+int move_to_trash(const char *abs_path, hdfsFS userFS)
+{
+ int ret;
+ char *pcomp = NULL, *parent_dir = NULL, *trash_base = NULL;
+ char *target_dir = NULL, *target = NULL;
+
+ ret = get_parent_dir(abs_path, &pcomp, &parent_dir);
+ if (ret) {
+ goto done;
+ }
+ ret = get_trash_base(&trash_base);
+ if (ret) {
+ goto done;
+ }
+ if (!strncmp(trash_base, abs_path, strlen(trash_base))) {
+ INFO("move_to_trash(%s): file is already in the trash; deleting.",
+ abs_path);
+ ret = ALREADY_IN_TRASH_ERR;
+ goto done;
+ }
+ fprintf(stderr, "trash_base='%s'\n", trash_base);
+ if (asprintf(&target_dir, "%s%s", trash_base, parent_dir) < 0) {
+ ret = ENOMEM;
+ target_dir = NULL;
+ goto done;
+ }
+ if (asprintf(&target, "%s/%s", target_dir, pcomp) < 0) {
+ ret = ENOMEM;
+ target = NULL;
+ goto done;
+ }
// create the target trash directory in trash (if needed)
- if ( hdfsExists(userFS, trash_dir)) {
+ if (hdfsExists(userFS, target_dir) != 0) {
// make the directory to put it in in the Trash - NOTE
// hdfsCreateDirectory also creates parents, so Current will be created if it does not exist.
- if (hdfsCreateDirectory(userFS, trash_dir)) {
- return -EIO;
+ if (hdfsCreateDirectory(userFS, target_dir)) {
+ ret = errno;
+ ERROR("move_to_trash(%s) error: hdfsCreateDirectory(%s) failed with error %d",
+ abs_path, target_dir, ret);
+ goto done;
}
- }
-
- //
- // if the target path in Trash already exists, then append with
- // a number. Start from 1.
- //
- char target[4096];
- int j ;
- if ( snprintf(target, sizeof target,"%s/%s",trash_dir, fname) >= sizeof target) {
- ERROR("Move to trash error target not big enough for %s", item);
- return -EIO;
- }
-
- // NOTE: this loop differs from the java version by capping the #of tries
- for (j = 1; ! hdfsExists(userFS, target) && j < TRASH_RENAME_TRIES ; j++) {
- if (snprintf(target, sizeof target,"%s/%s.%d",trash_dir, fname, j) >= sizeof target) {
- ERROR("Move to trash error target not big enough for %s", item);
- return -EIO;
+ } else if (hdfsExists(userFS, target) == 0) {
+ // If there is already a file in the trash with this path, append a number.
+ int idx;
+ for (idx = 1; idx < TRASH_RENAME_TRIES; idx++) {
+ free(target);
+ if (asprintf(&target, "%s%s.%d", target_dir, pcomp, idx) < 0) {
+ target = NULL;
+ ret = ENOMEM;
+ goto done;
+ }
+ if (hdfsExists(userFS, target) != 0) {
+ break;
+ }
+ }
+ if (idx == TRASH_RENAME_TRIES) {
+ ERROR("move_to_trash(%s) error: there are already %d files in the trash "
+ "with this name.\n", abs_path, TRASH_RENAME_TRIES);
+ ret = EINVAL;
+ goto done;
}
}
- if (hdfsRename(userFS, item, target)) {
- ERROR("Trying to rename %s to %s", item, target);
- return -EIO;
- }
- return 0;
-}
-
-
-int hdfsDeleteWithTrash(hdfsFS userFS, const char *path, int useTrash) {
+ if (hdfsRename(userFS, abs_path, target)) {
+ ret = errno;
+ ERROR("move_to_trash(%s): failed to rename the file to %s: error %d",
+ abs_path, target, ret);
+ goto done;
+ }
+
+ ret = 0;
+done:
+ if ((ret != 0) && (ret != ALREADY_IN_TRASH_ERR)) {
+ ERROR("move_to_trash(%s) failed with error %d", abs_path, ret);
+ }
+ free(pcomp);
+ free(parent_dir);
+ free(trash_base);
+ free(target_dir);
+ free(target);
+ return ret;
+}
- // move the file to the trash if this is enabled and its not actually in the trash.
- if (useTrash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
- int ret= move_to_trash(path, userFS);
- return ret;
+int hdfsDeleteWithTrash(hdfsFS userFS, const char *path, int useTrash)
+{
+ int tried_to_move_to_trash = 0;
+ if (useTrash) {
+ tried_to_move_to_trash = 1;
+ if (move_to_trash(path, userFS) == 0) {
+ return 0;
+ }
}
-
if (hdfsDelete(userFS, path, 1)) {
- ERROR("Trying to delete the file %s", path);
- return -EIO;
+ int err = errno;
+ if (err < 0) {
+ err = -err;
+ }
+ ERROR("hdfsDeleteWithTrash(%s): hdfsDelete failed: error %d.",
+ path, err);
+ return -err;
+ }
+ if (tried_to_move_to_trash) {
+ ERROR("hdfsDeleteWithTrash(%s): deleted the file instead.\n", path);
}
-
return 0;
}
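For readers skimming the rewritten fuse_trash.c above: move_to_trash() now splits the absolute path into a parent directory plus final component (get_parent_dir), derives a per-user trash base such as /user/foo/.Trash/Current from the FUSE caller's uid (get_trash_base), mirrors the original parent directory underneath that base, and falls back to a ".1", ".2", ... suffix when a same-named entry already exists, capped at TRASH_RENAME_TRIES. The standalone sketch below mirrors only that path arithmetic; it makes no libhdfs calls, the username is passed in directly instead of being resolved through getUsername(uid), and split_path() is an illustrative stand-in for the static get_parent_dir() helper.

/* Standalone sketch of the trash-path construction in the patched
 * fuse_trash.c. Helper names and the hard-coded username are illustrative;
 * the real code resolves the user via getUsername() and probes HDFS with
 * hdfsExists() before choosing a final target. */
#define _GNU_SOURCE            /* for asprintf() on glibc */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Split an absolute path into a malloc'ed parent dir and final component,
 * in the spirit of get_parent_dir() above. Returns 0 on success. */
static int split_path(const char *abs_path, char **pcomp, char **parent_dir)
{
  char *pdir, *last_slash;

  if (abs_path[0] != '/')
    return EINVAL;
  pdir = strdup(abs_path);
  if (!pdir)
    return ENOMEM;
  last_slash = strrchr(pdir, '/');   /* the patch uses rindex(); same idea */
  *pcomp = strdup(last_slash + 1);
  if (!*pcomp) {
    free(pdir);
    return ENOMEM;
  }
  *last_slash = '\0';
  *parent_dir = pdir;
  return 0;
}

int main(void)
{
  const char *abs_path = "/user/root/data/report.csv";  /* example input */
  const char *user = "root";                            /* assumed caller */
  char *pcomp = NULL, *parent_dir = NULL;
  char *trash_base = NULL, *target_dir = NULL, *target = NULL;

  if (split_path(abs_path, &pcomp, &parent_dir))
    return 1;
  /* Per-user trash base, as built by get_trash_base(). */
  if (asprintf(&trash_base, "/user/%s/.Trash/Current", user) < 0)
    return 1;
  /* Mirror the original directory layout underneath the trash. */
  if (asprintf(&target_dir, "%s%s", trash_base, parent_dir) < 0)
    return 1;
  /* First candidate; on a name collision the real code retries with a
   * numeric suffix up to TRASH_RENAME_TRIES before giving up. */
  if (asprintf(&target, "%s/%s", target_dir, pcomp) < 0)
    return 1;

  /* The target printed here is
   * /user/root/.Trash/Current/user/root/data/report.csv */
  printf("rename %s -> %s\n", abs_path, target);
  free(pcomp);
  free(parent_dir);
  free(trash_base);
  free(target_dir);
  free(target);
  return 0;
}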
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c Tue Aug 19 23:49:39 2014
@@ -19,8 +19,8 @@
#include "exception.h"
#include "hdfs.h"
#include "jni_helper.h"
+#include "platform.h"
-#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -35,54 +35,54 @@ struct ExceptionInfo {
static const struct ExceptionInfo gExceptionInfo[] = {
{
- .name = "java.io.FileNotFoundException",
- .noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND,
- .excErrno = ENOENT,
+ "java.io.FileNotFoundException",
+ NOPRINT_EXC_FILE_NOT_FOUND,
+ ENOENT,
},
{
- .name = "org.apache.hadoop.security.AccessControlException",
- .noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL,
- .excErrno = EACCES,
+ "org.apache.hadoop.security.AccessControlException",
+ NOPRINT_EXC_ACCESS_CONTROL,
+ EACCES,
},
{
- .name = "org.apache.hadoop.fs.UnresolvedLinkException",
- .noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK,
- .excErrno = ENOLINK,
+ "org.apache.hadoop.fs.UnresolvedLinkException",
+ NOPRINT_EXC_UNRESOLVED_LINK,
+ ENOLINK,
},
{
- .name = "org.apache.hadoop.fs.ParentNotDirectoryException",
- .noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY,
- .excErrno = ENOTDIR,
+ "org.apache.hadoop.fs.ParentNotDirectoryException",
+ NOPRINT_EXC_PARENT_NOT_DIRECTORY,
+ ENOTDIR,
},
{
- .name = "java.lang.IllegalArgumentException",
- .noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT,
- .excErrno = EINVAL,
+ "java.lang.IllegalArgumentException",
+ NOPRINT_EXC_ILLEGAL_ARGUMENT,
+ EINVAL,
},
{
- .name = "java.lang.OutOfMemoryError",
- .noPrintFlag = 0,
- .excErrno = ENOMEM,
+ "java.lang.OutOfMemoryError",
+ 0,
+ ENOMEM,
},
{
- .name = "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
- .noPrintFlag = 0,
- .excErrno = EROFS,
+ "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
+ 0,
+ EROFS,
},
{
- .name = "org.apache.hadoop.fs.FileAlreadyExistsException",
- .noPrintFlag = 0,
- .excErrno = EEXIST,
+ "org.apache.hadoop.fs.FileAlreadyExistsException",
+ 0,
+ EEXIST,
},
{
- .name = "org.apache.hadoop.hdfs.protocol.QuotaExceededException",
- .noPrintFlag = 0,
- .excErrno = EDQUOT,
+ "org.apache.hadoop.hdfs.protocol.QuotaExceededException",
+ 0,
+ EDQUOT,
},
{
- .name = "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
- .noPrintFlag = 0,
- .excErrno = ESTALE,
+ "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
+ 0,
+ ESTALE,
},
};
@@ -113,6 +113,7 @@ int printExceptionAndFreeV(JNIEnv *env,
jstring jStr = NULL;
jvalue jVal;
jthrowable jthr;
+ const char *stackTrace;
jthr = classNameOfObject(exc, env, &className);
if (jthr) {
@@ -148,7 +149,7 @@ int printExceptionAndFreeV(JNIEnv *env,
destroyLocalReference(env, jthr);
} else {
jStr = jVal.l;
- const char *stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
+ stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
if (!stackTrace) {
fprintf(stderr, "(unable to get stack trace for %s exception: "
"GetStringUTFChars error.)\n", className);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h Tue Aug 19 23:49:39 2014
@@ -34,13 +34,14 @@
* usually not what you want.)
*/
+#include "platform.h"
+
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <search.h>
-#include <pthread.h>
#include <errno.h>
/**
@@ -109,7 +110,7 @@ int printExceptionAndFreeV(JNIEnv *env,
* object.
*/
int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
- const char *fmt, ...) __attribute__((format(printf, 4, 5)));
+ const char *fmt, ...) TYPE_CHECKED_PRINTF_FORMAT(4, 5);
/**
* Print out information about the pending exception and free it.
@@ -124,7 +125,7 @@ int printExceptionAndFree(JNIEnv *env, j
* object.
*/
int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
- const char *fmt, ...) __attribute__((format(printf, 3, 4)));
+ const char *fmt, ...) TYPE_CHECKED_PRINTF_FORMAT(3, 4);
/**
* Get a local reference to the pending exception and clear it.
@@ -150,6 +151,7 @@ jthrowable getPendingExceptionAndClear(J
* @return A local reference to a RuntimeError
*/
jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
- __attribute__((format(printf, 2, 3)));
+ TYPE_CHECKED_PRINTF_FORMAT(2, 3);
+#undef TYPE_CHECKED_PRINTF_FORMAT
#endif
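exception.h replaces the GCC-specific __attribute__((format(printf, ...))) annotations with a TYPE_CHECKED_PRINTF_FORMAT(fmtIdx, varIdx) macro supplied by the new platform.h, and #undefs it at the end of the header so the name is not exported to includers. The definition below is a hypothetical sketch of what such a platform.h macro could look like (the actual per-platform headers added in this branch may differ), together with a small usage example showing the compile-time format checking it preserves on GCC/Clang.

/* Hypothetical sketch of the portability macro; the real definitions live
 * in the per-platform platform.h headers and may differ from this. */
#include <stdarg.h>
#include <stdio.h>

#ifdef __GNUC__
#define TYPE_CHECKED_PRINTF_FORMAT(fmtIdx, varIdx) \
  __attribute__((format(printf, fmtIdx, varIdx)))
#else
#define TYPE_CHECKED_PRINTF_FORMAT(fmtIdx, varIdx)
#endif

/* On GCC/Clang the attribute makes a call such as
 *   logError("bad fd %d", "oops");
 * a compile-time warning; on compilers without the attribute (e.g. MSVC)
 * the macro expands to nothing and the code still builds. */
static void logError(const char *fmt, ...) TYPE_CHECKED_PRINTF_FORMAT(1, 2);

static void logError(const char *fmt, ...)
{
  va_list ap;

  va_start(ap, fmt);
  vfprintf(stderr, fmt, ap);
  va_end(ap);
}

int main(void)
{
  logError("deleting %s (attempt %d)\n", "/tmp/example", 1);
  return 0;
}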
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c Tue Aug 19 23:49:39 2014
@@ -49,18 +49,18 @@ int expectFileStats(hdfsFile file,
stats->totalShortCircuitBytesRead,
stats->totalZeroCopyBytesRead);
if (expectedTotalBytesRead != UINT64_MAX) {
- EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
+ EXPECT_UINT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
}
if (expectedTotalLocalBytesRead != UINT64_MAX) {
- EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
+ EXPECT_UINT64_EQ(expectedTotalLocalBytesRead,
stats->totalLocalBytesRead);
}
if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
- EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
+ EXPECT_UINT64_EQ(expectedTotalShortCircuitBytesRead,
stats->totalShortCircuitBytesRead);
}
if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
- EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
+ EXPECT_UINT64_EQ(expectedTotalZeroCopyBytesRead,
stats->totalZeroCopyBytesRead);
}
hdfsFileFreeReadStatistics(stats);
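expectFileStats() above switches from EXPECT_INT64_EQ to the new EXPECT_UINT64_EQ because the hdfsReadStatistics counters are unsigned 64-bit values; the unsigned macro (added to expect.h in the next hunk) compares and prints them with PRIu64 instead of treating them as signed. A small self-contained usage sketch follows; the fake statistics struct and the numbers are made up for the example, since the real tests obtain the counters from hdfsFileGetReadStatistics().

/* Illustrative use of the EXPECT_UINT64_EQ macro added to expect.h below;
 * the struct and values here are hypothetical test fixtures. */
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define EXPECT_UINT64_EQ(x, y) \
  do { \
    uint64_t __my_ret__ = y; \
    int __my_errno__ = errno; \
    if (__my_ret__ != (x)) { \
      fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
        "value %"PRIu64" (errno: %d): expected %"PRIu64"\n", \
        __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
      return -1; \
    } \
  } while (0);

struct fakeReadStats {
  uint64_t totalBytesRead;   /* stand-in for hdfsReadStatistics */
};

/* Returns 0 when the counter matches, -1 (with a TEST_ERROR message) when
 * it does not; the unsigned compare and PRIu64 formatting avoid
 * misreporting large counters as negative values. */
static int checkStats(const struct fakeReadStats *stats, uint64_t expected)
{
  EXPECT_UINT64_EQ(expected, stats->totalBytesRead);
  return 0;
}

int main(void)
{
  struct fakeReadStats stats = { 4096 };
  return checkStats(&stats, 4096);
}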
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Tue Aug 19 23:49:39 2014
@@ -126,6 +126,18 @@ struct hdfsFile_internal;
} \
} while (0);
+#define EXPECT_UINT64_EQ(x, y) \
+ do { \
+ uint64_t __my_ret__ = y; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ != (x)) { \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+ "value %"PRIu64" (errno: %d): expected %"PRIu64"\n", \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+ return -1; \
+ } \
+ } while (0);
+
#define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
ret = expr; \
if (!ret) \