You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/23 11:40:54 UTC
[12/12] ignite git commit: IGNITE-2206: WIP.
IGNITE-2206: WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31d3289d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31d3289d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31d3289d
Branch: refs/heads/ignite-2206
Commit: 31d3289df789eab0ba7bbe8350507c69cc0ba845
Parents: 13f8170
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 13:40:50 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 13:40:50 2015 +0300
----------------------------------------------------------------------
.../fs/CachingHadoopFileSystemFactory.java | 10 +-
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 227 ++++++-------------
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1 +
.../hadoop/IgfsSecondaryFileSystemEx.java | 15 --
4 files changed, 78 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/31d3289d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index 3bad850..1e97b30 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -139,18 +139,21 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
return fileSys;
}
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, uriStr);
U.writeCollection(out, cfgPathStr);
}
+ /** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
uriStr = U.readString(in);
cfgPathStr = new ArrayList(U.readCollection(in));
}
+ /** {@inheritDoc} */
@Override public void start() throws IgniteException {
cfg = HadoopUtils.safeCreateConfiguration();
@@ -161,13 +164,15 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
if (url == null) {
// If secConfPath is given, it should be resolvable:
- throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
-
+ throw new IgniteException("Failed to resolve secondary file system configuration path " +
"(ensure that it exists locally and you have read access to it): " + confPath);
}
cfg.addResource(url);
}
+ else {
+ // TODO: Throw exception.
+ }
}
}
@@ -191,6 +196,7 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
cfg.setBoolean(prop, true);
}
+ /** {@inheritDoc} */
@Override public void stop() throws IgniteException {
try {
fileSysLazyMap.close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/31d3289d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 756989c..aa1952d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -17,15 +17,6 @@
package org.apache.ignite.hadoop.fs;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -52,11 +43,19 @@ import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsFileInfo;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
-
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
@@ -66,54 +65,17 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
* In fact, this class deals with different FileSystems depending on the user context,
* see {@link IgfsUserContext#currentUser()}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem,
- LifecycleAware, HadoopPayloadAware {
-// /** Properties of file system, see {@link #properties()}
-// * */
-// private final Map<String, String> props = new HashMap<>();
-
- /** Secondary file system provider. */
- //private SecondaryFileSystemProvider secProvider;
-
- /** FileSystem instance created for the default user.
- * Stored outside the fileSysLazyMap due to performance reasons. */
- private FileSystem dfltFs;
-
-// /** */
-// private String uriStr;
-//
-// /** Note: */
-// private URI uri;
-//
-// /** */
-// private Collection<String> cfgPathsStr;
-//
-// /** */
-// private @Nullable Collection<URI> cfgPaths;
-
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware,
+ HadoopPayloadAware {
/** The default user name. It is used if no user context is set. */
- private String usrName = IgfsUtils.fixUserName(null);
+ private String dfltUsrName = IgfsUtils.fixUserName(null);
/** */
private HadoopFileSystemFactory fsFactory;
- private final AtomicBoolean started = new AtomicBoolean();
-
-// /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
-// private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
-// new ValueFactory<String, FileSystem>() {
-// @Override public FileSystem createValue(String key) {
-// try {
-// assert !F.isEmpty(key);
-//
-// return secProvider.createFileSystem(key);
-// }
-// catch (IOException ioe) {
-// throw new IgniteException(ioe);
-// }
-// }
-// }
-// );
+ /** FileSystem instance created for the default user. Stored outside due to performance reasons. */
+ // TODO: Remove.
+ private volatile FileSystem dfltFs;
/**
* Default constructor for Spring.
@@ -139,6 +101,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @param cfgPath Additional path to Hadoop configuration.
* @throws IgniteCheckedException In case of error.
*/
+ @Deprecated
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath)
throws IgniteCheckedException {
this(uri, cfgPath, null);
@@ -168,6 +131,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
setUserName(userName);
}
+ // TODO: Add getter.
+ // TODO: Add docs.
/**
*
* @param factory
@@ -178,33 +143,19 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
this.fsFactory = factory;
}
+ // TODO: Add getter.
+ // TODO: Add docs.
+ // TODO: Rename to "setDefaultUserName"
+
/**
*
* @param usrName
*/
public void setUserName(String usrName) {
// TODO: Move fix to start routine.
- this.usrName = IgfsUtils.fixUserName(usrName);
+ this.dfltUsrName = IgfsUtils.fixUserName(usrName);
}
-// /**
-// * Sets the file system properties.
-// */
-// private void setProperties() {
-// String uri = this.uri.toString();
-//
-// if (!uri.endsWith("/"))
-// uri += "/";
-//
-// String cfgPath = secProvider.configurationPath();
-//
-// if (cfgPath != null)
-// props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
-//
-// props.put(SECONDARY_FS_URI, uri);
-// props.put(SECONDARY_FS_USER_NAME, dfltUserName);
-// }
-
/**
* Convert IGFS path into Hadoop path.
*
@@ -212,7 +163,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @return Hadoop path.
*/
private Path convert(IgfsPath path) {
- URI uri = fileSysForUser().getUri();
+ URI uri = fileSystemForUser().getUri();
return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
}
@@ -266,7 +217,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
try {
- return fileSysForUser().exists(convert(path));
+ return fileSystemForUser().exists(convert(path));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -277,7 +228,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
- final FileSystem fileSys = fileSysForUser();
+ final FileSystem fileSys = fileSystemForUser();
try {
if (props0.userName() != null || props0.groupName() != null)
@@ -298,7 +249,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public void rename(IgfsPath src, IgfsPath dest) {
// Delegate to the secondary file system.
try {
- if (!fileSysForUser().rename(convert(src), convert(dest)))
+ if (!fileSystemForUser().rename(convert(src), convert(dest)))
throw new IgfsException("Failed to rename (secondary file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
@@ -310,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
try {
- return fileSysForUser().delete(convert(path), recursive);
+ return fileSystemForUser().delete(convert(path), recursive);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -320,7 +271,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
try {
- if (!fileSysForUser().mkdirs(convert(path)))
+ if (!fileSystemForUser().mkdirs(convert(path)))
throw new IgniteException("Failed to make directories [path=" + path + "]");
}
catch (IOException e) {
@@ -331,7 +282,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
try {
- if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+ if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
}
catch (IOException e) {
@@ -342,7 +293,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
try {
- FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
+ FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -365,7 +316,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
try {
- FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
+ FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -392,13 +343,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
+ return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
try {
- return fileSysForUser().create(convert(path), overwrite);
+ return fileSystemForUser().create(convert(path), overwrite);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -412,8 +363,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
try {
- return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
- (short)replication, blockSize, null);
+ return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+ (short) replication, blockSize, null);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -426,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
- return fileSysForUser().append(convert(path), bufSize);
+ return fileSystemForUser().append(convert(path), bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -436,7 +387,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
try {
- final FileStatus status = fileSysForUser().getFileStatus(convert(path));
+ final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
if (status == null)
return null;
@@ -511,71 +462,37 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
- return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
+ return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
}
}
-// /** {@inheritDoc} */
-// @Override public Map<String, String> properties() {
-// return Collections.emptyMap();
-// }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- Exception e = null;
-
- try {
- if (dfltFs != null)
- dfltFs.close();
- }
- catch (Exception e0) {
- e = e0;
- }
-
- try {
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware)fsFactory).stop();
- }
- catch (IgniteException ie) {
- if (e == null)
- e = ie;
- }
-
- if (e != null)
- throw new IgniteException(e);
- }
-
/**
* Gets the underlying {@link FileSystem}.
* This method is used solely for testing.
* @return the underlying Hadoop {@link FileSystem}.
*/
public FileSystem fileSystem() {
- return fileSysForUser();
+ return fileSystemForUser();
}
/**
* Gets the FileSystem for the current context user.
* @return the FileSystem instance, never null.
*/
- private FileSystem fileSysForUser() {
- assert started.get(); // Ensure the Fs is started.
-
+ private FileSystem fileSystemForUser() {
String user = IgfsUserContext.currentUser();
if (F.isEmpty(user))
- user = usrName; // default is never empty.
+ user = dfltUsrName; // default is never empty.
assert !F.isEmpty(user);
- if (F.eq(user, usrName))
+ if (F.eq(user, dfltUsrName))
return dfltFs; // optimization
- //assert fsFactory.uri() != null : "uri!";
-
try {
return fsFactory.create(user);
}
@@ -584,48 +501,42 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
}
}
- /**
- * Should be invoked by client (from Spring?) after all the setters invoked.
- *
- * @throws IgniteCheckedException
- */
- @Override public void start() {
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
// #start() should not ever be invoked if these properties are not set:
A.ensure(fsFactory != null, "factory");
- A.ensure(usrName != null, "dfltUserName");
-
- // Avoid
- if (started.compareAndSet(false, true)) {
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware) fsFactory).start();
-
- try {
- //this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
-
- // File system creation for the default user name.
- // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
- //this.dfltFs = secProvider.createFileSystem(dfltUserName);
- this.dfltFs = fsFactory.create(usrName);
+ A.ensure(dfltUsrName != null, "userName");
- assert dfltFs != null;
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
+ if (fsFactory instanceof LifecycleAware)
+ ((LifecycleAware) fsFactory).start();
}
-// /** {@inheritDoc} */
-// @Nullable @Override public HadoopFileSystemFactory<FileSystem> getSecondaryFileSystemFactory() {
-// return fsFactory;
-// }
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ Exception e = null;
+ try {
+ if (dfltFs != null)
+ dfltFs.close();
+ }
+ catch (Exception e0) {
+ e = e0;
+ }
+ try {
+ if (fsFactory instanceof LifecycleAware)
+ ((LifecycleAware)fsFactory).stop();
+ }
+ catch (IgniteException ie) {
+ if (e == null)
+ e = ie;
+ }
- @Override public void stop() throws IgniteException {
- close();
+ if (e != null)
+ throw new IgniteException(e);
}
+ /** {@inheritDoc} */
@Override public HadoopFileSystemFactory getPayload() {
return fsFactory;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/31d3289d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 9463412..1546995 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -164,6 +164,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** IGFS mode resolver. */
private IgfsModeResolver modeRslvr;
+ // TODO: Secondary file system must be changed to factory.
/** Secondary file system instance. */
private FileSystem secondaryFs;
http://git-wip-us.apache.org/repos/asf/ignite/blob/31d3289d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java
deleted file mode 100644
index 582e798..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java
+++ /dev/null
@@ -1,15 +0,0 @@
-//package org.apache.ignite.internal.processors.hadoop;
-//
-//import org.apache.ignite.IgniteCheckedException;
-//import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-//
-///**
-// *
-// */
-//public interface IgfsSecondaryFileSystemEx extends IgfsSecondaryFileSystem {
-// /**
-// *
-// * @throws IgniteCheckedException
-// */
-// public void start() throws IgniteCheckedException;
-//}