You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/18 10:02:34 UTC
[2/7] ignite git commit: IGNITE-3920: Removed IgfsPaths and secondary
file system propagation to client Hadoop file systems. This closes #1077.
This closes #1107.
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 82ad683..c9d4cbb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -17,6 +17,21 @@
package org.apache.ignite.hadoop.fs.v2;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +42,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.InvalidPathException;
@@ -36,57 +50,27 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
@@ -163,15 +147,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** Default replication factor. */
private short dfltReplication;
- /** Secondary URI string. */
- private URI secondaryUri;
-
- /** Mode resolver. */
- private IgfsModeResolver modeRslvr;
-
- /** The secondary file system factory. */
- private HadoopFileSystemFactoryDelegate factory;
-
/** Whether custom sequential reads before prefetch value is provided. */
private boolean seqReadsBeforePrefetchOverride;
@@ -303,8 +278,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
grpBlockSize = handshake.blockSize();
- IgfsPaths paths = handshake.secondaryPaths();
-
Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
@@ -318,59 +291,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
}
else
clientLog = IgfsLogger.disabledLogger();
-
- try {
- modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
- }
- catch (IgniteCheckedException ice) {
- throw new IOException(ice);
- }
-
- boolean initSecondary = paths.defaultMode() == PROXY;
-
- if (!initSecondary && paths.pathModes() != null) {
- for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
- IgfsMode mode = pathMode.getValue();
-
- if (mode == PROXY) {
- initSecondary = true;
-
- break;
- }
- }
- }
-
- if (initSecondary) {
- try {
- HadoopFileSystemFactory factory0 =
- (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
-
- factory = HadoopDelegateUtils.fileSystemFactoryDelegate(getClass().getClassLoader(), factory0);
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to get secondary file system factory.", e);
- }
-
- if (factory == null)
- throw new IOException("Failed to get secondary file system factory (did you set " +
- IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
- FileSystemConfiguration.class.getName() + "?)");
-
- assert factory != null;
-
- factory.start();
-
- try {
- FileSystem secFs = (FileSystem)factory.get(user);
-
- secondaryUri = secFs.getUri();
-
- A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
- }
- catch (IOException e) {
- throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
- }
- }
}
finally {
leaveBusy();
@@ -388,9 +308,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.close();
- if (factory != null)
- factory.stop();
-
// Reset initialized resources.
rmtClient = null;
}
@@ -414,19 +331,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public boolean setReplication(Path f, short replication) throws IOException {
- return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
+ return false;
}
/** {@inheritDoc} */
@Override public void setTimes(Path f, long mtime, long atime) throws IOException {
- if (mode(f) == PROXY)
- secondaryFileSystem().setTimes(f, mtime, atime);
- else {
- if (mtime == -1 && atime == -1)
- return;
+ if (mtime == -1 && atime == -1)
+ return;
- rmtClient.setTimes(convert(f), atime, mtime);
- }
+ rmtClient.setTimes(convert(f), atime, mtime);
}
/** {@inheritDoc} */
@@ -443,13 +356,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
A.notNull(p, "p");
- if (mode(p) == PROXY)
- secondaryFileSystem().setPermission(toSecondary(p), perm);
- else {
- if (rmtClient.update(convert(p), permission(perm)) == null)
- throw new IOException("Failed to set file permission (file not found?)" +
- " [path=" + p + ", perm=" + perm + ']');
- }
+ if (rmtClient.update(convert(p), permission(perm)) == null)
+ throw new IOException("Failed to set file permission (file not found?)" +
+ " [path=" + p + ", perm=" + perm + ']');
}
finally {
leaveBusy();
@@ -465,9 +374,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
enterBusy();
try {
- if (mode(p) == PROXY)
- secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
- else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
+ if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
throw new IOException("Failed to set file permission (file not found?)" +
" [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
@@ -486,50 +393,29 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
-
- if (clientLog.isLogEnabled()) {
- // At this point we do not know file size, so we perform additional request to remote FS to get it.
- FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
- long size = status != null ? status.getLen() : -1;
+ HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ?
+ rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
- long logId = IgfsLogger.nextId();
+ long logId = -1;
- clientLog.logOpen(logId, path, PROXY, bufSize, size);
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
- return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId));
- }
- else
- return is;
+ clientLog.logOpen(logId, path, bufSize, stream.length());
}
- else {
- HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ?
- rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
-
- long logId = -1;
-
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
-
- clientLog.logOpen(logId, path, mode, bufSize, stream.length());
- }
- if (LOG.isDebugEnabled())
- LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
- ", bufSize=" + bufSize + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
+ ", bufSize=" + bufSize + ']');
- HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(),
- bufSize, LOG, clientLog, logId);
+ HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(),
+ bufSize, LOG, clientLog, logId);
- if (LOG.isDebugEnabled())
- LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
- return new FSDataInputStream(igfsIn);
- }
+ return new FSDataInputStream(igfsIn);
}
finally {
leaveBusy();
@@ -561,80 +447,60 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
if (LOG.isDebugEnabled())
LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
- if (mode == PROXY) {
- FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
- replication, blockSize, progress);
-
- if (clientLog.isLogEnabled()) {
- long logId = IgfsLogger.nextId();
-
- if (append)
- clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
- else
- clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
-
- return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
- }
- else
- return os;
- }
- else {
- Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
- IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
-
- // Create stream and close it in the 'finally' section if any sequential operation failed.
- HadoopIgfsStreamDelegate stream;
+ Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
+ IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
- long logId = -1;
+ // Create stream and close it in the 'finally' section if any sequential operation failed.
+ HadoopIgfsStreamDelegate stream;
- if (append) {
- stream = rmtClient.append(path, create, permMap);
+ long logId = -1;
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
+ if (append) {
+ stream = rmtClient.append(path, create, permMap);
- clientLog.logAppend(logId, path, mode, bufSize);
- }
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
- if (LOG.isDebugEnabled())
- LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+ clientLog.logAppend(logId, path, bufSize);
}
- else {
- stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
- permMap);
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+ }
+ else {
+ stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
+ permMap);
- clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
- }
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
- if (LOG.isDebugEnabled())
- LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+ clientLog.logCreate(logId, path, overwrite, bufSize, replication, blockSize);
}
- assert stream != null;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+ }
+
+ assert stream != null;
- HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
- clientLog, logId);
+ HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
+ clientLog, logId);
- bufSize = Math.max(64 * 1024, bufSize);
+ bufSize = Math.max(64 * 1024, bufSize);
- out = new BufferedOutputStream(igfsOut, bufSize);
+ out = new BufferedOutputStream(igfsOut, bufSize);
- FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+ FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
- // Mark stream created successfully.
- out = null;
+ // Mark stream created successfully.
+ out = null;
- return res;
- }
+ return res;
}
finally {
// Close if failed during stream creation.
@@ -661,15 +527,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
IgfsPath srcPath = convert(src);
IgfsPath dstPath = convert(dst);
- IgfsMode srcMode = modeRslvr.resolveMode(srcPath);
-
if (clientLog.isLogEnabled())
- clientLog.logRename(srcPath, srcMode, dstPath);
+ clientLog.logRename(srcPath, dstPath);
- if (srcMode == PROXY)
- secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
- else
- rmtClient.rename(srcPath, dstPath);
+ rmtClient.rename(srcPath, dstPath);
}
finally {
leaveBusy();
@@ -685,19 +546,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- if (clientLog.isLogEnabled())
- clientLog.logDelete(path, PROXY, recursive);
-
- return secondaryFileSystem().delete(toSecondary(f), recursive);
- }
-
boolean res = rmtClient.delete(path, recursive);
if (clientLog.isLogEnabled())
- clientLog.logDelete(path, mode, recursive);
+ clientLog.logDelete(path, recursive);
return res;
}
@@ -708,16 +560,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
- // Checksum has effect for secondary FS only.
- if (factory != null)
- secondaryFileSystem().setVerifyChecksum(verifyChecksum);
+ // No-op.
}
/** {@inheritDoc} */
@Override public FileChecksum getFileChecksum(Path f) throws IOException {
- if (mode(f) == PROXY)
- return secondaryFileSystem().getFileChecksum(f);
-
return null;
}
@@ -729,52 +576,29 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
- if (mode == PROXY) {
- FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
+ Collection<IgfsFile> list = rmtClient.listFiles(path);
- if (arr == null)
- throw new FileNotFoundException("File " + f + " does not exist.");
+ if (list == null)
+ throw new FileNotFoundException("File " + f + " does not exist.");
- for (int i = 0; i < arr.length; i++)
- arr[i] = toPrimary(arr[i]);
+ List<IgfsFile> files = new ArrayList<>(list);
- if (clientLog.isLogEnabled()) {
- String[] fileArr = new String[arr.length];
+ FileStatus[] arr = new FileStatus[files.size()];
- for (int i = 0; i < arr.length; i++)
- fileArr[i] = arr[i].getPath().toString();
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(files.get(i));
- clientLog.logListDirectory(path, PROXY, fileArr);
- }
-
- return arr;
- }
- else {
- Collection<IgfsFile> list = rmtClient.listFiles(path);
-
- if (list == null)
- throw new FileNotFoundException("File " + f + " does not exist.");
-
- List<IgfsFile> files = new ArrayList<>(list);
-
- FileStatus[] arr = new FileStatus[files.size()];
+ if (clientLog.isLogEnabled()) {
+ String[] fileArr = new String[arr.length];
for (int i = 0; i < arr.length; i++)
- arr[i] = convert(files.get(i));
-
- if (clientLog.isLogEnabled()) {
- String[] fileArr = new String[arr.length];
-
- for (int i = 0; i < arr.length; i++)
- fileArr[i] = arr[i].getPath().toString();
-
- clientLog.logListDirectory(path, mode, fileArr);
- }
+ fileArr[i] = arr[i].getPath().toString();
- return arr;
+ clientLog.logListDirectory(path, fileArr);
}
+
+ return arr;
}
finally {
leaveBusy();
@@ -789,20 +613,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
- if (mode == PROXY) {
- if (clientLog.isLogEnabled())
- clientLog.logMakeDirectory(path, PROXY);
+ rmtClient.mkdirs(path, permission(perm));
- secondaryFileSystem().mkdirs(toSecondary(f), perm);
- }
- else {
- rmtClient.mkdirs(path, permission(perm));
-
- if (clientLog.isLogEnabled())
- clientLog.logMakeDirectory(path, mode);
- }
+ if (clientLog.isLogEnabled())
+ clientLog.logMakeDirectory(path);
}
finally {
leaveBusy();
@@ -816,16 +631,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
enterBusy();
try {
- if (mode(f) == PROXY)
- return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
- else {
- IgfsFile info = rmtClient.info(convert(f));
+ IgfsFile info = rmtClient.info(convert(f));
- if (info == null)
- throw new FileNotFoundException("File not found: " + f);
+ if (info == null)
+ throw new FileNotFoundException("File not found: " + f);
- return convert(info);
- }
+ return convert(info);
}
finally {
leaveBusy();
@@ -841,25 +652,21 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
enterBusy();
try {
- if (modeRslvr.resolveMode(igfsPath) == PROXY)
- return secondaryFileSystem().getFileBlockLocations(path, start, len);
- else {
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
- List<IgfsBlockLocation> affinity = new ArrayList<>(
- rmtClient.affinity(igfsPath, start, len));
+ List<IgfsBlockLocation> affinity = new ArrayList<>(
+ rmtClient.affinity(igfsPath, start, len));
- BlockLocation[] arr = new BlockLocation[affinity.size()];
+ BlockLocation[] arr = new BlockLocation[affinity.size()];
- for (int i = 0; i < arr.length; i++)
- arr[i] = convert(affinity.get(i));
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(affinity.get(i));
- if (LOG.isDebugEnabled())
- LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
- (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
+ (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
- return arr;
- }
+ return arr;
}
finally {
leaveBusy();
@@ -867,77 +674,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
}
/**
- * Resolve path mode.
- *
- * @param path HDFS path.
- * @return Path mode.
- */
- public IgfsMode mode(Path path) {
- return modeRslvr.resolveMode(convert(path));
- }
-
- /**
- * Convert the given path to path acceptable by the primary file system.
- *
- * @param path Path.
- * @return Primary file system path.
- */
- private Path toPrimary(Path path) {
- return convertPath(path, getUri());
- }
-
- /**
- * Convert the given path to path acceptable by the secondary file system.
- *
- * @param path Path.
- * @return Secondary file system path.
- */
- private Path toSecondary(Path path) {
- assert factory != null;
- assert secondaryUri != null;
-
- return convertPath(path, secondaryUri);
- }
-
- /**
- * Convert path using the given new URI.
- *
- * @param path Old path.
- * @param newUri New URI.
- * @return New path.
- */
- private Path convertPath(Path path, URI newUri) {
- assert newUri != null;
-
- if (path != null) {
- URI pathUri = path.toUri();
-
- try {
- return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
- pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
- }
- catch (URISyntaxException e) {
- throw new IgniteException("Failed to construct secondary file system path from the primary file " +
- "system path: " + path, e);
- }
- }
- else
- return null;
- }
-
- /**
- * Convert a file status obtained from the secondary file system to a status of the primary file system.
- *
- * @param status Secondary file system status.
- * @return Primary file system status.
- */
- private FileStatus toPrimary(FileStatus status) {
- return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
- status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
- status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
- }
-
- /**
* Convert IGFS path into Hadoop path.
*
* @param path IGFS path.
@@ -1065,15 +801,4 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
public String user() {
return user;
}
-
- /**
- * Gets cached or creates a {@link FileSystem}.
- *
- * @return The secondary file system.
- */
- private FileSystem secondaryFileSystem() throws IOException{
- assert factory != null;
-
- return (FileSystem)factory.get(user);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
index fe6492e..9c7febf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.impl.delegate;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -125,8 +126,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
}
- //Result is not used in case of secondary FS.
- return null;
+ return info(path);
}
/** {@inheritDoc} */
@@ -300,7 +300,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
final Map<String, String> props = properties(status);
- return new IgfsFile() {
+ return new IgfsFileImpl(new IgfsFile() {
@Override public IgfsPath path() {
return path;
}
@@ -353,7 +353,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
@Override public Map<String, String> properties() {
return props;
}
- };
+ }, 0);
}
catch (FileNotFoundException ignore) {
return null;
@@ -400,6 +400,9 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
return blks;
}
+ catch (FileNotFoundException ignored) {
+ return Collections.emptyList();
+ }
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed affinity for path: " + path);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
index 2294134..23bfc4f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
@@ -91,4 +93,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
* @return the user name.
*/
public String user();
+
+ /**
+ * @return Mode resolver.
+ * @throws IgniteCheckedException On error.
+ */
+ IgfsModeResolver modeResolver() throws IgniteCheckedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
index 0ca2e56..0577c73 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
@@ -210,10 +212,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
@Override public IgfsHandshakeResponse apply() {
igfs.clientLogDirectory(logDir);
- return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
- igfs.globalSampling());
- }
- });
+ return new IgfsHandshakeResponse(igfs.name(), igfs.groupBlockSize(), igfs.globalSampling());
+ }
+ });
}
/**
@@ -660,4 +661,18 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
@Override public String user() {
return user;
}
+
+ /** {@inheritDoc} */
+ @Override public IgfsModeResolver modeResolver() throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsModeResolver>() {
+ @Override public IgfsModeResolver apply() {
+ return ((IgfsImpl)igfs).modeResolver();
+ }
+ });
+ }
+ catch (IllegalStateException ignored) {
+ throw new HadoopIgfsCommunicationException("Failed to get mode resolver because Grid is stopping");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
index 88f26f1..2780966 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
@@ -28,11 +28,13 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
import org.apache.ignite.internal.igfs.common.IgfsMessage;
+import org.apache.ignite.internal.igfs.common.IgfsModeResolverRequest;
import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
import org.apache.ignite.internal.igfs.common.IgfsStatusRequest;
import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -52,6 +54,7 @@ import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MODE_RESOLVER;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ;
@@ -103,6 +106,10 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
+ /** Expected result is {@code IgfsModeResolver}. */
+ private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+ IgfsModeResolver> MODE_RESOLVER_RES = createClosure();
+
/** IGFS name. */
private final String igfs;
@@ -518,4 +525,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
@Override public String user() {
return userName;
}
+
+ /** {@inheritDoc} */
+ @Override public IgfsModeResolver modeResolver() throws IgniteCheckedException {
+ return io.send(new IgfsModeResolverRequest()).chain(MODE_RESOLVER_RES).get();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
index 6fa5d60..bee7dc2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
@@ -32,6 +32,7 @@ import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -272,6 +273,19 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
}
/**
+ * @return Mode resolver.
+ * @throws IOException On error.
+ */
+ public IgfsModeResolver modeResolver() throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsModeResolver>() {
+ @Override public IgfsModeResolver apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.modeResolver();
+ }
+ });
+ }
+
+ /**
* Execute closure which is not path-specific.
*
* @param clo Closure.
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
index 6b5690c..1793a05 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -127,9 +127,8 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
// Create remote instance.
FileSystem fs = FileSystem.get(URI.create("igfs://primary@127.0.0.1:10500/"), baseConfiguration());
- // Ensure lifecycle callback was invoked.
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 0;
+ assertEquals(1, START_CNT.get());
+ assertEquals(0, STOP_CNT.get());
// Check file system operations.
assert fs.exists(PATH_DUAL);
@@ -148,17 +147,16 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
assert secondary.exists(IGFS_PATH_PROXY);
assert fs.exists(PATH_PROXY);
- // Close file system and ensure that associated factory was notified.
fs.close();
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 1;
+ assertEquals(1, START_CNT.get());
+ assertEquals(0, STOP_CNT.get());
// Stop primary node and ensure that base factory was notified.
G.stop(primary.context().kernalContext().grid().name(), true);
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 2;
+ assertEquals(1, START_CNT.get());
+ assertEquals(1, STOP_CNT.get());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index f70838a..2214b5b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -71,6 +71,7 @@ import org.apache.ignite.igfs.IgfsIpcEndpointType;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
@@ -2108,6 +2109,15 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
}
/**
+ * @throws Exception If failed.
+ */
+ public void testModeResolver() throws Exception {
+ IgfsModeResolver mr = ((IgniteHadoopFileSystem)fs).getModeResolver();
+
+ assertEquals(mode, mr.resolveMode(IgfsPath.ROOT));
+ }
+
+ /**
* Verifies that client reconnects after connection to the server has been lost (multithreaded mode).
*
* @throws Exception If error occurs.
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java
index b61492a..6de033f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java
@@ -31,7 +31,6 @@ import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD;
import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD_VAL;
import static org.apache.ignite.internal.igfs.common.IgfsLogger.HDR;
@@ -123,7 +122,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
otherLog.close();
- log.logDelete(PATH, PRIMARY, false);
+ log.logDelete(PATH, false);
log.close();
@@ -166,7 +165,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
public void testLogRead() throws Exception {
IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10);
- log.logOpen(1, PATH, PRIMARY, 2, 3L);
+ log.logOpen(1, PATH, 2, 3L);
log.logRandomRead(1, 4L, 5);
log.logSeek(1, 6L);
log.logSkip(1, 7L);
@@ -177,7 +176,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
log.close();
checkLog(
- new SB().a(U.jvmPid() + d() + TYPE_OPEN_IN + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() + 2 +
+ new SB().a(U.jvmPid() + d() + TYPE_OPEN_IN + d() + PATH_STR_ESCAPED + d() + d() + 1 + d() + 2 +
d() + 3 + d(14)).toString(),
new SB().a(U.jvmPid() + d() + TYPE_RANDOM_READ + d(3) + 1 + d(7) + 4 + d() + 5 + d(8)).toString(),
new SB().a(U.jvmPid() + d() + TYPE_SEEK + d(3) + 1 + d(7) + 6 + d(9)).toString(),
@@ -196,16 +195,16 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
public void testLogWrite() throws Exception {
IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10);
- log.logCreate(1, PATH, PRIMARY, true, 2, new Integer(3).shortValue(), 4L);
- log.logAppend(2, PATH, PRIMARY, 8);
+ log.logCreate(1, PATH, true, 2, new Integer(3).shortValue(), 4L);
+ log.logAppend(2, PATH, 8);
log.logCloseOut(2, 9L, 10L, 11);
log.close();
checkLog(
- new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() +
+ new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + d() + 1 + d() +
2 + d(2) + 0 + d() + 1 + d() + 3 + d() + 4 + d(10)).toString(),
- new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 2 + d() +
+ new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + d() + 2 + d() +
8 + d(2) + 1 + d(13)).toString(),
new SB().a(U.jvmPid() + d() + TYPE_CLOSE_OUT + d(3) + 2 + d(11) + 9 + d() + 10 + d() + 11 + d(3))
.toString()
@@ -225,20 +224,20 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
String file1 = "/dir3/file1.test";
String file2 = "/dir3/file1.test";
- log.logMakeDirectory(PATH, PRIMARY);
- log.logRename(PATH, PRIMARY, new IgfsPath(newFile));
- log.logListDirectory(PATH, PRIMARY, new String[] { file1, file2 });
- log.logDelete(PATH, PRIMARY, false);
+ log.logMakeDirectory(PATH);
+ log.logRename(PATH, new IgfsPath(newFile));
+ log.logListDirectory(PATH, new String[] { file1, file2 });
+ log.logDelete(PATH, false);
log.close();
checkLog(
- new SB().a(U.jvmPid() + d() + TYPE_DIR_MAKE + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17)).toString(),
- new SB().a(U.jvmPid() + d() + TYPE_RENAME + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(15) + newFile +
+ new SB().a(U.jvmPid() + d() + TYPE_DIR_MAKE + d() + PATH_STR_ESCAPED + d() + d(17)).toString(),
+ new SB().a(U.jvmPid() + d() + TYPE_RENAME + d() + PATH_STR_ESCAPED + d() + d(15) + newFile +
d(2)).toString(),
- new SB().a(U.jvmPid() + d() + TYPE_DIR_LIST + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17) + file1 +
+ new SB().a(U.jvmPid() + d() + TYPE_DIR_LIST + d() + PATH_STR_ESCAPED + d() + d(17) + file1 +
DELIM_FIELD_VAL + file2).toString(),
- new SB().a(U.jvmPid() + d() + TYPE_DELETE + d(1) + PATH_STR_ESCAPED + d() + PRIMARY + d(16) + 0 +
+ new SB().a(U.jvmPid() + d() + TYPE_DELETE + d(1) + PATH_STR_ESCAPED + d() + d(16) + 0 +
d()).toString()
);
}
@@ -247,6 +246,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest
* Ensure that log file has only the following lines.
*
* @param lines Expected lines.
+ * @throws Exception If failed.
*/
private void checkLog(String... lines) throws Exception {
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(LOG_FILE)));
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
deleted file mode 100644
index e710b97..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.igfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
-import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
-import org.apache.ignite.igfs.IgfsIpcEndpointType;
-import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-
-import java.net.URI;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-
-/**
- * Ensures correct modes resolution for SECONDARY paths.
- */
-public class IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest extends IgfsCommonAbstractTest {
- /** File system. */
- private IgniteHadoopFileSystem fs;
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- U.closeQuiet(fs);
-
- fs = null;
-
- G.stopAll(true);
- }
-
- /**
- * Perform initial startup.
- *
- * @param initDfltPathModes WHether to initialize default path modes.
- * @throws Exception If failed.
- */
- @SuppressWarnings({"NullableProblems", "unchecked"})
- private void startUp(boolean initDfltPathModes) throws Exception {
- startUpSecondary();
-
- FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
- igfsCfg.setName("igfs");
- igfsCfg.setBlockSize(512 * 1024);
- igfsCfg.setInitializeDefaultPathModes(initDfltPathModes);
-
- IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-
- endpointCfg.setType(IgfsIpcEndpointType.TCP);
- endpointCfg.setPort(10500);
-
- igfsCfg.setIpcEndpointConfiguration(endpointCfg);
-
- igfsCfg.setManagementPort(-1);
- igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
- "igfs://igfs-secondary@127.0.0.1:11500/",
- "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
-
- CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
- dataCacheCfg.setCacheMode(PARTITIONED);
- dataCacheCfg.setNearConfiguration(null);
- dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
- dataCacheCfg.setBackups(0);
- dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
- metaCacheCfg.setCacheMode(REPLICATED);
- metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- igfsCfg.setDataCacheConfiguration(dataCacheCfg);
- igfsCfg.setMetaCacheConfiguration(metaCacheCfg);
-
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- cfg.setIgniteInstanceName("igfs-grid");
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
- cfg.setDiscoverySpi(discoSpi);
- cfg.setFileSystemConfiguration(igfsCfg);
-
- cfg.setLocalHost("127.0.0.1");
-
- G.start(cfg);
-
- Configuration fsCfg = new Configuration();
-
- fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml"));
-
- fsCfg.setBoolean("fs.igfs.impl.disable.cache", true);
-
- fs = (IgniteHadoopFileSystem)FileSystem.get(new URI("igfs://igfs@/"), fsCfg);
- }
-
- /**
- * Startup secondary file system.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("unchecked")
- private void startUpSecondary() throws Exception {
- FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
- igfsCfg.setName("igfs-secondary");
- igfsCfg.setBlockSize(512 * 1024);
- igfsCfg.setDefaultMode(PRIMARY);
-
- IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-
- endpointCfg.setType(IgfsIpcEndpointType.TCP);
- endpointCfg.setPort(11500);
-
- igfsCfg.setIpcEndpointConfiguration(endpointCfg);
-
- CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
- cacheCfg.setCacheMode(PARTITIONED);
- cacheCfg.setNearConfiguration(null);
- cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
- cacheCfg.setBackups(0);
- cacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
- metaCacheCfg.setCacheMode(REPLICATED);
- metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- igfsCfg.setDataCacheConfiguration(cacheCfg);
- igfsCfg.setMetaCacheConfiguration(metaCacheCfg);
-
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- cfg.setIgniteInstanceName("igfs-grid-secondary");
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
- cfg.setDiscoverySpi(discoSpi);
- cfg.setFileSystemConfiguration(igfsCfg);
-
- cfg.setLocalHost("127.0.0.1");
-
- G.start(cfg);
- }
-
- /**
- * Test scenario when defaults are initialized.
- *
- * @throws Exception If failed.
- */
- public void testDefaultsInitialized() throws Exception {
- check(true);
- }
-
- /**
- * Test scenario when defaults are not initialized.
- *
- * @throws Exception If failed.
- */
- public void testDefaultsNotInitialized() throws Exception {
- check(false);
- }
-
- /**
- * Actual check.
- *
- * @param initDfltPathModes Whether to initialize default path modes.
- * @throws Exception If failed.
- */
- private void check(boolean initDfltPathModes) throws Exception {
- startUp(initDfltPathModes);
-
- assertEquals(initDfltPathModes, fs.hasSecondaryFileSystem());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index b012083..576d8db 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSy
import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest;
-import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopCommandLineTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopFileSystemsTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopGroupingTest;
@@ -165,8 +164,6 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalToClientDualAsyncSelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalToClientProxySelfTest.class.getName())));
- suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.class.getName())));
-
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemClientSelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerStateSelfTest.class.getName())));