You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/20 12:55:13 UTC
[2/2] ignite git commit: IGNITE-3918: Moved public classes from
"ignite-hadoop-impl" to "ignite-hadoop" module.
IGNITE-3918: Moved public classes from "ignite-hadoop-impl" to "ignite-hadoop" module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41de3ab5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41de3ab5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41de3ab5
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 41de3ab575a8bfbea07fa661beac1d6e7735ea3b
Parents: cb304b1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 20 15:54:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 20 15:54:48 2016 +0300
----------------------------------------------------------------------
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 275 ---------
.../fs/CachingHadoopFileSystemFactory.java | 85 ---
.../hadoop/fs/HadoopFileSystemFactory.java | 52 --
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 580 -------------------
.../fs/KerberosHadoopFileSystemFactory.java | 217 -------
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 21 +-
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 21 +-
.../internal/processors/hadoop/HadoopUtils.java | 4 -
.../HadoopBasicFileSystemFactoryDelegate.java | 162 ++++++
.../HadoopCachingFileSystemFactoryDelegate.java | 75 +++
.../HadoopDefaultFileSystemFactoryDelegate.java | 61 ++
...doopIgfsSecondaryFileSystemDelegateImpl.java | 471 +++++++++++++++
...HadoopKerberosFileSystemFactoryDelegate.java | 112 ++++
...KerberosHadoopFileSystemFactorySelfTest.java | 7 +-
.../igfs/HadoopFIleSystemFactorySelfTest.java | 55 +-
...adoopIgfsSecondaryFileSystemTestAdapter.java | 10 +-
...oopSecondaryFileSystemConfigurationTest.java | 14 +-
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 164 ++++++
.../fs/CachingHadoopFileSystemFactory.java | 41 ++
.../hadoop/fs/HadoopFileSystemFactory.java | 45 ++
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 257 ++++++++
.../fs/KerberosHadoopFileSystemFactory.java | 142 +++++
.../hadoop/delegate/HadoopDelegateUtils.java | 117 ++++
.../HadoopFileSystemFactoryDelegate.java | 36 ++
.../HadoopIgfsSecondaryFileSystemDelegate.java | 28 +
25 files changed, 1796 insertions(+), 1256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
deleted file mode 100644
index a01bfaf..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
-import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
-
-/**
- * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call.
- * <p>
- * If {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop.
- */
-public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** File system URI. */
- private String uri;
-
- /** File system config paths. */
- private String[] cfgPaths;
-
- /** User name mapper. */
- private UserNameMapper usrNameMapper;
-
- /** Configuration of the secondary filesystem, never null. */
- protected transient Configuration cfg;
-
- /** Resulting URI. */
- protected transient URI fullUri;
-
- /**
- * Constructor.
- */
- public BasicHadoopFileSystemFactory() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public final FileSystem get(String name) throws IOException {
- String name0 = IgfsUtils.fixUserName(name);
-
- if (usrNameMapper != null)
- name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
-
- return getWithMappedName(name0);
- }
-
- /**
- * Internal file system create routine.
- *
- * @param usrName User name.
- * @return File system.
- * @throws IOException If failed.
- */
- protected FileSystem getWithMappedName(String usrName) throws IOException {
- assert cfg != null;
-
- try {
- // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
- // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
- // classloader to classloader of current class to avoid strange class-cast-exceptions.
- ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
- try {
- return create(usrName);
- }
- finally {
- HadoopUtils.restoreContextClassLoader(oldLdr);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
- }
-
- /**
- * Internal file system creation routine, invoked in correct class loader context.
- *
- * @param usrName User name.
- * @return File system.
- * @throws IOException If failed.
- * @throws InterruptedException if the current thread is interrupted.
- */
- protected FileSystem create(String usrName) throws IOException, InterruptedException {
- return FileSystem.get(fullUri, cfg, usrName);
- }
-
- /**
- * Gets file system URI.
- * <p>
- * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
- * <p>
- * If not set, default URI will be picked from file system configuration using
- * {@link FileSystem#getDefaultUri(Configuration)} method.
- *
- * @return File system URI.
- */
- @Nullable public String getUri() {
- return uri;
- }
-
- /**
- * Sets file system URI. See {@link #getUri()} for more information.
- *
- * @param uri File system URI.
- */
- public void setUri(@Nullable String uri) {
- this.uri = uri;
- }
-
- /**
- * Gets paths to additional file system configuration files (e.g. core-site.xml).
- * <p>
- * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable.
- * <p>
- * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means
- * that path order might be important in some cases.
- * <p>
- * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
- * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well.
- *
- * @return Paths to file system configuration files.
- */
- @Nullable public String[] getConfigPaths() {
- return cfgPaths;
- }
-
- /**
- * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for
- * more information.
- *
- * @param cfgPaths Paths to file system configuration files.
- */
- public void setConfigPaths(@Nullable String... cfgPaths) {
- this.cfgPaths = cfgPaths;
- }
-
- /**
- * Get optional user name mapper.
- * <p>
- * When IGFS is invoked from Hadoop, user name is passed along the way to ensure that request will be performed
- * with proper user context. User name is passed in a simple form and doesn't contain any extended information,
- * such as host, domain or Kerberos realm. You may use name mapper to translate plain user name to full user
- * name required by security engine of the underlying file system.
- * <p>
- * For example you may want to use {@link KerberosUserNameMapper} to user name from {@code "johndoe"} to
- * {@code "johndoe@YOUR.REALM.COM"}.
- *
- * @return User name mapper.
- */
- @Nullable public UserNameMapper getUserNameMapper() {
- return usrNameMapper;
- }
-
- /**
- * Set optional user name mapper. See {@link #getUserNameMapper()} for more information.
- *
- * @param usrNameMapper User name mapper.
- */
- public void setUserNameMapper(@Nullable UserNameMapper usrNameMapper) {
- this.usrNameMapper = usrNameMapper;
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- cfg = HadoopUtils.safeCreateConfiguration();
-
- if (cfgPaths != null) {
- for (String cfgPath : cfgPaths) {
- if (cfgPath == null)
- throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths));
- else {
- URL url = U.resolveIgniteUrl(cfgPath);
-
- if (url == null) {
- // If secConfPath is given, it should be resolvable:
- throw new IgniteException("Failed to resolve secondary file system configuration path " +
- "(ensure that it exists locally and you have read access to it): " + cfgPath);
- }
-
- cfg.addResource(url);
- }
- }
- }
-
- // If secondary fs URI is not given explicitly, try to get it from the configuration:
- if (uri == null)
- fullUri = FileSystem.getDefaultUri(cfg);
- else {
- try {
- fullUri = new URI(uri);
- }
- catch (URISyntaxException use) {
- throw new IgniteException("Failed to resolve secondary file system URI: " + uri);
- }
- }
-
- if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
- ((LifecycleAware)usrNameMapper).start();
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
- ((LifecycleAware)usrNameMapper).stop();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, uri);
-
- if (cfgPaths != null) {
- out.writeInt(cfgPaths.length);
-
- for (String cfgPath : cfgPaths)
- U.writeString(out, cfgPath);
- }
- else
- out.writeInt(-1);
-
- out.writeObject(usrNameMapper);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- uri = U.readString(in);
-
- int cfgPathsCnt = in.readInt();
-
- if (cfgPathsCnt != -1) {
- cfgPaths = new String[cfgPathsCnt];
-
- for (int i = 0; i < cfgPathsCnt; i++)
- cfgPaths[i] = U.readString(in);
- }
-
- usrNameMapper = (UserNameMapper)in.readObject();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
deleted file mode 100644
index bcbb082..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on
- * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each
- * user instead.
- * <p>
- * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user
- * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to
- * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
- * calls this method only once per user what may lead to token expiration. In such cases it makes sense to either
- * use {@link BasicHadoopFileSystemFactory} or implement your own factory.
- */
-public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Per-user file system cache. */
- private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
- @Override public FileSystem createValue(String key) throws IOException {
- return CachingHadoopFileSystemFactory.super.getWithMappedName(key);
- }
- }
- );
-
- /**
- * Public non-arg constructor.
- */
- public CachingHadoopFileSystemFactory() {
- // noop
- }
-
- /** {@inheritDoc} */
- @Override public FileSystem getWithMappedName(String name) throws IOException {
- return cache.getOrCreate(name);
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- super.start();
-
- // Disable caching.
- cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- super.stop();
-
- try {
- cache.close();
- }
- catch (IgniteCheckedException ice) {
- throw new IgniteException(ice);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
deleted file mode 100644
index 5ad08ab..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.IgfsMode;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
- * <p>
- * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required.
- * <p>
- * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement own caching facility
- * or doesn't cache file systems at all.
- * <p>
- * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be
- * performed by Ignite. You may want to implement some initialization or cleanup there.
- * <p>
- * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the
- * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file
- * system paths.
- */
-public interface HadoopFileSystemFactory extends Serializable {
- /**
- * Gets file system for the given user name.
- *
- * @param usrName User name
- * @return File system.
- * @throws IOException In case of error.
- */
- public FileSystem get(String usrName) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
deleted file mode 100644
index 6b5c776..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsException;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
- * <p>
- * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}.
- */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware,
- HadoopPayloadAware {
- /** The default user name. It is used if no user context is set. */
- private String dfltUsrName;
-
- /** Factory. */
- private HadoopFileSystemFactory fsFactory;
-
- /**
- * Default constructor for Spring.
- */
- public IgniteHadoopIgfsSecondaryFileSystem() {
- // No-op.
- }
-
- /**
- * Simple constructor that is to be used by default.
- *
- * @param uri URI of file system.
- * @throws IgniteCheckedException In case of error.
- * @deprecated Use {@link #getFileSystemFactory()} instead.
- */
- @Deprecated
- public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException {
- this(uri, null, null);
- }
-
- /**
- * Constructor.
- *
- * @param uri URI of file system.
- * @param cfgPath Additional path to Hadoop configuration.
- * @throws IgniteCheckedException In case of error.
- * @deprecated Use {@link #getFileSystemFactory()} instead.
- */
- @Deprecated
- public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath)
- throws IgniteCheckedException {
- this(uri, cfgPath, null);
- }
-
- /**
- * Constructor.
- *
- * @param uri URI of file system.
- * @param cfgPath Additional path to Hadoop configuration.
- * @param userName User name.
- * @throws IgniteCheckedException In case of error.
- * @deprecated Use {@link #getFileSystemFactory()} instead.
- */
- @Deprecated
- public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
- @Nullable String userName) throws IgniteCheckedException {
- setDefaultUserName(userName);
-
- CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
-
- fac.setUri(uri);
-
- if (cfgPath != null)
- fac.setConfigPaths(cfgPath);
-
- setFileSystemFactory(fac);
- }
-
- /**
- * Gets default user name.
- * <p>
- * Defines user name which will be used during file system invocation in case no user name is defined explicitly
- * through {@link FileSystem#get(URI, Configuration, String)}.
- * <p>
- * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name
- * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
- * {@link IgfsUserContext#doAs(String, Callable)} methods.
- * <p>
- * If not set value of system property {@code "user.name"} will be used. If this property is not set either,
- * {@code "anonymous"} will be used.
- *
- * @return Default user name.
- */
- @Nullable public String getDefaultUserName() {
- return dfltUsrName;
- }
-
- /**
- * Sets default user name. See {@link #getDefaultUserName()} for details.
- *
- * @param dfltUsrName Default user name.
- */
- public void setDefaultUserName(@Nullable String dfltUsrName) {
- this.dfltUsrName = dfltUsrName;
- }
-
- /**
- * Gets secondary file system factory.
- * <p>
- * This factory will be used whenever a call to a target {@link FileSystem} is required.
- * <p>
- * If not set, {@link CachingHadoopFileSystemFactory} will be used.
- *
- * @return Secondary file system factory.
- */
- public HadoopFileSystemFactory getFileSystemFactory() {
- return fsFactory;
- }
-
- /**
- * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details.
- *
- * @param factory Secondary file system factory.
- */
- public void setFileSystemFactory(HadoopFileSystemFactory factory) {
- this.fsFactory = factory;
- }
-
- /**
- * Convert IGFS path into Hadoop path.
- *
- * @param path IGFS path.
- * @return Hadoop path.
- */
- private Path convert(IgfsPath path) {
- URI uri = fileSystemForUser().getUri();
-
- return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
- }
-
- /**
- * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
- *
- * @param e Exception to check.
- * @param detailMsg Detailed error message.
- * @return Appropriate exception.
- */
- private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
- return cast(detailMsg, e);
- }
-
- /**
- * Cast IO exception to IGFS exception.
- *
- * @param e IO exception.
- * @return IGFS exception.
- */
- public static IgfsException cast(String msg, IOException e) {
- if (e instanceof FileNotFoundException)
- return new IgfsPathNotFoundException(e);
- else if (e instanceof ParentNotDirectoryException)
- return new IgfsParentNotDirectoryException(msg, e);
- else if (e instanceof PathIsNotEmptyDirectoryException)
- return new IgfsDirectoryNotEmptyException(e);
- else if (e instanceof PathExistsException)
- return new IgfsPathAlreadyExistsException(msg, e);
- else
- return new IgfsException(msg, e);
- }
-
- /**
- * Convert Hadoop FileStatus properties to map.
- *
- * @param status File status.
- * @return IGFS attributes.
- */
- private static Map<String, String> properties(FileStatus status) {
- FsPermission perm = status.getPermission();
-
- if (perm == null)
- perm = FsPermission.getDefault();
-
- HashMap<String, String> res = new HashMap<>(3);
-
- res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
- res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
- res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean exists(IgfsPath path) {
- try {
- return fileSystemForUser().exists(convert(path));
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
- HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
-
- final FileSystem fileSys = fileSystemForUser();
-
- try {
- if (props0.userName() != null || props0.groupName() != null)
- fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
- if (props0.permission() != null)
- fileSys.setPermission(convert(path), props0.permission());
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
- }
-
- //Result is not used in case of secondary FS.
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void rename(IgfsPath src, IgfsPath dest) {
- // Delegate to the secondary file system.
- try {
- if (!fileSystemForUser().rename(convert(src), convert(dest)))
- throw new IgfsException("Failed to rename (secondary file system returned false) " +
- "[src=" + src + ", dest=" + dest + ']');
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(IgfsPath path, boolean recursive) {
- try {
- return fileSystemForUser().delete(convert(path), recursive);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path) {
- try {
- if (!fileSystemForUser().mkdirs(convert(path)))
- throw new IgniteException("Failed to make directories [path=" + path + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
- try {
- if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
- throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
- for (FileStatus status : statuses)
- res.add(new IgfsPath(path, status.getPath().getName()));
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
- for (FileStatus s : statuses) {
- IgfsEntryInfo fsInfo = s.isDirectory() ?
- IgfsUtils.createDirectory(
- IgniteUuid.randomUuid(),
- null,
- properties(s),
- s.getAccessTime(),
- s.getModificationTime()
- ) :
- IgfsUtils.createFile(
- IgniteUuid.randomUuid(),
- (int)s.getBlockSize(),
- s.getLen(),
- null,
- null,
- false,
- properties(s),
- s.getAccessTime(),
- s.getModificationTime()
- );
-
- res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
- }
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream create(IgfsPath path, boolean overwrite) {
- try {
- return fileSystemForUser().create(convert(path), overwrite);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
- long blockSize, @Nullable Map<String, String> props) {
- HadoopIgfsProperties props0 =
- new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
-
- try {
- return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
- (short) replication, blockSize, null);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
- ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
- ", blockSize=" + blockSize + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
- @Nullable Map<String, String> props) {
- try {
- return fileSystemForUser().append(convert(path), bufSize);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(final IgfsPath path) {
- try {
- final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
-
- if (status == null)
- return null;
-
- final Map<String, String> props = properties(status);
-
- return new IgfsFile() {
- @Override public IgfsPath path() {
- return path;
- }
-
- @Override public boolean isFile() {
- return status.isFile();
- }
-
- @Override public boolean isDirectory() {
- return status.isDirectory();
- }
-
- @Override public int blockSize() {
- // By convention directory has blockSize == 0, while file has blockSize > 0:
- return isDirectory() ? 0 : (int)status.getBlockSize();
- }
-
- @Override public long groupBlockSize() {
- return status.getBlockSize();
- }
-
- @Override public long accessTime() {
- return status.getAccessTime();
- }
-
- @Override public long modificationTime() {
- return status.getModificationTime();
- }
-
- @Override public String property(String name) throws IllegalArgumentException {
- String val = props.get(name);
-
- if (val == null)
- throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
- return val;
- }
-
- @Nullable @Override public String property(String name, @Nullable String dfltVal) {
- String val = props.get(name);
-
- return val == null ? dfltVal : val;
- }
-
- @Override public long length() {
- return status.getLen();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return props;
- }
- };
- }
- catch (FileNotFoundException ignore) {
- return null;
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public long usedSpaceSize() {
- try {
- // We don't use FileSystem#getUsed() since it counts only the files
- // in the filesystem root, not all the files recursively.
- return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
- try {
- // We don't use FileSystem#getUsed() since it counts only the files
- // in the filesystem root, not all the files recursively.
- fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed set times for path: " + path);
- }
- }
-
- /**
- * Gets the underlying {@link FileSystem}.
- * This method is used solely for testing.
- * @return the underlying Hadoop {@link FileSystem}.
- */
- public FileSystem fileSystem() {
- return fileSystemForUser();
- }
-
- /**
- * Gets the FileSystem for the current context user.
- * @return the FileSystem instance, never null.
- */
- private FileSystem fileSystemForUser() {
- String user = IgfsUserContext.currentUser();
-
- if (F.isEmpty(user))
- user = IgfsUtils.fixUserName(dfltUsrName);
-
- assert !F.isEmpty(user);
-
- try {
- return fsFactory.get(user);
- }
- catch (IOException ioe) {
- throw new IgniteException(ioe);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);
-
- if (fsFactory == null)
- fsFactory = new CachingHadoopFileSystemFactory();
-
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware) fsFactory).start();
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- if (fsFactory instanceof LifecycleAware)
- ((LifecycleAware)fsFactory).stop();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopFileSystemFactory getPayload() {
- return fsFactory;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
deleted file mode 100644
index bbfbc59..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
- * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
- * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
- * The principal and the key tab name to be used for Kerberos authentication are set explicitly
- * in the factory configuration.
- *
- * <p>This factory does not cache any file system instances. If {@code "fs.[prefix].impl.disable.cache"} is set
- * to {@code true}, file system instances will be cached by Hadoop.
- */
-public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** The default interval used to re-login from the key tab, in milliseconds. */
- public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
-
- /** Keytab full file name. */
- private String keyTab;
-
- /** Keytab principal. */
- private String keyTabPrincipal;
-
- /** The re-login interval. See {@link #getReloginInterval()} for more information. */
- private long reloginInterval = DFLT_RELOGIN_INTERVAL;
-
- /** Time of last re-login attempt, in system milliseconds. */
- private transient volatile long lastReloginTime;
-
- /**
- * Constructor.
- */
- public KerberosHadoopFileSystemFactory() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public FileSystem getWithMappedName(String name) throws IOException {
- reloginIfNeeded();
-
- return super.getWithMappedName(name);
- }
-
- /** {@inheritDoc} */
- @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
- UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
- UserGroupInformation.getLoginUser());
-
- return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override public FileSystem run() throws Exception {
- return FileSystem.get(fullUri, cfg);
- }
- });
- }
-
- /**
- * Gets the key tab principal short name (e.g. "hdfs").
- *
- * @return The key tab principal.
- */
- @Nullable public String getKeyTabPrincipal() {
- return keyTabPrincipal;
- }
-
- /**
- * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
- *
- * @param keyTabPrincipal The key tab principal name.
- */
- public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
- this.keyTabPrincipal = keyTabPrincipal;
- }
-
- /**
- * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
- * <p>
- * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
- * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
- *
- * @return The key tab file name.
- */
- @Nullable public String getKeyTab() {
- return keyTab;
- }
-
- /**
- * Sets the key tab file name. See {@link #getKeyTab()} for more information.
- *
- * @param keyTab The key tab file name.
- */
- public void setKeyTab(@Nullable String keyTab) {
- this.keyTab = keyTab;
- }
-
- /**
- * The interval used to re-login from the key tab, in milliseconds.
- * Important that the value should not be larger than the Kerberos ticket life time multiplied by 0.2. This is
- * because the ticket renew window starts from {@code 0.8 * ticket life time}.
- * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
- * is obeys this rule well.
- *
- * <p>Zero value means that re-login should be attempted on each file system operation.
- * Negative values are not allowed.
- *
- * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
- * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
- * have passed since the time of the previous login.
- * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
- * more detail.
- *
- * @return The re-login interval, in milliseconds.
- */
- public long getReloginInterval() {
- return reloginInterval;
- }
-
- /**
- * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
- *
- * @param reloginInterval The re-login interval, in milliseconds.
- */
- public void setReloginInterval(long reloginInterval) {
- this.reloginInterval = reloginInterval;
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
- A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
- A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
-
- super.start();
-
- try {
- UserGroupInformation.setConfiguration(cfg);
- UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
- }
- catch (IOException ioe) {
- throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
- ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
- }
- }
-
- /**
- * Re-logins the user if needed.
- * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
- * frequent than one attempt per {@code reloginInterval}.
- * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing
- * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
- *
- * <p>This operation expected to be called upon each operation with the file system created with the factory.
- * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
- * is no need to invoke it otherwise specially.
- *
- * @throws IOException If login fails.
- */
- private void reloginIfNeeded() throws IOException {
- long now = System.currentTimeMillis();
-
- if (now >= lastReloginTime + reloginInterval) {
- UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-
- lastReloginTime = now;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- U.writeString(out, keyTab);
- U.writeString(out, keyTabPrincipal);
- out.writeLong(reloginInterval);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- super.readExternal(in);
-
- keyTab = U.readString(in);
- keyTabPrincipal = U.readString(in);
- reloginInterval = in.readLong();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index a06129e..85fc76e 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -42,6 +42,8 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
@@ -58,7 +60,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
import java.io.BufferedOutputStream;
@@ -165,7 +166,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
private IgfsModeResolver modeRslvr;
/** The secondary file system factory. */
- private HadoopFileSystemFactory factory;
+ private HadoopFileSystemFactoryDelegate factory;
/** Management connection flag. */
private boolean mgmt;
@@ -332,7 +333,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (initSecondary) {
try {
- factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ HadoopFileSystemFactory factory0 =
+ (HadoopFileSystemFactory)paths.getPayload(getClass().getClassLoader());
+
+ factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to get secondary file system factory.", e);
@@ -343,11 +347,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
FileSystemConfiguration.class.getName() + "?)");
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).start();
+ factory.start();
try {
- FileSystem secFs = factory.get(user);
+ FileSystem secFs = (FileSystem)factory.get(user);
secondaryUri = secFs.getUri();
@@ -423,8 +426,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (clientLog.isLogEnabled())
clientLog.close();
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).stop();
+ if (factory != null)
+ factory.stop();
// Reset initialized resources.
uri = null;
@@ -1359,6 +1362,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (factory == null)
return null;
- return factory.get(user);
+ return (FileSystem)factory.get(user);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index bd8ed2d..32e51df 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -46,6 +46,8 @@ import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
@@ -63,7 +65,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
import java.io.BufferedOutputStream;
@@ -169,7 +170,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
private IgfsModeResolver modeRslvr;
/** The secondary file system factory. */
- private HadoopFileSystemFactory factory;
+ private HadoopFileSystemFactoryDelegate factory;
/** Whether custom sequential reads before prefetch value is provided. */
private boolean seqReadsBeforePrefetchOverride;
@@ -341,7 +342,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (initSecondary) {
try {
- factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ HadoopFileSystemFactory factory0 =
+ (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+
+ factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to get secondary file system factory.", e);
@@ -354,11 +358,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
assert factory != null;
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).start();
+ factory.start();
try {
- FileSystem secFs = factory.get(user);
+ FileSystem secFs = (FileSystem)factory.get(user);
secondaryUri = secFs.getUri();
@@ -385,8 +388,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.close();
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).stop();
+ if (factory != null)
+ factory.stop();
// Reset initialized resources.
rmtClient = null;
@@ -1071,6 +1074,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
private FileSystem secondaryFileSystem() throws IOException{
assert factory != null;
- return factory.get(user);
+ return (FileSystem)factory.get(user);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 83ccdf0..08c3cb5 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -25,12 +25,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..9e69914
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.util.UserNameMapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Basic Hadoop file system factory delegate.
+ */
+public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+ /** Proxy. */
+ protected final HadoopFileSystemFactory proxy;
+
+ /** Configuration of the secondary filesystem, never null. */
+ protected Configuration cfg;
+
+ /** Resulting URI. */
+ protected URI fullUri;
+
+ /** User name mapper. */
+ private UserNameMapper usrNameMapper;
+
+ /**
+ * Constructor.
+ *
+ * @param proxy Proxy.
+ */
+ public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
+ this.proxy = proxy;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String name) throws IOException {
+ String name0 = IgfsUtils.fixUserName(name);
+
+ if (usrNameMapper != null)
+ name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
+
+ return getWithMappedName(name0);
+ }
+
+ /**
+ * Internal file system create routine.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ */
+ protected FileSystem getWithMappedName(String usrName) throws IOException {
+ assert cfg != null;
+
+ try {
+ // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
+ // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
+ // classloader to classloader of current class to avoid strange class-cast-exceptions.
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ return create(usrName);
+ }
+ finally {
+ HadoopUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
+ }
+
+ /**
+ * Internal file system creation routine, invoked in correct class loader context.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ * @throws InterruptedException if the current thread is interrupted.
+ */
+ protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ return FileSystem.get(fullUri, cfg, usrName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;
+
+ cfg = HadoopUtils.safeCreateConfiguration();
+
+ if (proxy0.getConfigPaths() != null) {
+ for (String cfgPath : proxy0.getConfigPaths()) {
+ if (cfgPath == null)
+ throw new NullPointerException("Configuration path cannot be null: " +
+ Arrays.toString(proxy0.getConfigPaths()));
+ else {
+ URL url = U.resolveIgniteUrl(cfgPath);
+
+ if (url == null) {
+ // If secConfPath is given, it should be resolvable:
+ throw new IgniteException("Failed to resolve secondary file system configuration path " +
+ "(ensure that it exists locally and you have read access to it): " + cfgPath);
+ }
+
+ cfg.addResource(url);
+ }
+ }
+ }
+
+ // If secondary fs URI is not given explicitly, try to get it from the configuration:
+ if (proxy0.getUri() == null)
+ fullUri = FileSystem.getDefaultUri(cfg);
+ else {
+ try {
+ fullUri = new URI(proxy0.getUri());
+ }
+ catch (URISyntaxException use) {
+ throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri());
+ }
+ }
+
+ usrNameMapper = proxy0.getUserNameMapper();
+
+ if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
+ ((LifecycleAware)usrNameMapper).start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
+ ((LifecycleAware)usrNameMapper).stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..04bbeb8
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+
+import java.io.IOException;
+
+/**
+ * Caching Hadoop file system factory delegate.
+ */
+public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
+ /** Per-user file system cache. */
+ private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) throws IOException {
+ return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key);
+ }
+ }
+ );
+
+ /**
+ * Constructor.
+ *
+ * @param proxy Proxy.
+ */
+ public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
+ super(proxy);
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem getWithMappedName(String name) throws IOException {
+ return cache.getOrCreate(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ super.start();
+
+ // Disable caching.
+ cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ super.stop();
+
+ try {
+ cache.close();
+ }
+ catch (IgniteCheckedException ice) {
+ throw new IgniteException(ice);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..3eb6239
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+
+/**
+ * Hadoop file system factory delegate for non-standard factories.
+ */
+public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+ /** Factory. */
+ private final HadoopFileSystemFactory factory;
+
+ /**
+ * Constructor.
+ *
+ * @param factory Factory.
+ */
+ public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) {
+ assert factory != null;
+
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String usrName) throws IOException {
+ return (FileSystem)factory.get(usrName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware)factory).start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware)factory).stop();
+ }
+}