You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:09:38 UTC
[43/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index a01bfaf..89b8028 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -17,37 +17,26 @@
package org.apache.ignite.hadoop.fs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
/**
* Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call.
* <p>
* If {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will not be cached by Hadoop.
*/
-public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
+public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable {
/** */
private static final long serialVersionUID = 0L;
- /** File system URI. */
+ /** File system URI. */
private String uri;
/** File system config paths. */
@@ -56,12 +45,6 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
/** User name mapper. */
private UserNameMapper usrNameMapper;
- /** Configuration of the secondary filesystem, never null. */
- protected transient Configuration cfg;
-
- /** Resulting URI. */
- protected transient URI fullUri;
-
/**
* Constructor.
*/
@@ -70,64 +53,17 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
}
/** {@inheritDoc} */
- @Override public final FileSystem get(String name) throws IOException {
- String name0 = IgfsUtils.fixUserName(name);
-
- if (usrNameMapper != null)
- name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
-
- return getWithMappedName(name0);
- }
-
- /**
- * Internal file system create routine.
- *
- * @param usrName User name.
- * @return File system.
- * @throws IOException If failed.
- */
- protected FileSystem getWithMappedName(String usrName) throws IOException {
- assert cfg != null;
-
- try {
- // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
- // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
- // classloader to classloader of current class to avoid strange class-cast-exceptions.
- ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
- try {
- return create(usrName);
- }
- finally {
- HadoopUtils.restoreContextClassLoader(oldLdr);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
- }
-
- /**
- * Internal file system creation routine, invoked in correct class loader context.
- *
- * @param usrName User name.
- * @return File system.
- * @throws IOException If failed.
- * @throws InterruptedException if the current thread is interrupted.
- */
- protected FileSystem create(String usrName) throws IOException, InterruptedException {
- return FileSystem.get(fullUri, cfg, usrName);
+ @Override public final Object get(String name) throws IOException {
+ throw new UnsupportedOperationException("Method should not be called directly.");
}
/**
* Gets file system URI.
* <p>
- * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
+ * This URI will be used as a first argument when calling {@code FileSystem.get(URI, Configuration, String)}.
* <p>
* If not set, default URI will be picked from file system configuration using
- * {@link FileSystem#getDefaultUri(Configuration)} method.
+ * {@code FileSystem.getDefaultUri(Configuration)} method.
*
* @return File system URI.
*/
@@ -149,11 +85,8 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
* <p>
* Path could be either absolute or relative to {@code IGNITE_HOME} environment variable.
* <p>
- * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means
+ * All provided paths will be loaded in the order they are provided and then applied to {@code Configuration}. It means
* that path order might be important in some cases.
- * <p>
- * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
- * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well.
*
* @return Paths to file system configuration files.
*/
@@ -198,50 +131,6 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- cfg = HadoopUtils.safeCreateConfiguration();
-
- if (cfgPaths != null) {
- for (String cfgPath : cfgPaths) {
- if (cfgPath == null)
- throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths));
- else {
- URL url = U.resolveIgniteUrl(cfgPath);
-
- if (url == null) {
- // If secConfPath is given, it should be resolvable:
- throw new IgniteException("Failed to resolve secondary file system configuration path " +
- "(ensure that it exists locally and you have read access to it): " + cfgPath);
- }
-
- cfg.addResource(url);
- }
- }
- }
-
- // If secondary fs URI is not given explicitly, try to get it from the configuration:
- if (uri == null)
- fullUri = FileSystem.getDefaultUri(cfg);
- else {
- try {
- fullUri = new URI(uri);
- }
- catch (URISyntaxException use) {
- throw new IgniteException("Failed to resolve secondary file system URI: " + uri);
- }
- }
-
- if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
- ((LifecycleAware)usrNameMapper).start();
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
- ((LifecycleAware)usrNameMapper).stop();
- }
-
- /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, uri);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index bcbb082..b90777c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -17,24 +17,14 @@
package org.apache.ignite.hadoop.fs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-
-import java.io.IOException;
-import java.net.URI;
-
/**
- * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on
+ * Caching Hadoop file system factory. Caches {@code FileSystem} instances on per-user basis. Doesn't rely on
* built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each
* user instead.
* <p>
- * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user
+ * This makes cache instance resistant to concurrent calls to {@code FileSystem.close()} in other parts of the user
* code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to
- * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
+ * {@code FileSystem.get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
* calls this method only once per user, which may lead to token expiration. In such cases it makes sense to either
* use {@link BasicHadoopFileSystemFactory} or implement your own factory.
*/
@@ -42,44 +32,10 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
/** */
private static final long serialVersionUID = 0L;
- /** Per-user file system cache. */
- private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
- @Override public FileSystem createValue(String key) throws IOException {
- return CachingHadoopFileSystemFactory.super.getWithMappedName(key);
- }
- }
- );
-
/**
- * Public non-arg constructor.
+ * Constructor.
*/
public CachingHadoopFileSystemFactory() {
- // noop
- }
-
- /** {@inheritDoc} */
- @Override public FileSystem getWithMappedName(String name) throws IOException {
- return cache.getOrCreate(name);
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- super.start();
-
- // Disable caching.
- cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- super.stop();
-
- try {
- cache.close();
- }
- catch (IgniteCheckedException ice) {
- throw new IgniteException(ice);
- }
+ // No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
index 5ad08ab..214328f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
@@ -17,16 +17,13 @@
package org.apache.ignite.hadoop.fs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.lifecycle.LifecycleAware;
import java.io.IOException;
import java.io.Serializable;
/**
- * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
+ * Factory for Hadoop {@code FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
* <p>
* {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required.
* <p>
@@ -35,10 +32,6 @@ import java.io.Serializable;
* <p>
* Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be
* performed by Ignite. You may want to implement some initialization or cleanup there.
- * <p>
- * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the
- * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file
- * system paths.
*/
public interface HadoopFileSystemFactory extends Serializable {
/**
@@ -48,5 +41,5 @@ public interface HadoopFileSystemFactory extends Serializable {
* @return File system.
* @throws IOException In case of error.
*/
- public FileSystem get(String usrName) throws IOException;
+ public Object get(String usrName) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 8085826..f1c1b16 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -17,25 +17,12 @@
package org.apache.ignite.hadoop.fs;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate;
/**
* Statistic writer implementation that writes info into any Hadoop file system.
@@ -47,57 +34,39 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
/** */
public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
- /** */
- private static final String USER_MACRO = "${USER}";
+ /** Mutex. */
+ private final Object mux = new Object();
- /** */
- private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+ /** Delegate. */
+ private volatile HadoopFileSystemCounterWriterDelegate delegate;
/** {@inheritDoc} */
@Override public void write(HadoopJob job, HadoopCounters cntrs)
throws IgniteCheckedException {
+ delegate(job).write(job, cntrs);
+ }
- Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
-
- final HadoopJobInfo jobInfo = job.info();
-
- final HadoopJobId jobId = job.id();
-
- for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
- hadoopCfg.set(e.getKey(), e.getValue());
-
- String user = jobInfo.user();
-
- user = IgfsUtils.fixUserName(user);
-
- String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
- if (dir == null)
- dir = DEFAULT_COUNTER_WRITER_DIR;
-
- Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
- HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
- try {
- hadoopCfg.set(MRJobConfig.USER_NAME, user);
+ /**
+ * Get delegate creating it if needed.
+ *
+ * @param job Job.
+ * @return Delegate.
+ */
+ private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) {
+ HadoopFileSystemCounterWriterDelegate delegate0 = delegate;
- FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg);
+ if (delegate0 == null) {
+ synchronized (mux) {
+ delegate0 = delegate;
- fs.mkdirs(jobStatPath);
+ if (delegate0 == null) {
+ delegate0 = HadoopDelegateUtils.counterWriterDelegate(job.getClass().getClassLoader(), this);
- try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
- for (T2<String, Long> evt : perfCntr.evts()) {
- out.print(evt.get1());
- out.print(':');
- out.println(evt.get2().toString());
+ delegate = delegate0;
}
-
- out.flush();
}
}
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
+
+ return delegate0;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 6b5c776..c9d08c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -17,62 +17,48 @@
package org.apache.ignite.hadoop.fs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
+import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware;
import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
- * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
+ * Secondary file system which delegates calls to Hadoop {@code org.apache.hadoop.fs.FileSystem}.
* <p>
* Target {@code FileSystem} instances are created on a per-user basis using the passed {@link HadoopFileSystemFactory}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware,
- HadoopPayloadAware {
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, IgfsKernalContextAware,
+ LifecycleAware, HadoopPayloadAware {
/** The default user name. It is used if no user context is set. */
private String dfltUsrName;
/** Factory. */
- private HadoopFileSystemFactory fsFactory;
+ private HadoopFileSystemFactory factory;
+
+ /** Kernal context. */
+ private volatile GridKernalContext ctx;
+
+ /** Target. */
+ private volatile HadoopIgfsSecondaryFileSystemDelegate target;
/**
* Default constructor for Spring.
@@ -135,7 +121,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* Gets default user name.
* <p>
* Defines user name which will be used during file system invocation in case no user name is defined explicitly
- * through {@link FileSystem#get(URI, Configuration, String)}.
+ * through {@code FileSystem.get(URI, Configuration, String)}.
* <p>
* Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name
* explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
@@ -162,14 +148,14 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/**
* Gets secondary file system factory.
* <p>
- * This factory will be used whenever a call to a target {@link FileSystem} is required.
+ * This factory will be used whenever a call to a target {@code FileSystem} is required.
* <p>
* If not set, {@link CachingHadoopFileSystemFactory} will be used.
*
* @return Secondary file system factory.
*/
public HadoopFileSystemFactory getFileSystemFactory() {
- return fsFactory;
+ return factory;
}
/**
@@ -178,403 +164,115 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @param factory Secondary file system factory.
*/
public void setFileSystemFactory(HadoopFileSystemFactory factory) {
- this.fsFactory = factory;
- }
-
- /**
- * Convert IGFS path into Hadoop path.
- *
- * @param path IGFS path.
- * @return Hadoop path.
- */
- private Path convert(IgfsPath path) {
- URI uri = fileSystemForUser().getUri();
-
- return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
- }
-
- /**
- * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
- *
- * @param e Exception to check.
- * @param detailMsg Detailed error message.
- * @return Appropriate exception.
- */
- private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
- return cast(detailMsg, e);
- }
-
- /**
- * Cast IO exception to IGFS exception.
- *
- * @param e IO exception.
- * @return IGFS exception.
- */
- public static IgfsException cast(String msg, IOException e) {
- if (e instanceof FileNotFoundException)
- return new IgfsPathNotFoundException(e);
- else if (e instanceof ParentNotDirectoryException)
- return new IgfsParentNotDirectoryException(msg, e);
- else if (e instanceof PathIsNotEmptyDirectoryException)
- return new IgfsDirectoryNotEmptyException(e);
- else if (e instanceof PathExistsException)
- return new IgfsPathAlreadyExistsException(msg, e);
- else
- return new IgfsException(msg, e);
- }
-
- /**
- * Convert Hadoop FileStatus properties to map.
- *
- * @param status File status.
- * @return IGFS attributes.
- */
- private static Map<String, String> properties(FileStatus status) {
- FsPermission perm = status.getPermission();
-
- if (perm == null)
- perm = FsPermission.getDefault();
-
- HashMap<String, String> res = new HashMap<>(3);
-
- res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
- res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
- res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
-
- return res;
+ this.factory = factory;
}
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
- try {
- return fileSystemForUser().exists(convert(path));
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
- }
+ return target.exists(path);
}
/** {@inheritDoc} */
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
- HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
-
- final FileSystem fileSys = fileSystemForUser();
-
- try {
- if (props0.userName() != null || props0.groupName() != null)
- fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
- if (props0.permission() != null)
- fileSys.setPermission(convert(path), props0.permission());
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
- }
-
- //Result is not used in case of secondary FS.
- return null;
+ return target.update(path, props);
}
/** {@inheritDoc} */
@Override public void rename(IgfsPath src, IgfsPath dest) {
- // Delegate to the secondary file system.
- try {
- if (!fileSystemForUser().rename(convert(src), convert(dest)))
- throw new IgfsException("Failed to rename (secondary file system returned false) " +
- "[src=" + src + ", dest=" + dest + ']');
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
- }
+ target.rename(src, dest);
}
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
- try {
- return fileSystemForUser().delete(convert(path), recursive);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
- }
+ return target.delete(path, recursive);
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
- try {
- if (!fileSystemForUser().mkdirs(convert(path)))
- throw new IgniteException("Failed to make directories [path=" + path + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
- }
+ target.mkdirs(path);
}
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
- try {
- if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
- throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
- }
+ target.mkdirs(path, props);
}
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
- for (FileStatus status : statuses)
- res.add(new IgfsPath(path, status.getPath().getName()));
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
+ return target.listPaths(path);
}
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
- for (FileStatus s : statuses) {
- IgfsEntryInfo fsInfo = s.isDirectory() ?
- IgfsUtils.createDirectory(
- IgniteUuid.randomUuid(),
- null,
- properties(s),
- s.getAccessTime(),
- s.getModificationTime()
- ) :
- IgfsUtils.createFile(
- IgniteUuid.randomUuid(),
- (int)s.getBlockSize(),
- s.getLen(),
- null,
- null,
- false,
- properties(s),
- s.getAccessTime(),
- s.getModificationTime()
- );
-
- res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
- }
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
+ return target.listFiles(path);
}
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
+ return target.open(path, bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
- try {
- return fileSystemForUser().create(convert(path), overwrite);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
- }
+ return target.create(path, overwrite);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
long blockSize, @Nullable Map<String, String> props) {
- HadoopIgfsProperties props0 =
- new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
-
- try {
- return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
- (short) replication, blockSize, null);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
- ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
- ", blockSize=" + blockSize + "]");
- }
+ return target.create(path, bufSize, overwrite, replication, blockSize, props);
}
/** {@inheritDoc} */
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
- try {
- return fileSystemForUser().append(convert(path), bufSize);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
- }
+ return target.append(path, bufSize, create, props);
}
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
- try {
- final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
-
- if (status == null)
- return null;
-
- final Map<String, String> props = properties(status);
-
- return new IgfsFile() {
- @Override public IgfsPath path() {
- return path;
- }
-
- @Override public boolean isFile() {
- return status.isFile();
- }
-
- @Override public boolean isDirectory() {
- return status.isDirectory();
- }
-
- @Override public int blockSize() {
- // By convention directory has blockSize == 0, while file has blockSize > 0:
- return isDirectory() ? 0 : (int)status.getBlockSize();
- }
-
- @Override public long groupBlockSize() {
- return status.getBlockSize();
- }
-
- @Override public long accessTime() {
- return status.getAccessTime();
- }
-
- @Override public long modificationTime() {
- return status.getModificationTime();
- }
-
- @Override public String property(String name) throws IllegalArgumentException {
- String val = props.get(name);
-
- if (val == null)
- throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
- return val;
- }
-
- @Nullable @Override public String property(String name, @Nullable String dfltVal) {
- String val = props.get(name);
-
- return val == null ? dfltVal : val;
- }
-
- @Override public long length() {
- return status.getLen();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return props;
- }
- };
- }
- catch (FileNotFoundException ignore) {
- return null;
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
- }
+ return target.info(path);
}
/** {@inheritDoc} */
@Override public long usedSpaceSize() {
- try {
- // We don't use FileSystem#getUsed() since it counts only the files
- // in the filesystem root, not all the files recursively.
- return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
- }
+ return target.usedSpaceSize();
}
/** {@inheritDoc} */
@Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
- try {
- // We don't use FileSystem#getUsed() since it counts only the files
- // in the filesystem root, not all the files recursively.
- fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed set times for path: " + path);
- }
+ target.setTimes(path, accessTime, modificationTime);
}
- /**
- * Gets the underlying {@link FileSystem}.
- * This method is used solely for testing.
- * @return the underlying Hadoop {@link FileSystem}.
- */
- public FileSystem fileSystem() {
- return fileSystemForUser();
+ /** {@inheritDoc} */
+ @Override public void setKernalContext(GridKernalContext ctx) {
+ this.ctx = ctx;
}
- /**
- * Gets the FileSystem for the current context user.
- * @return the FileSystem instance, never null.
- */
- private FileSystem fileSystemForUser() {
- String user = IgfsUserContext.currentUser();
-
- if (F.isEmpty(user))
- user = IgfsUtils.fixUserName(dfltUsrName);
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ HadoopClassLoader ldr = ctx.hadoopHelper().commonClassLoader();
- assert !F.isEmpty(user);
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(ldr);
try {
- return fsFactory.get(user);
+ target = HadoopDelegateUtils.secondaryFileSystemDelegate(ldr, this);
+
+ target.start();
}
- catch (IOException ioe) {
- throw new IgniteException(ioe);
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);
-
- if (fsFactory == null)
- fsFactory = new CachingHadoopFileSystemFactory();
-
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware) fsFactory).start();
- }
-
- /** {@inheritDoc} */
@Override public void stop() throws IgniteException {
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware)fsFactory).stop();
+ if (target != null)
+ target.stop();
}
/** {@inheritDoc} */
@Override public HadoopFileSystemFactory getPayload() {
- return fsFactory;
+ return factory;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
index bbfbc59..46d626b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -17,19 +17,12 @@
package org.apache.ignite.hadoop.fs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.security.PrivilegedExceptionAction;
/**
* Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
@@ -57,9 +50,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
/** The re-login interval. See {@link #getReloginInterval()} for more information. */
private long reloginInterval = DFLT_RELOGIN_INTERVAL;
- /** Time of last re-login attempt, in system milliseconds. */
- private transient volatile long lastReloginTime;
-
/**
* Constructor.
*/
@@ -67,25 +57,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
// No-op.
}
- /** {@inheritDoc} */
- @Override public FileSystem getWithMappedName(String name) throws IOException {
- reloginIfNeeded();
-
- return super.getWithMappedName(name);
- }
-
- /** {@inheritDoc} */
- @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
- UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
- UserGroupInformation.getLoginUser());
-
- return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override public FileSystem run() throws Exception {
- return FileSystem.get(fullUri, cfg);
- }
- });
- }
-
/**
* Gets the key tab principal short name (e.g. "hdfs").
*
@@ -106,9 +77,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
/**
* Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
- * <p>
- * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
- * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
*
* @return The key tab file name.
*/
@@ -136,10 +104,8 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
* Negative values are not allowed.
*
* <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
- * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+ * login if less than {@code org.apache.hadoop.security.UserGroupInformation.MIN_TIME_BEFORE_RELOGIN} milliseconds
* have passed since the time of the previous login.
- * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
- * more detail.
*
* @return The re-login interval, in milliseconds.
*/
@@ -157,47 +123,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
- A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
- A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
-
- super.start();
-
- try {
- UserGroupInformation.setConfiguration(cfg);
- UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
- }
- catch (IOException ioe) {
- throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
- ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
- }
- }
-
- /**
- * Re-logins the user if needed.
- * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
- * frequent than one attempt per {@code reloginInterval}.
- * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing
- * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
- *
- * <p>This operation expected to be called upon each operation with the file system created with the factory.
- * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
- * is no need to invoke it otherwise specially.
- *
- * @throws IOException If login fails.
- */
- private void reloginIfNeeded() throws IOException {
- long now = System.currentTimeMillis();
-
- if (now >= lastReloginTime + reloginInterval) {
- UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-
- lastReloginTime = now;
- }
- }
-
- /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index a06129e..7133c08 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -42,12 +42,14 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsPaths;
@@ -58,7 +60,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
import java.io.BufferedOutputStream;
@@ -78,13 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
/**
@@ -165,7 +166,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
private IgfsModeResolver modeRslvr;
/** The secondary file system factory. */
- private HadoopFileSystemFactory factory;
+ private HadoopFileSystemFactoryDelegate factory;
/** Management connection flag. */
private boolean mgmt;
@@ -332,7 +333,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (initSecondary) {
try {
- factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ HadoopFileSystemFactory factory0 =
+ (HadoopFileSystemFactory)paths.getPayload(getClass().getClassLoader());
+
+ factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to get secondary file system factory.", e);
@@ -343,11 +347,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
FileSystemConfiguration.class.getName() + "?)");
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).start();
+ factory.start();
try {
- FileSystem secFs = factory.get(user);
+ FileSystem secFs = (FileSystem)factory.get(user);
secondaryUri = secFs.getUri();
@@ -423,8 +426,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (clientLog.isLogEnabled())
clientLog.close();
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).stop();
+ if (factory != null)
+ factory.stop();
// Reset initialized resources.
uri = null;
@@ -1359,6 +1362,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (factory == null)
return null;
- return factory.get(user);
+ return (FileSystem)factory.get(user);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index bd8ed2d..18b8bf9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -46,13 +46,15 @@ import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsPaths;
@@ -63,7 +65,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
import java.io.BufferedOutputStream;
@@ -86,13 +87,13 @@ import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
/**
@@ -169,7 +170,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
private IgfsModeResolver modeRslvr;
/** The secondary file system factory. */
- private HadoopFileSystemFactory factory;
+ private HadoopFileSystemFactoryDelegate factory;
/** Whether custom sequential reads before prefetch value is provided. */
private boolean seqReadsBeforePrefetchOverride;
@@ -341,7 +342,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (initSecondary) {
try {
- factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ HadoopFileSystemFactory factory0 =
+ (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+
+ factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to get secondary file system factory.", e);
@@ -354,11 +358,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
assert factory != null;
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).start();
+ factory.start();
try {
- FileSystem secFs = factory.get(user);
+ FileSystem secFs = (FileSystem)factory.get(user);
secondaryUri = secFs.getUri();
@@ -385,8 +388,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.close();
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).stop();
+ if (factory != null)
+ factory.stop();
// Reset initialized resources.
rmtClient = null;
@@ -1071,6 +1074,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
private FileSystem secondaryFileSystem() throws IOException{
assert factory != null;
- return factory.get(user);
+ return (FileSystem)factory.get(user);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
index 583af35..343b5ed 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -17,10 +17,6 @@
package org.apache.ignite.hadoop.mapreduce;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -32,10 +28,15 @@ import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFactory;
import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
-import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;
+import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index d4a44fa..e1101c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -17,16 +17,6 @@
package org.apache.ignite.hadoop.mapreduce;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -38,13 +28,23 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 27ffc19..2d1ac0b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -24,11 +24,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
@@ -116,7 +116,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
/** {@inheritDoc} */
@Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
@Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
- List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
+ List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
int reducerCnt = job.info().reducers();
if (reducerCnt < 0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
index 26dc4b2..12669aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
@@ -17,14 +17,12 @@
package org.apache.ignite.hadoop.util;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.jetbrains.annotations.Nullable;
import java.io.Serializable;
/**
- * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names
- * to the underlying Hadoop file system.
+ * Hadoop file system name mapper. Ensures that the correct user name is passed to the underlying Hadoop file system.
*/
public interface UserNameMapper extends Serializable {
/**