You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/01 16:18:13 UTC
[27/50] [abbrv] incubator-ignite git commit: [IGNITE-958]: IGNITE-218
(Wrong staging permissions while running MR job under hadoop accelerator):
IGFS part.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
/** IGFS name. */
private final String igfs;
+ /** The user this out proc is performing on behalf of. */
+ private final String userName;
+
/** Client log. */
private final Log log;
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
- this(host, port, grid, igfs, false, log);
+ public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+ this(host, port, grid, igfs, false, log, user);
}
/**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
- this(null, port, grid, igfs, true, log);
+ public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+ this(null, port, grid, igfs, true, log, user);
}
/**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+ private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
throws IOException {
assert host != null && !shmem || host == null && shmem :
"Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
this.grid = grid;
this.igfs = igfs;
this.log = log;
+ this.userName = IgfsUtils.fixUserName(user);
io = HadoopIgfsIpcIo.get(log, endpoint);
@@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(INFO);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
@@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(UPDATE);
msg.path(path);
msg.properties(props);
+ msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
@@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.accessTime(accessTime);
msg.modificationTime(modificationTime);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(RENAME);
msg.path(src);
msg.destinationPath(dest);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(DELETE);
msg.path(path);
msg.flag(recursive);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.start(start);
msg.length(len);
+ msg.userName(userName);
return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
}
@@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(PATH_SUMMARY);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(SUMMARY_RES).get();
}
@@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(MAKE_DIRECTORIES);
msg.path(path);
msg.properties(props);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(LIST_FILES);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(FILE_COL_RES).get();
}
@@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(LIST_PATHS);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(PATH_COL_RES).get();
}
@@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(OPEN_READ);
msg.path(path);
msg.flag(false);
+ msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
@@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.flag(true);
msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+ msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
@@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.properties(props);
msg.replication(replication);
msg.blockSize(blockSize);
+ msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
@@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.flag(create);
msg.properties(props);
+ msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
@@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
}
};
}
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return userName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
/** Logger. */
private final Log log;
+ /** The user name this wrapper works on behalf of. */
+ private final String userName;
+
/**
* Constructor.
*
@@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
* @param conf Configuration.
* @param log Current logger.
*/
- public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+ public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+ throws IOException {
try {
this.authority = authority;
this.endpoint = new HadoopIgfsEndpoint(authority);
this.logDir = logDir;
this.conf = conf;
this.log = log;
+ this.userName = user;
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsInProc(igfs, log);
+ hadoop = new HadoopIgfsInProc(igfs, log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
catch (IOException | IgniteCheckedException e) {
if (e instanceof HadoopIgfsCommunicationException)
- hadoop.close(true);
+ if (hadoop != null)
+ hadoop.close(true);
if (log.isDebugEnabled())
log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
@@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+ hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
@@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
try {
hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
- log);
+ log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
@@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+ hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+ log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e9c859bd..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,9 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
try {
- FileSystem fs = FileSystem.get(jobConf());
-
- HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+ FileSystem.get(jobConf());
LocalFileSystem locFs = FileSystem.getLocal(jobConf());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index d11cabb..9bcd5de 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
@@ -39,6 +40,7 @@ import org.jsr166.*;
import java.io.*;
import java.net.*;
+import java.security.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
/** Thread count for multithreaded tests. */
private static final int THREAD_CNT = 8;
+ /** Secondary file system user. */
+ private static final String SECONDARY_FS_USER = "secondary-default";
+
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
if (mode != PRIMARY)
cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(),
- secondaryFileSystemConfigPath()));
+ secondaryFileSystemConfigPath(), SECONDARY_FS_USER));
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
cfg.setManagementPort(-1);
@@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath()));
- fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+
+ // Create Fs on behalf of the client user:
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override public Object run() throws Exception {
+ fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+
+ return null;
+ }
+ });
barrier = new CyclicBarrier(THREAD_CNT);
}
+ /**
+ * Gets the user the Fs client operates on behalf of.
+ * @return The user the Fs client operates on behalf of.
+ */
+ protected String getClientFsUser() {
+ return "foo";
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
try {
@@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
/** @throws Exception If failed. */
public void testStatus() throws Exception {
+ Path file1 = new Path("/file1");
- try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class),
+ try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class),
Options.CreateOpts.perms(FsPermission.getDefault()))) {
file.write(new byte[1024 * 1024]);
}
FsStatus status = fs.getFsStatus();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner());
+
assertEquals(4, grid(0).cluster().nodes().size());
long used = 0, max = 0;
@@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
os.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
fs.setOwner(file, "aUser", "aGroup");
assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
int cnt = 2 * 1024;
- FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
- Options.CreateOpts.perms(FsPermission.getDefault()));
+ try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
+ Options.CreateOpts.perms(FsPermission.getDefault()))) {
- for (long i = 0; i < cnt; i++)
- out.writeLong(i);
+ for (long i = 0; i < cnt; i++)
+ out.writeLong(i);
+ }
- out.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
- FSDataInputStream in = fs.open(file, 1024);
+ try (FSDataInputStream in = fs.open(file, 1024)) {
- for (long i = 0; i < cnt; i++)
- assertEquals(i, in.readLong());
-
- in.close();
+ for (long i = 0; i < cnt; i++)
+ assertEquals(i, in.readLong());
+ }
}
/** @throws Exception If failed. */
@@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+ assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+ assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
}
/** @throws Exception If failed. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
primaryConfFullPath = null;
SecondaryFileSystemProvider provider =
- new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null);
+ new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
- primaryFs = provider.createFileSystem();
+ primaryFs = provider.createFileSystem(null);
primaryFsUri = provider.uri();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d9a3c59..b828aad 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
@@ -43,6 +44,7 @@ import org.jsr166.*;
import java.io.*;
import java.lang.reflect.*;
import java.net.*;
+import java.security.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
/** Secondary file system configuration path. */
private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
+ /** Secondary file system user. */
+ private static final String SECONDARY_FS_USER = "secondary-default";
+
/** Secondary endpoint configuration. */
protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
@@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500";
}
+ /**
+ * Gets the user the Fs client operates on behalf of.
+ * @return The user the Fs client operates on behalf of.
+ */
+ protected String getClientFsUser() {
+ return "foo";
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true);
@@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem);
- fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+ UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+ assertNotNull(clientUgi);
+
+ // Create the Fs on behalf of the specific user:
+ clientUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override public Object run() throws Exception {
+ fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+
+ return null;
+ }
+ });
barrier = new CyclicBarrier(THREAD_CNT);
}
@@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
cfg.setDefaultMode(mode);
if (mode != PRIMARY)
- cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH));
+ cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+ SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
@@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
os.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
fs.setOwner(file, "aUser", "aGroup");
assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
int cnt = 2 * 1024;
- FSDataOutputStream out = fs.create(file, true, 1024);
-
- for (long i = 0; i < cnt; i++)
- out.writeLong(i);
+ try (FSDataOutputStream out = fs.create(file, true, 1024)) {
- out.close();
+ for (long i = 0; i < cnt; i++)
+ out.writeLong(i);
+ }
- FSDataInputStream in = fs.open(file, 1024);
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
- for (long i = 0; i < cnt; i++)
- assertEquals(i, in.readLong());
+ try (FSDataInputStream in = fs.open(file, 1024)) {
- in.close();
+ for (long i = 0; i < cnt; i++)
+ assertEquals(i, in.readLong());
+ }
}
/** @throws Exception If failed. */
@@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
String path = fs.getFileStatus(file).getPath().toString();
- assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file"));
+ assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file"));
}
/** @throws Exception If failed. */
@@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
public void testGetWorkingDirectoryIfDefault() throws Exception {
String path = fs.getWorkingDirectory().toString();
- assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous")));
+ assertTrue(path.endsWith("/user/" + getClientFsUser()));
}
/** @throws Exception If failed. */
@@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
@SuppressWarnings("OctalInteger")
public void testMkdirs() throws Exception {
Path fsHome = new Path(PRIMARY_URI);
- Path dir = new Path(fsHome, "/tmp/staging");
- Path nestedDir = new Path(dir, "nested");
+ final Path dir = new Path(fsHome, "/tmp/staging");
+ final Path nestedDir = new Path(dir, "nested");
- FsPermission dirPerm = FsPermission.createImmutable((short)0700);
- FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
+ final FsPermission dirPerm = FsPermission.createImmutable((short)0700);
+ final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
assertTrue(fs.mkdirs(dir, dirPerm));
assertTrue(fs.mkdirs(nestedDir, nestedDirPerm));
assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+ assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+ assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
}
/** @throws Exception If failed. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
index b92b213..fcfd587 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
@@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest
try {
switchHandlerErrorFlag(true);
- HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG);
+ HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null);
client.handshake(null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index e103c5f..2c17ba9 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null);
+ cache.clear(); // avoid influence of previous tests in the same process.
+
String name = "igfs:" + getTestGridName(0) + "@";
Configuration cfg = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
index 8cf31a2..5f90bd4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*;
* Test file systems for the working directory multi-threading support.
*/
public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+ /** The number of threads. */
private static final int THREAD_COUNT = 3;
/** {@inheritDoc} */
@@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
try {
int curThreadNum = threadNum.getAndIncrement();
- FileSystem fs = FileSystem.get(uri, cfg);
-
- HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
-
if ("file".equals(uri.getScheme()))
FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
@@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
}
/**
- * Test IGFS multi-thread working directory.
- *
- * @throws Exception If fails.
- */
- public void testIgfs() throws Exception {
- testFileSystem(URI.create(igfsScheme()));
- }
-
- /**
- * Test HDFS multi-thread working directory.
- *
- * @throws Exception If fails.
- */
- public void testHdfs() throws Exception {
- testFileSystem(URI.create("hdfs://localhost/"));
- }
-
- /**
* Test LocalFS multi-thread working directory.
*
* @throws Exception If fails.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
index 8a046e0..89bf830 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
int sigma = max((int)ceil(precission * exp), 5);
- X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission +
+ X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
" sigma: " + sigma);
- assertTrue(abs(exp - levelsCnts[level]) <= sigma);
+ assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
}
}