You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/02 16:01:19 UTC
[3/4] incubator-ignite git commit: # IGNITE-386: Finished.
# IGNITE-386: Finished.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6dbf2953
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6dbf2953
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6dbf2953
Branch: refs/heads/ignite-386
Commit: 6dbf2953a64edc959bda3363279eea212acb7751
Parents: 7ce55aa
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 2 15:43:41 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 2 15:43:41 2015 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 7 +-
.../fs/IgniteHadoopSecondaryFileSystem.java | 413 ++++++
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1254 ++++++++++++++++++
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 1008 ++++++++++++++
.../mapreduce/IgniteHadoopCounterGroup.java | 121 ++
.../hadoop/mapreduce/IgniteHadoopCounters.java | 217 +++
.../protocol/IgniteHadoopClientProtocol.java | 334 +++++
.../IgniteHadoopClientProtocolProvider.java | 137 ++
8 files changed, 3488 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
index 51bf08e..9f46b00 100644
--- a/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite_new/configuration/FileSystemConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite_new.configuration;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite_new.filesystem.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -121,7 +122,7 @@ public class FileSystemConfiguration {
private int mgmtPort = DFLT_MGMT_PORT;
/** Secondary file system */
- private Igfs secondaryFs;
+ private SecondaryFileSystem secondaryFs;
/** IGFS mode. */
private IgfsMode dfltMode = DFLT_MODE;
@@ -517,7 +518,7 @@ public class FileSystemConfiguration {
*
* @return Secondary file system.
*/
- public Igfs getSecondaryFileSystem() {
+ public SecondaryFileSystem getSecondaryFileSystem() {
return secondaryFs;
}
@@ -527,7 +528,7 @@ public class FileSystemConfiguration {
*
* @param fileSystem
*/
- public void setSecondaryFileSystem(Igfs fileSystem) {
+ public void setSecondaryFileSystem(SecondaryFileSystem fileSystem) {
secondaryFs = fileSystem;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
new file mode 100644
index 0000000..007172a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite_new.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite_new.filesystem.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
+ */
+public class IgniteHadoopSecondaryFileSystem implements SecondaryFileSystem, AutoCloseable {
+ /** Property name for path to Hadoop configuration. */
+ public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+ /** Property name for URI of file system. */
+ public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
+ /** Hadoop file system. */
+ private final FileSystem fileSys;
+
+ /** Properties of file system */
+ private final Map<String, String> props = new HashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param uri URI of file system.
+ * @param cfgPath Additional path to Hadoop configuration.
+ * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ */
+ public IgniteHadoopSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
+ Configuration cfg = new Configuration();
+
+ if (cfgPath != null)
+ cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+ try {
+ fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+ }
+ catch (IOException | URISyntaxException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ uri = fileSys.getUri().toString();
+
+ if (!uri.endsWith("/"))
+ uri += "/";
+
+ props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+ props.put(SECONDARY_FS_URI, uri);
+ }
+
+ /**
+ * Convert IGFS path into Hadoop path.
+ *
+ * @param path IGFS path.
+ * @return Hadoop path.
+ */
+ private Path convert(IgfsPath path) {
+ URI uri = fileSys.getUri();
+
+ return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+ }
+
+ /**
+ * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
+ *
+ * @param e Exception to check.
+ * @param detailMsg Detailed error message.
+ * @return Appropriate exception.
+ */
+ private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+ boolean wrongVer = X.hasCause(e, RemoteException.class) ||
+ (e.getMessage() != null && e.getMessage().contains("Failed on local"));
+
+ IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
+ new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
+ "version.", e);
+
+ return igfsErr;
+ }
+
+ /**
+ * Cast IO exception to IGFS exception.
+ *
+ * @param e IO exception.
+ * @return IGFS exception.
+ */
+ public static IgfsException cast(String msg, IOException e) {
+ if (e instanceof FileNotFoundException)
+ return new IgfsFileNotFoundException(e);
+ else if (e instanceof ParentNotDirectoryException)
+ return new IgfsParentNotDirectoryException(msg, e);
+ else if (e instanceof PathIsNotEmptyDirectoryException)
+ return new IgfsDirectoryNotEmptyException(e);
+ else if (e instanceof PathExistsException)
+ return new IgfsPathAlreadyExistsException(msg, e);
+ else
+ return new IgfsException(msg, e);
+ }
+
+ /**
+ * Convert Hadoop FileStatus properties to map.
+ *
+ * @param status File status.
+ * @return IGFS attributes.
+ */
+ private static Map<String, String> properties(FileStatus status) {
+ FsPermission perm = status.getPermission();
+
+ if (perm == null)
+ perm = FsPermission.getDefault();
+
+ return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
+ PROP_GROUP_NAME, status.getGroup());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exists(IgfsPath path) {
+ try {
+ return fileSys.exists(convert(path));
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+ IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+
+ try {
+ if (props0.userName() != null || props0.groupName() != null)
+ fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+ if (props0.permission() != null)
+ fileSys.setPermission(convert(path), props0.permission());
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+ }
+
+ //Result is not used in case of secondary FS.
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(IgfsPath src, IgfsPath dest) {
+ // Delegate to the secondary file system.
+ try {
+ if (!fileSys.rename(convert(src), convert(dest)))
+ throw new IgfsException("Failed to rename (secondary file system returned false) " +
+ "[src=" + src + ", dest=" + dest + ']');
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(IgfsPath path, boolean recursive) {
+ try {
+ return fileSys.delete(convert(path), recursive);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path) {
+ try {
+ if (!fileSys.mkdirs(convert(path)))
+ throw new IgniteException("Failed to make directories [path=" + path + "]");
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+ try {
+ if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
+ throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+ try {
+ FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+ if (statuses == null)
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+ Collection<IgfsPath> res = new ArrayList<>(statuses.length);
+
+ for (FileStatus status : statuses)
+ res.add(new IgfsPath(path, status.getPath().getName()));
+
+ return res;
+ }
+ catch (FileNotFoundException ignored) {
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+ try {
+ FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+ if (statuses == null)
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+ Collection<IgfsFile> res = new ArrayList<>(statuses.length);
+
+ for (FileStatus status : statuses) {
+ IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
+ new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
+ properties(status));
+
+ res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
+ }
+
+ return res;
+ }
+ catch (FileNotFoundException ignored) {
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsReader open(IgfsPath path, int bufSize) {
+ return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+ try {
+ return fileSys.create(convert(path), overwrite);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+ long blockSize, @Nullable Map<String, String> props) {
+ IgfsHadoopFSProperties props0 =
+ new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+ try {
+ return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
+ null);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+ ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+ ", blockSize=" + blockSize + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+ @Nullable Map<String, String> props) {
+ try {
+ return fileSys.append(convert(path), bufSize);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile info(final IgfsPath path) {
+ try {
+ final FileStatus status = fileSys.getFileStatus(convert(path));
+
+ if (status == null)
+ return null;
+
+ final Map<String, String> props = properties(status);
+
+ return new IgfsFile() {
+ @Override public IgfsPath path() {
+ return path;
+ }
+
+ @Override public boolean isFile() {
+ return status.isFile();
+ }
+
+ @Override public boolean isDirectory() {
+ return status.isDirectory();
+ }
+
+ @Override public int blockSize() {
+ return (int)status.getBlockSize();
+ }
+
+ @Override public long groupBlockSize() {
+ return status.getBlockSize();
+ }
+
+ @Override public long accessTime() {
+ return status.getAccessTime();
+ }
+
+ @Override public long modificationTime() {
+ return status.getModificationTime();
+ }
+
+ @Override public String property(String name) throws IllegalArgumentException {
+ String val = props.get(name);
+
+ if (val == null)
+ throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+ return val;
+ }
+
+ @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+ String val = props.get(name);
+
+ return val == null ? dfltVal : val;
+ }
+
+ @Override public long length() {
+ return status.getLen();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, String> properties() {
+ return props;
+ }
+ };
+
+ }
+ catch (FileNotFoundException ignore) {
+ return null;
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long usedSpaceSize() {
+ try {
+ return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<String, String> properties() {
+ return props;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ try {
+ fileSys.close();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6dbf2953/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v1/IgniteHadoopFileSystem.java
new file mode 100644
index 0000000..6cc0cbb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -0,0 +1,1254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite_new.hadoop.fs.v1;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.IgniteFs.*;
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
+import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+
+/**
+ * {@code IGFS} Hadoop 1.x file system driver over file system API. To use
+ * {@code IGFS} as Hadoop file system, you should configure this class
+ * in Hadoop's {@code core-site.xml} as follows:
+ * <pre name="code" class="xml">
+ * <property>
+ * <name>fs.default.name</name>
+ * <value>igfs://ipc</value>
+ * </property>
+ *
+ * <property>
+ * <name>fs.igfs.impl</name>
+ * <value>org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem</value>
+ * </property>
+ * </pre>
+ * You should also add Ignite JAR and all libraries to Hadoop classpath. To
+ * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
+ * distribution:
+ * <pre name="code" class="bash">
+ * export IGNITE_HOME=/path/to/Ignite/distribution
+ * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
+ *
+ * for f in $IGNITE_HOME/libs/*.jar; do
+ * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
+ * done
+ * </pre>
+ * <h1 class="header">Data vs Clients Nodes</h1>
+ * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
+ * data nodes. Client nodes are responsible for basic file system operations as well as
+ * accessing data nodes remotely. Usually, client nodes are started together
+ * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
+ * started together with Hadoop {@code task-tracker} processes.
+ * <p>
+ * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
+ * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
+ */
+public class IgniteHadoopFileSystem extends FileSystem {
+ /** Internal property to indicate management connection. */
+ public static final String IGFS_MANAGEMENT = "fs.igfs.management.connection";
+
+ /** Empty array of file block locations. */
+ private static final BlockLocation[] EMPTY_BLOCK_LOCATIONS = new BlockLocation[0];
+
+ /** Empty array of file statuses. */
+ public static final FileStatus[] EMPTY_FILE_STATUS = new FileStatus[0];
+
+ /** Ensures that close routine is invoked at most once. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Grid remote client. */
+ private IgfsHadoopWrapper rmtClient;
+
+ /** User name for each thread. */
+ private final ThreadLocal<String> userName = new ThreadLocal<String>(){
+ /** {@inheritDoc} */
+ @Override protected String initialValue() {
+ return DFLT_USER_NAME;
+ }
+ };
+
+ /** Working directory for each thread. */
+ private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
+ /** {@inheritDoc} */
+ @Override protected Path initialValue() {
+ return getHomeDirectory();
+ }
+ };
+
+ /** Default replication factor. */
+ private short dfltReplication;
+
+ /** Base file system uri. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private URI uri;
+
+ /** Authority. */
+ private String uriAuthority;
+
+ /** Client logger. */
+ private IgfsLogger clientLog;
+
+ /** Secondary URI string. */
+ private URI secondaryUri;
+
+ /** IGFS mode resolver. */
+ private IgfsModeResolver modeRslvr;
+
+ /** Secondary file system instance. */
+ private FileSystem secondaryFs;
+
+ /** Management connection flag. */
+ private boolean mgmt;
+
+ /** Whether custom sequential reads before prefetch value is provided. */
+ private boolean seqReadsBeforePrefetchOverride;
+
+ /** IGFS group block size. */
+ private long igfsGrpBlockSize;
+
+ /** Flag that controls whether file writes should be colocated. */
+ private boolean colocateFileWrites;
+
+ /** Prefer local writes. */
+ private boolean preferLocFileWrites;
+
+ /** Custom-provided sequential reads before prefetch. */
+ private int seqReadsBeforePrefetch;
+
+ /** The cache was disabled when the instance was creating. */
+ private boolean cacheEnabled;
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ if (uri == null)
+ throw new IllegalStateException("URI is null (was IgfsHadoopFileSystem properly initialized?).");
+
+ return uri;
+ }
+
+ /**
+ * Enter busy state.
+ *
+ * @throws java.io.IOException If file system is stopped.
+ */
+ private void enterBusy() throws IOException {
+ if (closeGuard.get())
+ throw new IOException("File system is stopped.");
+ }
+
+ /**
+ * Leave busy state.
+ */
+ private void leaveBusy() {
+ // No-op.
+ }
+
+ /**
+ * Public setter that can be used by direct users of FS or Visor.
+ *
+ * @param colocateFileWrites Whether all ongoing file writes should be colocated.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void colocateFileWrites(boolean colocateFileWrites) {
+ this.colocateFileWrites = colocateFileWrites;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(URI name, Configuration cfg) throws IOException {
+ enterBusy();
+
+ try {
+ if (rmtClient != null)
+ throw new IOException("File system is already initialized: " + rmtClient);
+
+ A.notNull(name, "name");
+ A.notNull(cfg, "cfg");
+
+ super.initialize(name, cfg);
+
+ setConf(cfg);
+
+ String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
+
+ cacheEnabled = !cfg.getBoolean(disableCacheName, false);
+
+ mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
+
+ if (!IGFS_SCHEME.equals(name.getScheme()))
+ throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
+ "://[name]/[optional_path], actual=" + name + ']');
+
+ uri = name;
+
+ uriAuthority = uri.getAuthority();
+
+ setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+
+ // Override sequential reads before prefetch if needed.
+ seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
+
+ if (seqReadsBeforePrefetch > 0)
+ seqReadsBeforePrefetchOverride = true;
+
+ // In Ignite replication factor is controlled by data cache affinity.
+ // We use replication factor to force the whole file to be stored on local node.
+ dfltReplication = (short)cfg.getInt("dfs.replication", 3);
+
+ // Get file colocation control flag.
+ colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
+ preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
+
+ // Get log directory.
+ String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
+
+ File logDirFile = U.resolveIgnitePath(logDirCfg);
+
+ String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
+
+ rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
+
+ // Handshake.
+ IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
+
+ igfsGrpBlockSize = handshake.blockSize();
+
+ IgfsPaths paths = handshake.secondaryPaths();
+
+ // Initialize client logger.
+ Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
+
+ if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
+ // Initiate client logger.
+ if (logDir == null)
+ throw new IOException("Failed to resolve log directory: " + logDirCfg);
+
+ Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
+
+ clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
+ }
+ else
+ clientLog = IgfsLogger.disabledLogger();
+
+ modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
+
+ boolean initSecondary = paths.defaultMode() == PROXY;
+
+ if (paths.pathModes() != null && !paths.pathModes().isEmpty()) {
+ for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
+ IgfsMode mode = pathMode.getValue();
+
+ initSecondary |= mode == PROXY;
+ }
+ }
+
+ if (initSecondary) {
+ Map<String, String> props = paths.properties();
+
+ String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI);
+ String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH);
+
+ if (secConfPath == null)
+ throw new IOException("Failed to connect to the secondary file system because configuration " +
+ "path is not provided.");
+
+ if (secUri == null)
+ throw new IOException("Failed to connect to the secondary file system because URI is not " +
+ "provided.");
+
+ try {
+ secondaryUri = new URI(secUri);
+
+ URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath);
+
+ Configuration conf = new Configuration();
+
+ if (secondaryCfgUrl != null)
+ conf.addResource(secondaryCfgUrl);
+
+ String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme());
+
+ conf.setBoolean(prop, true);
+
+ secondaryFs = FileSystem.get(secondaryUri, conf);
+ }
+ catch (URISyntaxException ignore) {
+ if (!mgmt)
+ throw new IOException("Failed to resolve secondary file system URI: " + secUri);
+ else
+ LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " +
+ "will have no effect).");
+ }
+ catch (IOException e) {
+ if (!mgmt)
+ throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+ else
+ LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " +
+ "will have no effect): " + e.getMessage());
+ }
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkPath(Path path) {
+ URI uri = path.toUri();
+
+ if (uri.isAbsolute()) {
+ if (!F.eq(uri.getScheme(), IGFS_SCHEME))
+ throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
+ uri.getAuthority() + ']');
+
+ if (!F.eq(uri.getAuthority(), uriAuthority))
+ throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
+ uri.getAuthority() + ']');
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public short getDefaultReplication() {
+ return dfltReplication;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void finalize() throws Throwable {
+ super.finalize();
+
+ close0();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ if (cacheEnabled && get(getUri(), getConf()) == this)
+ return;
+
+ close0();
+ }
+
+ /**
+ * Closes file system.
+ *
+ * @throws java.io.IOException If failed.
+ */
+ private void close0() throws IOException {
+ if (closeGuard.compareAndSet(false, true)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+
+ if (rmtClient == null)
+ return;
+
+ super.close();
+
+ rmtClient.close(false);
+
+ if (clientLog.isLogEnabled())
+ clientLog.close();
+
+ if (secondaryFs != null)
+ U.closeQuiet(secondaryFs);
+
+ // Reset initialized resources.
+ uri = null;
+ rmtClient = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTimes(Path p, long mtime, long atime) throws IOException {
+ enterBusy();
+
+ try {
+ A.notNull(p, "p");
+
+ if (mode(p) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ // No-op for management connection.
+ return;
+ }
+
+ secondaryFs.setTimes(toSecondary(p), mtime, atime);
+ }
+ else {
+ IgfsPath path = convert(p);
+
+ rmtClient.setTimes(path, atime, mtime);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setPermission(Path p, FsPermission perm) throws IOException {
+ enterBusy();
+
+ try {
+ A.notNull(p, "p");
+
+ if (mode(p) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ // No-op for management connection.
+ return;
+ }
+
+ secondaryFs.setPermission(toSecondary(p), perm);
+ }
+ else if (rmtClient.update(convert(p), permission(perm)) == null) {
+ throw new IOException("Failed to set file permission (file not found?)" +
+ " [path=" + p + ", perm=" + perm + ']');
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setOwner(Path p, String username, String grpName) throws IOException {
+ A.notNull(p, "p");
+ A.notNull(username, "username");
+ A.notNull(grpName, "grpName");
+
+ enterBusy();
+
+ try {
+ if (mode(p) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ // No-op for management connection.
+ return;
+ }
+
+ secondaryFs.setOwner(toSecondary(p), username, grpName);
+ }
+ else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null)
+ throw new IOException("Failed to set file permission (file not found?)" +
+ " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']');
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ throw new IOException("Failed to open file (secondary file system is not initialized): " + f);
+ }
+
+ FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+
+ if (clientLog.isLogEnabled()) {
+ // At this point we do not know file size, so we perform additional request to remote FS to get it.
+ FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+
+ long size = status != null ? status.getLen() : -1;
+
+ long logId = IgfsLogger.nextId();
+
+ clientLog.logOpen(logId, path, PROXY, bufSize, size);
+
+ return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId));
+ }
+ else
+ return is;
+ }
+ else {
+ IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ?
+ rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
+
+ long logId = -1;
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logOpen(logId, path, mode, bufSize, stream.length());
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
+ ", bufSize=" + bufSize + ']');
+
+ IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(),
+ bufSize, LOG, clientLog, logId);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
+
+ return new FSDataInputStream(igfsIn);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public FSDataOutputStream create(Path f, FsPermission perm, boolean overwrite, int bufSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ OutputStream out = null;
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
+ path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ throw new IOException("Failed to create file (secondary file system is not initialized): " + f);
+ }
+
+ FSDataOutputStream os =
+ secondaryFs.create(toSecondary(f), perm, overwrite, bufSize, replication, blockSize, progress);
+
+ if (clientLog.isLogEnabled()) {
+ long logId = IgfsLogger.nextId();
+
+ clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
+
+ return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId));
+ }
+ else
+ return os;
+ }
+ else {
+ // Create stream and close it in the 'finally' section if any sequential operation failed.
+ IgfsHadoopStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites,
+ replication, blockSize, F.asMap(PROP_PERMISSION, toString(perm),
+ PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)));
+
+ assert stream != null;
+
+ long logId = -1;
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+
+ IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog,
+ logId);
+
+ bufSize = Math.max(64 * 1024, bufSize);
+
+ out = new BufferedOutputStream(igfsOut, bufSize);
+
+ FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+
+ // Mark stream created successfully.
+ out = null;
+
+ return res;
+ }
+ }
+ finally {
+ // Close if failed during stream creation.
+ if (out != null)
+ U.closeQuiet(out);
+
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening output stream in append [thread=" + Thread.currentThread().getName() +
+ ", path=" + path + ", bufSize=" + bufSize + ']');
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ throw new IOException("Failed to append file (secondary file system is not initialized): " + f);
+ }
+
+ FSDataOutputStream os = secondaryFs.append(toSecondary(f), bufSize, progress);
+
+ if (clientLog.isLogEnabled()) {
+ long logId = IgfsLogger.nextId();
+
+ clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
+
+ return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId));
+ }
+ else
+ return os;
+ }
+ else {
+ IgfsHadoopStreamDelegate stream = rmtClient.append(path, false, null);
+
+ assert stream != null;
+
+ long logId = -1;
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logAppend(logId, path, mode, bufSize);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+
+ IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog,
+ logId);
+
+ bufSize = Math.max(64 * 1024, bufSize);
+
+ BufferedOutputStream out = new BufferedOutputStream(igfsOut, bufSize);
+
+ return new FSDataOutputStream(out, null, 0);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ A.notNull(src, "src");
+ A.notNull(dst, "dst");
+
+ enterBusy();
+
+ try {
+ IgfsPath srcPath = convert(src);
+ IgfsPath dstPath = convert(dst);
+ IgfsMode mode = mode(srcPath);
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ return false;
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRename(srcPath, PROXY, dstPath);
+
+ return secondaryFs.rename(toSecondary(src), toSecondary(dst));
+ }
+ else {
+ // Will throw exception if failed.
+ rmtClient.rename(srcPath, dstPath);
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRename(srcPath, mode, dstPath);
+
+ return true;
+ }
+ }
+ catch (IOException e) {
+ // Intentionally ignore IGFS exceptions here to follow Hadoop contract.
+ if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null ||
+ !X.hasCause(e.getCause(), IgfsException.class)))
+ throw e;
+ else
+ return false;
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public boolean delete(Path f) throws IOException {
+ return delete(f, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ return false;
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logDelete(path, PROXY, recursive);
+
+ return secondaryFs.delete(toSecondary(f), recursive);
+ }
+ else {
+ // Will throw exception if delete failed.
+ boolean res = rmtClient.delete(path, recursive);
+
+ if (clientLog.isLogEnabled())
+ clientLog.logDelete(path, mode, recursive);
+
+ return res;
+ }
+ }
+ catch (IOException e) {
+ // Intentionally ignore IGFS exceptions here to follow Hadoop contract.
+ if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null ||
+ !X.hasCause(e.getCause(), IgfsException.class)))
+ throw e;
+ else
+ return false;
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ return EMPTY_FILE_STATUS;
+ }
+
+ FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
+
+ if (arr == null)
+ throw new FileNotFoundException("File " + f + " does not exist.");
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = toPrimary(arr[i]);
+
+ if (clientLog.isLogEnabled()) {
+ String[] fileArr = new String[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ fileArr[i] = arr[i].getPath().toString();
+
+ clientLog.logListDirectory(path, PROXY, fileArr);
+ }
+
+ return arr;
+ }
+ else {
+ Collection<IgfsFile> list = rmtClient.listFiles(path);
+
+ if (list == null)
+ throw new FileNotFoundException("File " + f + " does not exist.");
+
+ List<IgfsFile> files = new ArrayList<>(list);
+
+ FileStatus[] arr = new FileStatus[files.size()];
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(files.get(i));
+
+ if (clientLog.isLogEnabled()) {
+ String[] fileArr = new String[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ fileArr[i] = arr[i].getPath().toString();
+
+ clientLog.logListDirectory(path, mode, fileArr);
+ }
+
+ return arr;
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getHomeDirectory() {
+ Path path = new Path("/user/" + userName.get());
+
+ return path.makeQualified(getUri(), null);
+ }
+
+ /**
+ * Set user name and default working directory for current thread.
+ *
+ * @param userName User name.
+ */
+ public void setUser(String userName) {
+ this.userName.set(userName);
+
+ setWorkingDirectory(null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path newPath) {
+ if (newPath == null) {
+ Path homeDir = getHomeDirectory();
+
+ if (secondaryFs != null)
+ secondaryFs.setWorkingDirectory(toSecondary(homeDir));
+
+ workingDir.set(homeDir);
+ }
+ else {
+ Path fixedNewPath = fixRelativePart(newPath);
+
+ String res = fixedNewPath.toUri().getPath();
+
+ if (!DFSUtil.isValidName(res))
+ throw new IllegalArgumentException("Invalid DFS directory name " + res);
+
+ if (secondaryFs != null)
+ secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
+
+ workingDir.set(fixedNewPath);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return workingDir.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission perm) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = mode(path);
+
+ if (mode == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ return false;
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logMakeDirectory(path, PROXY);
+
+ return secondaryFs.mkdirs(toSecondary(f), perm);
+ }
+ else {
+ boolean mkdirRes = rmtClient.mkdirs(path, permission(perm));
+
+ if (clientLog.isLogEnabled())
+ clientLog.logMakeDirectory(path, mode);
+
+ return mkdirRes;
+ }
+ }
+ catch (IOException e) {
+ // Intentionally ignore IGFS exceptions here to follow Hadoop contract.
+ if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null ||
+ !X.hasCause(e.getCause(), IgfsException.class)))
+ throw e;
+ else
+ return false;
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ if (mode(f) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ throw new IOException("Failed to get file status (secondary file system is not initialized): " + f);
+ }
+
+ return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+ }
+ else {
+ IgfsFile info = rmtClient.info(convert(f));
+
+ if (info == null)
+ throw new FileNotFoundException("File not found: " + f);
+
+ return convert(info);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ContentSummary getContentSummary(Path f) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ if (mode(f) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ throw new IOException("Failed to get content summary (secondary file system is not initialized): " +
+ f);
+ }
+
+ return secondaryFs.getContentSummary(toSecondary(f));
+ }
+ else {
+ IgfsPathSummary sum = rmtClient.contentSummary(convert(f));
+
+ return new ContentSummary(sum.totalLength(), sum.filesCount(), sum.directoriesCount(),
+ -1, sum.totalLength(), rmtClient.fsStatus().spaceTotal());
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public BlockLocation[] getFileBlockLocations(FileStatus status, long start, long len) throws IOException {
+ A.notNull(status, "status");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(status.getPath());
+
+ if (mode(status.getPath()) == PROXY) {
+ if (secondaryFs == null) {
+ assert mgmt;
+
+ return EMPTY_BLOCK_LOCATIONS;
+ }
+
+ Path secPath = toSecondary(status.getPath());
+
+ return secondaryFs.getFileBlockLocations(secondaryFs.getFileStatus(secPath), start, len);
+ }
+ else {
+ long now = System.currentTimeMillis();
+
+ List<IgfsBlockLocation> affinity = new ArrayList<>(rmtClient.affinity(path, start, len));
+
+ BlockLocation[] arr = new BlockLocation[affinity.size()];
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(affinity.get(i));
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
+ (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
+
+ return arr;
+ }
+ }
+ catch (FileNotFoundException ignored) {
+ return EMPTY_BLOCK_LOCATIONS;
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public long getDefaultBlockSize() {
+ return igfsGrpBlockSize;
+ }
+
+ /**
+ * Resolve path mode.
+ *
+ * @param path HDFS path.
+ * @return Path mode.
+ */
+ public IgfsMode mode(Path path) {
+ return mode(convert(path));
+ }
+
+ /**
+ * Resolve path mode.
+ *
+ * @param path IGFS path.
+ * @return Path mode.
+ */
+ public IgfsMode mode(IgfsPath path) {
+ return modeRslvr.resolveMode(path);
+ }
+
+ /**
+ * Convert the given path to path acceptable by the primary file system.
+ *
+ * @param path Path.
+ * @return Primary file system path.
+ */
+ private Path toPrimary(Path path) {
+ return convertPath(path, uri);
+ }
+
+ /**
+ * Convert the given path to path acceptable by the secondary file system.
+ *
+ * @param path Path.
+ * @return Secondary file system path.
+ */
+ private Path toSecondary(Path path) {
+ assert secondaryFs != null;
+ assert secondaryUri != null;
+
+ return convertPath(path, secondaryUri);
+ }
+
+ /**
+ * Convert path using the given new URI.
+ *
+ * @param path Old path.
+ * @param newUri New URI.
+ * @return New path.
+ */
+ private Path convertPath(Path path, URI newUri) {
+ assert newUri != null;
+
+ if (path != null) {
+ URI pathUri = path.toUri();
+
+ try {
+ return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
+ pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
+ }
+ catch (URISyntaxException e) {
+ throw new IgniteException("Failed to construct secondary file system path from the primary file " +
+ "system path: " + path, e);
+ }
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Convert a file status obtained from the secondary file system to a status of the primary file system.
+ *
+ * @param status Secondary file system status.
+ * @return Primary file system status.
+ */
+ @SuppressWarnings("deprecation")
+ private FileStatus toPrimary(FileStatus status) {
+ return status != null ? new FileStatus(status.getLen(), status.isDir(), status.getReplication(),
+ status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
+ status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
+ }
+
+ /**
+ * Convert IGFS path into Hadoop path.
+ *
+ * @param path IGFS path.
+ * @return Hadoop path.
+ */
+ private Path convert(IgfsPath path) {
+ return new Path(IGFS_SCHEME, uriAuthority, path.toString());
+ }
+
+ /**
+ * Convert Hadoop path into IGFS path.
+ *
+ * @param path Hadoop path.
+ * @return IGFS path.
+ */
+ @Nullable private IgfsPath convert(@Nullable Path path) {
+ if (path == null)
+ return null;
+
+ return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
+ new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+ }
+
+ /**
+ * Convert IGFS affinity block location into Hadoop affinity block location.
+ *
+ * @param block IGFS affinity block location.
+ * @return Hadoop affinity block location.
+ */
+ private BlockLocation convert(IgfsBlockLocation block) {
+ Collection<String> names = block.names();
+ Collection<String> hosts = block.hosts();
+
+ return new BlockLocation(
+ names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
+ hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
+ block.start(), block.length()
+ ) {
+ @Override public String toString() {
+ try {
+ return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
+ ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Convert IGFS file information into Hadoop file status.
+ *
+ * @param file IGFS file information.
+ * @return Hadoop file status.
+ */
+ @SuppressWarnings("deprecation")
+ private FileStatus convert(IgfsFile file) {
+ return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
+ file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
+ file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+ convert(file.path())) {
+ @Override public String toString() {
+ return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
+ ", mtime=" + getModificationTime() + ", atime=" + getAccessTime() + ']';
+ }
+ };
+ }
+
+ /**
+ * Convert Hadoop permission into IGFS file attribute.
+ *
+ * @param perm Hadoop permission.
+ * @return IGFS attributes.
+ */
+ private Map<String, String> permission(FsPermission perm) {
+ if (perm == null)
+ perm = FsPermission.getDefault();
+
+ return F.asMap(PROP_PERMISSION, toString(perm));
+ }
+
+ /**
+ * @param perm Permission.
+ * @return String.
+ */
+ private static String toString(FsPermission perm) {
+ return String.format("%04o", perm.toShort());
+ }
+
+ /**
+ * Convert IGFS file attributes into Hadoop permission.
+ *
+ * @param file File info.
+ * @return Hadoop permission.
+ */
+ private FsPermission permission(IgfsFile file) {
+ String perm = file.property(PROP_PERMISSION, null);
+
+ if (perm == null)
+ return FsPermission.getDefault();
+
+ try {
+ return new FsPermission((short)Integer.parseInt(perm, 8));
+ }
+ catch (NumberFormatException ignore) {
+ return FsPermission.getDefault();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteHadoopFileSystem.class, this);
+ }
+}