You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/06/02 14:59:28 UTC
[18/50] [abbrv] incubator-ignite git commit: [IGNITE-958]: IGNITE-218
(Wrong staging permissions while running MR job under hadoop accelerator):
IGFS part.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35388195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35388195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35388195
Branch: refs/heads/ignite-745
Commit: 353881951fcdcc16c3dc31d808d3af6c263f74ce
Parents: 7ec4c82
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:31:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:31:35 2015 +0300
----------------------------------------------------------------------
.../igfs/secondary/IgfsSecondaryFileSystem.java | 7 +
.../internal/igfs/common/IgfsMarshaller.java | 35 +---
.../igfs/common/IgfsPathControlRequest.java | 22 +++
.../internal/processors/hadoop/HadoopJob.java | 2 +-
.../ignite/internal/processors/igfs/IgfsEx.java | 8 +-
.../internal/processors/igfs/IgfsImpl.java | 8 +-
.../processors/igfs/IgfsIpcHandler.java | 184 ++++++++++---------
.../igfs/IgfsSecondaryFileSystemImpl.java | 9 +-
.../internal/processors/igfs/IgfsServer.java | 4 +-
.../internal/processors/igfs/IgfsUtils.java | 16 ++
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 ++++++++++++-----
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 107 +++++++----
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 32 +++-
.../internal/processors/hadoop/HadoopUtils.java | 10 +-
.../hadoop/SecondaryFileSystemProvider.java | 53 +++---
.../hadoop/fs/HadoopDistributedFileSystem.java | 91 ---------
.../hadoop/fs/HadoopFileSystemsUtils.java | 17 --
.../processors/hadoop/igfs/HadoopIgfsEx.java | 6 +
.../hadoop/igfs/HadoopIgfsInProc.java | 170 ++++++++++++-----
.../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 2 +-
.../hadoop/igfs/HadoopIgfsOutProc.java | 33 +++-
.../hadoop/igfs/HadoopIgfsWrapper.java | 19 +-
.../hadoop/v2/HadoopV2TaskContext.java | 4 +-
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 56 ++++--
...oopSecondaryFileSystemConfigurationTest.java | 4 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 63 +++++--
.../IgniteHadoopFileSystemClientSelfTest.java | 2 +-
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 2 +
.../hadoop/HadoopFileSystemsTest.java | 23 +--
.../collections/HadoopSkipListSelfTest.java | 4 +-
30 files changed, 684 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
* @return Map of properties.
*/
public Map<String,String> properties();
+
+
+ /**
+ * Closes the secondary file system.
+ * @throws IgniteException in case of an error.
+ */
+ public void close() throws IgniteException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
}
/**
+ * Serializes the message and sends it into the given output stream.
* @param msg Message.
* @param hdr Message header.
* @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ U.writeString(out, req.userName());
writePath(out, req.path());
writePath(out, req.destinationPath());
out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
case OPEN_CREATE: {
IgfsPathControlRequest req = new IgfsPathControlRequest();
+ req.userName(U.readString(in));
req.path(readPath(in));
req.destinationPath(readPath(in));
req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
}
}
- assert msg != null;
-
msg.command(cmd);
return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
return null;
}
-
- /**
- * Writes string to output.
- *
- * @param out Data output.
- * @param str String.
- * @throws IOException If write failed.
- */
- private void writeString(DataOutput out, @Nullable String str) throws IOException {
- out.writeBoolean(str != null);
-
- if (str != null)
- out.writeUTF(str);
- }
-
- /**
- * Reads string from input.
- *
- * @param in Data input.
- * @return Read string.
- * @throws IOException If read failed.
- */
- @Nullable private String readString(DataInput in) throws IOException {
- boolean hasStr = in.readBoolean();
-
- if (hasStr)
- return in.readUTF();
-
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.igfs.common;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
/** Last modification time. */
private long modificationTime;
+ /** The user name this control request is made on behalf of. */
+ private String userName;
+
/**
* @param path Path.
*/
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
@Override public String toString() {
return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
}
+
+ /**
+ * Getter for the user name.
+ * @return user name.
+ */
+ public final String userName() {
+ assert userName != null;
+
+ return userName;
+ }
+
+ /**
+ * Setter for the user name.
+ * @param userName the user name.
+ */
+ public final void userName(String userName) {
+ this.userName = IgfsUtils.fixUserName(userName);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
/**
* Cleans up the job staging directory.
*/
- void cleanupStagingDirectory();
+ public void cleanupStagingDirectory();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
/** Property name for URI of file system. */
public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
- /** Property name for user name of file system. */
- public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+ /** Property name for default user name of file system.
+ * NOTE: for secondary file system this is just a default user name, which is used
+ * when the 2ndary filesystem is used outside of any user context.
+ * If another user name is set in the context, 2ndary file system will work on behalf
+ * of that user, which is different from the default. */
+ public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
/**
* Stops IGFS cleaning all used resources.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
for (IgfsFileWorkerBatch batch : workerMap.values())
batch.cancel();
- if (secondaryFs instanceof AutoCloseable)
- U.closeQuiet((AutoCloseable)secondaryFs);
+ try {
+ secondaryFs.close();
+ }
+ catch (Exception e) {
+ log.error("Failed to close secondary file system.", e);
+ }
}
igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
private final int bufSize; // Buffer size. Must not be less then file block size.
/** IGFS instance for this handler. */
- private IgfsEx igfs;
+ private final IgfsEx igfs;
/** Resource ID generator. */
- private AtomicLong rsrcIdGen = new AtomicLong();
+ private final AtomicLong rsrcIdGen = new AtomicLong();
/** Stopping flag. */
private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
* @return Response message.
* @throws IgniteCheckedException If failed.
*/
- private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+ private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
IgfsMessage msg) throws IgniteCheckedException {
- IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
if (log.isDebugEnabled())
log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
- IgfsControlResponse res = new IgfsControlResponse();
+ final IgfsControlResponse res = new IgfsControlResponse();
+
+ final String userName = req.userName();
+
+ assert userName != null;
try {
- switch (cmd) {
- case EXISTS:
- res.response(igfs.exists(req.path()));
+ IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+ @Override public Void apply() {
+ switch (cmd) {
+ case EXISTS:
+ res.response(igfs.exists(req.path()));
- break;
+ break;
- case INFO:
- res.response(igfs.info(req.path()));
+ case INFO:
+ res.response(igfs.info(req.path()));
- break;
+ break;
- case PATH_SUMMARY:
- res.response(igfs.summary(req.path()));
+ case PATH_SUMMARY:
+ res.response(igfs.summary(req.path()));
- break;
+ break;
- case UPDATE:
- res.response(igfs.update(req.path(), req.properties()));
+ case UPDATE:
+ res.response(igfs.update(req.path(), req.properties()));
- break;
+ break;
- case RENAME:
- igfs.rename(req.path(), req.destinationPath());
+ case RENAME:
+ igfs.rename(req.path(), req.destinationPath());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case DELETE:
- res.response(igfs.delete(req.path(), req.flag()));
+ case DELETE:
+ res.response(igfs.delete(req.path(), req.flag()));
- break;
+ break;
- case MAKE_DIRECTORIES:
- igfs.mkdirs(req.path(), req.properties());
+ case MAKE_DIRECTORIES:
+ igfs.mkdirs(req.path(), req.properties());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case LIST_PATHS:
- res.paths(igfs.listPaths(req.path()));
+ case LIST_PATHS:
+ res.paths(igfs.listPaths(req.path()));
- break;
+ break;
- case LIST_FILES:
- res.files(igfs.listFiles(req.path()));
+ case LIST_FILES:
+ res.files(igfs.listFiles(req.path()));
- break;
+ break;
- case SET_TIMES:
- igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+ case SET_TIMES:
+ igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case AFFINITY:
- res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+ case AFFINITY:
+ res.locations(igfs.affinity(req.path(), req.start(), req.length()));
- break;
+ break;
- case OPEN_READ: {
- IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
- igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+ case OPEN_READ: {
+ IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+ igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
- long streamId = registerResource(ses, igfsIn);
+ long streamId = registerResource(ses, igfsIn);
- if (log.isDebugEnabled())
- log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
- igfsIn.fileInfo().modificationTime());
+ IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+ igfsIn.fileInfo().modificationTime());
- res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+ res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
- break;
- }
+ break;
+ }
- case OPEN_CREATE: {
- long streamId = registerResource(ses, igfs.create(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Overwrite if exists.
- affinityKey(req), // Affinity key based on replication factor.
- req.replication(),// Replication factor.
- req.blockSize(), // Block size.
- req.properties() // File properties.
- ));
+ case OPEN_CREATE: {
+ long streamId = registerResource(ses, igfs.create(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Overwrite if exists.
+ affinityKey(req), // Affinity key based on replication factor.
+ req.replication(),// Replication factor.
+ req.blockSize(), // Block size.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- case OPEN_APPEND: {
- long streamId = registerResource(ses, igfs.append(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Create if absent.
- req.properties() // File properties.
- ));
+ case OPEN_APPEND: {
+ long streamId = registerResource(ses, igfs.append(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Create if absent.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- default:
- assert false : "Unhandled path control request command: " + cmd;
+ default:
+ assert false : "Unhandled path control request command: " + cmd;
- break;
- }
+ break;
+ }
+
+ return null;
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..b8095b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
*/
class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
/** Delegate. */
- private final IgfsImpl igfs;
+ private final IgfsEx igfs;
/**
* Constructor.
*
* @param igfs Delegate.
*/
- IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+ IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
this.igfs = igfs;
}
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public Map<String, String> properties() {
return Collections.emptyMap();
}
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ igfs.stop(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
*/
private class ClientWorker extends GridWorker {
/** Connected client endpoint. */
- private IpcEndpoint endpoint;
+ private final IpcEndpoint endpoint;
/** Data output stream. */
private final IgfsDataOutputStream out;
/** Client session object. */
- private IgfsClientSession ses;
+ private final IgfsClientSession ses;
/** Queue node for fast unlink. */
private ConcurrentLinkedDeque8.Node<ClientWorker> node;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
import java.lang.reflect.*;
@@ -88,4 +90,18 @@ public class IgfsUtils {
private IgfsUtils() {
// No-op.
}
+
+ /**
+ * Provides non-null user name.
+ * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+ * which is the current process owner user.
+ * @param user a user name to be fixed.
+ * @return non-null interned user name.
+ */
+ public static String fixUserName(@Nullable String user) {
+ if (F.isEmpty(user))
+ user = FileSystemConfiguration.DFLT_USER_NAME;
+
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..6a630fb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
import java.io.*;
import java.net.*;
@@ -37,15 +38,45 @@ import java.util.*;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
/**
- * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user context,
+ * see {@link IgfsUserContext#currentUser()}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable {
- /** Hadoop file system. */
- private final FileSystem fileSys;
-
- /** Properties of file system */
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
+ /** Properties of file system, see {@link #properties()}
+ *
+ * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
+ * See {@link IgfsEx#SECONDARY_FS_URI}
+ * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
+ * */
private final Map<String, String> props = new HashMap<>();
+ /** Secondary file system provider. */
+ private final SecondaryFileSystemProvider secProvider;
+
+ /** The default user name. It is used if no user context is set. */
+ private final String dfltUserName;
+
+ /** FileSystem instance created for the default user.
+ * Stored outside the fileSysLazyMap due to performance reasons. */
+ private final FileSystem dfltFs;
+
+ /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+ private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+ new ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) {
+ try {
+ assert !F.isEmpty(key);
+
+ return secProvider.createFileSystem(key);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+
/**
* Simple constructor that is to be used by default.
*
@@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @throws IgniteCheckedException In case of error.
*/
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
- @Nullable String userName)
- throws IgniteCheckedException {
+ @Nullable String userName) throws IgniteCheckedException {
// Treat empty uri and userName arguments as nulls to improve configuration usability:
if (F.isEmpty(uri))
uri = null;
@@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
if (F.isEmpty(userName))
userName = null;
+ this.dfltUserName = IgfsUtils.fixUserName(userName);
+
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName);
+ this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
- fileSys = secProvider.createFileSystem();
+ // File system creation for the default user name.
+ // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
+ this.dfltFs = secProvider.createFileSystem(dfltUserName);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
- uri = secProvider.uri().toString();
+ assert dfltFs != null;
- if (!uri.endsWith("/"))
- uri += "/";
+ uri = secProvider.uri().toString();
- if (cfgPath != null)
- props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+ if (!uri.endsWith("/"))
+ uri += "/";
- if (userName != null)
- props.put(SECONDARY_FS_USER_NAME, userName);
+ if (cfgPath != null)
+ props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
- props.put(SECONDARY_FS_URI, uri);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
+ props.put(SECONDARY_FS_URI, uri);
+ props.put(SECONDARY_FS_USER_NAME, dfltUserName);
}
/**
@@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @return Hadoop path.
*/
private Path convert(IgfsPath path) {
- URI uri = fileSys.getUri();
+ URI uri = fileSysForUser().getUri();
return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
}
@@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @param detailMsg Detailed error message.
* @return Appropriate exception.
*/
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
- boolean wrongVer = X.hasCause(e, RemoteException.class) ||
- (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
- return !wrongVer ? cast(detailMsg, e) :
- new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
- "version.", e); }
+ return cast(detailMsg, e);
+ }
/**
* Cast IO exception to IGFS exception.
@@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
try {
- return fileSys.exists(convert(path));
+ return fileSysForUser().exists(convert(path));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
+ final FileSystem fileSys = fileSysForUser();
+
try {
if (props0.userName() != null || props0.groupName() != null)
fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
@@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public void rename(IgfsPath src, IgfsPath dest) {
// Delegate to the secondary file system.
try {
- if (!fileSys.rename(convert(src), convert(dest)))
+ if (!fileSysForUser().rename(convert(src), convert(dest)))
throw new IgfsException("Failed to rename (secondary file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
@@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
try {
- return fileSys.delete(convert(path), recursive);
+ return fileSysForUser().delete(convert(path), recursive);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
try {
- if (!fileSys.mkdirs(convert(path)))
+ if (!fileSysForUser().mkdirs(convert(path)))
throw new IgniteException("Failed to make directories [path=" + path + "]");
}
catch (IOException e) {
@@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
try {
- if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+ if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
}
catch (IOException e) {
@@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
+ FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
+ FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize);
+ return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
try {
- return fileSys.create(convert(path), overwrite);
+ return fileSysForUser().create(convert(path), overwrite);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
try {
- return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
- null);
+ return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+ (short)replication, blockSize, null);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
- return fileSys.append(convert(path), bufSize);
+ return fileSysForUser().append(convert(path), bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
try {
- final FileStatus status = fileSys.getFileStatus(convert(path));
+ final FileStatus status = fileSysForUser().getFileStatus(convert(path));
if (status == null)
return null;
@@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
- return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+ return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
@@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
}
/** {@inheritDoc} */
- @Nullable @Override public Map<String, String> properties() {
+ @Override public Map<String, String> properties() {
return props;
}
/** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
+ @Override public void close() throws IgniteException {
+ Exception e = null;
+
try {
- fileSys.close();
+ dfltFs.close();
}
- catch (IOException e) {
- throw new IgniteCheckedException(e);
+ catch (Exception e0) {
+ e = e0;
+ }
+
+ try {
+ fileSysLazyMap.close();
+ }
+ catch (IgniteCheckedException ice) {
+ if (e == null)
+ e = ice;
}
+
+ if (e != null)
+ throw new IgniteException(e);
}
/**
* Gets the underlying {@link FileSystem}.
+ * This method is used solely for testing.
* @return the underlying Hadoop {@link FileSystem}.
*/
public FileSystem fileSystem() {
- return fileSys;
+ return fileSysForUser();
+ }
+
+ /**
+ * Gets the FileSystem for the current context user.
+ * @return the FileSystem instance, never null.
+ */
+ private FileSystem fileSysForUser() {
+ String user = IgfsUserContext.currentUser();
+
+ if (F.isEmpty(user))
+ user = dfltUserName; // default is never empty.
+
+ assert !F.isEmpty(user);
+
+ if (F.eq(user, dfltUserName))
+ return dfltFs; // optimization
+
+ return fileSysLazyMap.getOrCreate(user);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..c0a9ade 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
@@ -97,21 +98,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Grid remote client. */
private HadoopIgfsWrapper rmtClient;
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>(){
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
+ /** working directory. */
+ private Path workingDir;
/** Default replication factor. */
private short dfltReplication;
@@ -129,6 +117,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Secondary URI string. */
private URI secondaryUri;
+ /** The user name this file system was created on behalf of. */
+ private String user;
+
/** IGFS mode resolver. */
private IgfsModeResolver modeRslvr;
@@ -182,6 +173,36 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
+ * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+ * @return the user name, never null.
+ */
+ public static String getFsHadoopUser(Configuration cfg) throws IOException {
+ String user = null;
+
+ // -------------------------------------------
+ // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+ // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+ // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+ // ugi.doAs() closure.
+ if (cfg != null)
+ user = cfg.get(MRJobConfig.USER_NAME);
+ // -------------------------------------------
+
+ if (user == null) {
+ UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+ if (currUgi != null)
+ user = currUgi.getShortUserName();
+ }
+
+ user = IgfsUtils.fixUserName(user);
+
+ assert user != null;
+
+ return user;
+ }
+
+ /**
* Public setter that can be used by direct users of FS or Visor.
*
* @param colocateFileWrites Whether all ongoing file writes should be colocated.
@@ -221,7 +242,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
uriAuthority = uri.getAuthority();
- setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+ user = getFsHadoopUser(cfg);
// Override sequential reads before prefetch if needed.
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +265,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
- rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+ rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
// Handshake.
IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,13 +310,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
String secUri = props.get(SECONDARY_FS_URI);
String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
- String secUserName = props.get(SECONDARY_FS_USER_NAME);
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
- secUserName);
+ SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+ secondaryFs = secProvider.createFileSystem(user);
- secondaryFs = secProvider.createFileSystem();
secondaryUri = secProvider.uri();
}
catch (IOException e) {
@@ -306,6 +326,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
"will have no effect): " + e.getMessage());
}
}
+
+ // set working directory to the home directory of the current Fs user:
+ setWorkingDirectory(null);
}
finally {
leaveBusy();
@@ -849,22 +872,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** {@inheritDoc} */
@Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
+ Path path = new Path("/user/" + user);
return path.makeQualified(getUri(), null);
}
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(null);
- }
-
/** {@inheritDoc} */
@Override public void setWorkingDirectory(Path newPath) {
if (newPath == null) {
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (secondaryFs != null)
secondaryFs.setWorkingDirectory(toSecondary(homeDir));
- workingDir.set(homeDir);
+ workingDir = homeDir;
}
else {
Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +898,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (secondaryFs != null)
secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
- workingDir.set(fixedNewPath);
+ workingDir = fixedNewPath;
}
}
/** {@inheritDoc} */
@Override public Path getWorkingDirectory() {
- return workingDir.get();
+ return workingDir;
}
/** {@inheritDoc} */
@@ -1153,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
return null;
return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
- new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+ new IgfsPath(convert(workingDir), path.toUri().getPath());
}
/**
@@ -1191,9 +1203,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
*/
@SuppressWarnings("deprecation")
private FileStatus convert(IgfsFile file) {
- return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
- file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
- file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+ return new FileStatus(
+ file.length(),
+ file.isDirectory(),
+ getDefaultReplication(),
+ file.groupBlockSize(),
+ file.modificationTime(),
+ file.accessTime(),
+ permission(file),
+ file.property(PROP_USER_NAME, user),
+ file.property(PROP_GROUP_NAME, "users"),
convert(file.path())) {
@Override public String toString() {
return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1247,4 +1266,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override public String toString() {
return S.toString(IgniteHadoopFileSystem.class, this);
}
+
+ /**
+ * Returns the user name this File System is created on behalf of.
+ * @return the user name
+ */
+ public String user() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..f3fbe9c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
import static org.apache.ignite.igfs.IgfsMode.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** Grid remote client. */
private HadoopIgfsWrapper rmtClient;
+ /** The name of the user this File System created on behalf of. */
+ private final String user;
+
/** Working directory. */
private IgfsPath workingDir;
/** URI. */
- private URI uri;
+ private final URI uri;
/** Authority. */
private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
uri = name;
+ user = getFsHadoopUser(cfg);
+
try {
initialize(name, cfg);
}
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
throw e;
}
- workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+ workingDir = new IgfsPath("/user/" + user);
}
/** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
- rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+ rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
// Handshake.
IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
String secUri = props.get(SECONDARY_FS_URI);
String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
- String secUserName = props.get(SECONDARY_FS_USER_NAME);
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
- secUserName);
+ SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+ secondaryFs = secProvider.createAbstractFileSystem(user);
- secondaryFs = secProvider.createAbstractFileSystem();
secondaryUri = secProvider.uri();
}
catch (IOException e) {
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
file.modificationTime(),
file.accessTime(),
permission(file),
- file.property(PROP_USER_NAME, DFLT_USER_NAME),
+ file.property(PROP_USER_NAME, user),
file.property(PROP_GROUP_NAME, "users"),
convert(file.path())) {
@Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
@Override public String toString() {
return S.toString(IgniteHadoopFileSystem.class, this);
}
-}
+
+ /**
+ * Returns the user name this File System is created on behalf of.
+ * @return the user name
+ */
+ public String user() {
+ return user;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..d493bd4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,11 +126,15 @@ public class HadoopUtils {
break;
case PHASE_REDUCE:
- assert status.totalReducerCnt() > 0;
-
+ // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
+ // See https://issues.apache.org/jira/browse/IGNITE-764
setupProgress = 1;
mapProgress = 1;
- reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+ if (status.totalReducerCnt() > 0)
+ reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+ else
+ reduceProgress = 1f;
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..b1a057c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
+import java.security.*;
/**
* Encapsulates logic of secondary filesystem creation.
@@ -36,9 +39,6 @@ public class SecondaryFileSystemProvider {
/** The secondary filesystem URI, never null. */
private final URI uri;
- /** Optional user name to log into secondary filesystem with. */
- private @Nullable final String userName;
-
/**
* Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
* specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
* property in the provided configuration.
* @param secConfPath the secondary Fs path (file path on the local file system, optional).
* See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
- * @param userName User name.
* @throws IOException
*/
public SecondaryFileSystemProvider(final @Nullable String secUri,
- final @Nullable String secConfPath, @Nullable String userName) throws IOException {
- this.userName = userName;
-
+ final @Nullable String secConfPath) throws IOException {
if (secConfPath != null) {
URL url = U.resolveIgniteUrl(secConfPath);
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
* @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
* @throws IOException
*/
- public FileSystem createFileSystem() throws IOException {
+ public FileSystem createFileSystem(String userName) throws IOException {
+ userName = IgfsUtils.fixUserName(userName);
+
final FileSystem fileSys;
- if (userName == null)
- fileSys = FileSystem.get(uri, cfg);
- else {
- try {
- fileSys = FileSystem.get(uri, cfg, userName);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ try {
+ fileSys = FileSystem.get(uri, cfg, userName);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
+ throw new IOException("Failed to create file system due to interrupt.", e);
}
return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
/**
* @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
- * @throws IOException
+ * @throws IOException in case of error.
*/
- public AbstractFileSystem createAbstractFileSystem() throws IOException {
- return AbstractFileSystem.get(uri, cfg);
+ public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+ userName = IgfsUtils.fixUserName(userName);
+
+ String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+ @Override public AbstractFileSystem run() throws IOException {
+ return AbstractFileSystem.get(uri, cfg);
+ }
+ });
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", ie);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>() {
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
- }
-
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(getHomeDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
-
- return path.makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- Path fixedDir = fixRelativePart(dir);
-
- String res = fixedDir.toUri().getPath();
-
- if (!DFSUtil.isValidName(res))
- throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
- workingDir.set(fixedDir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workingDir.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
/**
* Utilities for configuring file systems to support the separate working directory per each thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
/**
- * Set user name and default working directory for current thread if it's supported by file system.
- *
- * @param fs File system.
- * @param userName User name.
- */
- public static void setUser(FileSystem fs, String userName) {
- if (fs instanceof IgniteHadoopFileSystem)
- ((IgniteHadoopFileSystem)fs).setUser(userName);
- else if (fs instanceof HadoopDistributedFileSystem)
- ((HadoopDistributedFileSystem)fs).setUser(userName);
- }
-
- /**
* Setup wrappers of filesystems to support the separate working directory.
*
* @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
HadoopLocalFileSystemV2.class.getName());
-
- cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
* @throws IOException If failed.
*/
public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+ /**
+ * The user this Igfs instance works on behalf of.
+ * @return the user name.
+ */
+ public String user();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
/** Logger. */
private final Log log;
+ /** The user this Igfs works on behalf of. */
+ private final String user;
+
/**
* Constructor.
*
* @param igfs Target IGFS.
* @param log Log.
*/
- public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+ this.user = IgfsUtils.fixUserName(userName);
+
this.igfs = igfs;
+
this.log = log;
bufSize = igfs.configuration().getBlockSize() * 2;
}
/** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(String logDir) {
- igfs.clientLogDirectory(logDir);
+ @Override public IgfsHandshakeResponse handshake(final String logDir) {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+ @Override public IgfsHandshakeResponse apply() {
+ igfs.clientLogDirectory(logDir);
- return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
- igfs.globalSampling());
+ return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+ igfs.globalSampling());
+ }
+ });
}
/** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+ @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.info(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.info(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
try {
- return igfs.update(path, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.update(path, props);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+ @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
try {
- igfs.setTimes(path, accessTime, modificationTime);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.setTimes(path, accessTime, modificationTime);
+
+ return null;
+ }
+ });
return true;
}
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+ @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
try {
- igfs.rename(src, dest);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.rename(src, dest);
+
+ return null;
+ }
+ });
return true;
}
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+ @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
try {
- return igfs.delete(path, recursive);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+ @Override public Boolean apply() {
+ return igfs.delete(path, recursive);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
/** {@inheritDoc} */
@Override public IgfsStatus fsStatus() throws IgniteCheckedException {
try {
- return igfs.globalSpace();
+ return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+ @Override public IgfsStatus call() throws IgniteCheckedException {
+ return igfs.globalSpace();
+ }
+ });
}
catch (IllegalStateException e) {
throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
"stopping.");
}
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new AssertionError("Must never go there.");
+ }
}
/** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+ @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.listPaths(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+ @Override public Collection<IgfsPath> apply() {
+ return igfs.listPaths(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+ @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.listFiles(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+ @Override public Collection<IgfsFile> apply() {
+ return igfs.listFiles(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
try {
- igfs.mkdirs(path, props);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.mkdirs(path, props);
+
+ return null;
+ }
+ });
return true;
}
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+ @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.summary(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+ @Override public IgfsPathSummary apply() {
+ return igfs.summary(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+ @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
throws IgniteCheckedException {
try {
- return igfs.affinity(path, start, len);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+ @Override public Collection<IgfsBlockLocation> apply() {
+ return igfs.affinity(path, start, len);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
- return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
throws IgniteCheckedException {
try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
- return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
- int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+ final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
try {
- IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
- colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+ colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
- return new HadoopIgfsStreamDelegate(this, stream);
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
- @Nullable Map<String, String> props) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+ final @Nullable Map<String, String> props) throws IgniteCheckedException {
try {
- IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
- return new HadoopIgfsStreamDelegate(this, stream);
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
if (lsnr0 != null && log.isDebugEnabled())
log.debug("Removed stream event listener [delegate=" + delegate + ']');
}
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public class HadoopIgfsIpcIo implements HadoopIgfsIo {
/** Logger. */
- private Log log;
+ private final Log log;
/** Request futures map. */
private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =