You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:27 UTC
[37/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
new file mode 100644
index 0000000..b583a1d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.fs;
+
+/**
+ * This class lists parameters that can be specified in Hadoop configuration.
+ * Hadoop configuration can be specified in {@code core-site.xml} file
+ * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
+ * <ul>
+ * <li>
+ * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
+ * IGFS data node configuration property.
+ * </li>
+ * <li>
+ * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
+ * {@code true}, then all file system operations will be logged to a file.
+ * </li>
+ * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
+ * <li>
+ * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ * it gets flushed to log file. Higher values will imply greater performance, but will increase delay
+ * before record appears in the log file.
+ * </li>
+ * <li>
+ * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
+ * node to which client is connected. If {@code true}, file will not be distributed and will be written
+ * to a single data node. Default value is {@code true}.
+ * </li>
+ * <li>
+ * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
+ * local data node if it has enough free space. After some time it can be redistributed across nodes though.
+ * </li>
+ * </ul>
+ * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
+ * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre name="code" class="xml">
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.enabled</name>
+ * <value>true</value>
+ * </property>
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.dir</name>
+ * <value>/home/apache/ignite/log/sampling</value>
+ * </property>
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.batch_size</name>
+ * <value>16</value>
+ * </property>
+ * </pre>
+ * Parameters could also be specified per mapreduce job, e.g.
+ * <pre name="code" class="bash">
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
+ * </pre>
+ * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest
+ * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+ /** Parameter name for control over file colocation write mode. */
+ public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+ /** Parameter name for custom sequential reads before prefetch value. */
+ public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+ "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+ /** Parameter name for client logger directory. */
+ public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+ /** Parameter name for log batch size. */
+ public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+ /** Parameter name for log enabled flag. */
+ public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+ /** Parameter name for prefer local writes flag. */
+ public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..89c5938
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.nio.file.Files;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+ /** Working directory for each thread. */
+ private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+ @Override protected Path initialValue() {
+ return getInitialWorkingDirectory();
+ }
+ };
+
+ /**
+ * Converts Hadoop path to local path.
+ *
+ * @param path Hadoop path.
+ * @return Local path.
+ */
+ File convert(Path path) {
+ checkPath(path);
+
+ if (path.isAbsolute())
+ return new File(path.toUri().getPath());
+
+ return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getHomeDirectory() {
+ return makeQualified(new Path(System.getProperty("user.home")));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getInitialWorkingDirectory() {
+ File f = new File(System.getProperty("user.dir"));
+
+ return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+
+ setConf(conf);
+
+ String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+ if (initWorkDir != null)
+ setWorkingDirectory(new Path(initWorkDir));
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return FsConstants.LOCAL_FS_URI;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return new FSDataInputStream(new InStream(checkExists(convert(f))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ File file = convert(f);
+
+ if (!overwrite && !file.createNewFile())
+ throw new IOException("Failed to create new file: " + f.toUri());
+
+ return out(file, false, bufSize);
+ }
+
+ /**
+ * @param file File.
+ * @param append Append flag.
+ * @return Output stream.
+ * @throws IOException If failed.
+ */
+ private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
+ bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+ return out(convert(f), true, bufSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ return convert(src).renameTo(convert(dst));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ File file = convert(f);
+
+ if (file.isDirectory() && !recursive)
+ throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
+
+ return U.delete(file);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path dir) {
+ workDir.set(fixRelativePart(dir));
+
+ checkPath(dir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return workDir.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ if(f == null)
+ throw new IllegalArgumentException("mkdirs path arg is null");
+
+ Path parent = f.getParent();
+
+ File p2f = convert(f);
+
+ if(parent != null) {
+ File parent2f = convert(parent);
+
+ if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
+ throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+
+ }
+
+ return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ return fileStatus(checkExists(convert(f)));
+ }
+
+ /**
+ * @return File status.
+ */
+ private FileStatus fileStatus(File file) throws IOException {
+ boolean dir = file.isDirectory();
+
+ java.nio.file.Path path = dir ? null : file.toPath();
+
+ return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+ /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
+ new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
+ }
+
+ /**
+ * @param file File.
+ * @return Same file.
+ * @throws FileNotFoundException If does not exist.
+ */
+ private static File checkExists(File file) throws FileNotFoundException {
+ if (!file.exists())
+ throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+
+ return file;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws IOException {
+ File file = convert(f);
+
+ if (checkExists(file).isFile())
+ return new FileStatus[] {fileStatus(file)};
+
+ File[] files = file.listFiles();
+
+ FileStatus[] res = new FileStatus[files.length];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = fileStatus(files[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsSymlinks() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
+ Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+ return getFileStatus(getLinkTarget(f));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getLinkTarget(Path f) throws IOException {
+ File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
+
+ return new Path(file.toURI());
+ }
+
+ /**
+ * Input stream.
+ */
+ private static class InStream extends InputStream implements Seekable, PositionedReadable {
+ /** */
+ private final RandomAccessFile file;
+
+ /**
+ * @param f File.
+ * @throws IOException If failed.
+ */
+ public InStream(File f) throws IOException {
+ file = new RandomAccessFile(f, "r");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read() throws IOException {
+ return file.read();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+ return file.read(b, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ file.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+ long pos0 = file.getFilePointer();
+
+ file.seek(pos);
+ int res = file.read(buf, off, len);
+
+ file.seek(pos0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ if (read(pos, buf, off, len) != len)
+ throw new IOException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf) throws IOException {
+ readFully(pos, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void seek(long pos) throws IOException {
+ file.seek(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long getPos() throws IOException {
+ return file.getFilePointer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
new file mode 100644
index 0000000..8bb904f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Facade for communication with grid.
+ */
+public interface HadoopIgfs {
+ /**
+ * Perform handshake.
+ *
+ * @param logDir Log directory.
+ * @return Future with handshake result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
+
+ /**
+ * Close connection.
+ *
+ * @param force Force flag.
+ */
+ public void close(boolean force);
+
+ /**
+ * Command to retrieve file info for some IGFS path.
+ *
+ * @param path Path to get file info for.
+ * @return Future for info operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to update file properties.
+ *
+ * @param path IGFS path to update properties.
+ * @param props Properties to update.
+ * @return Future for update operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Sets last access time and last modification time for a file.
+ *
+ * @param path Path to update times.
+ * @param accessTime Last access time to set.
+ * @param modificationTime Last modification time to set.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Command to rename given path.
+ *
+ * @param src Source path.
+ * @param dest Destination path.
+ * @return Future for rename operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to delete given path.
+ *
+ * @param path Path to delete.
+ * @param recursive {@code True} if deletion is recursive.
+ * @return Future for delete operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get affinity for given path, offset and length.
+ *
+ * @param path Path to get affinity for.
+ * @param start Start position (offset).
+ * @param len Data length.
+ * @return Future for affinity command.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Gets path summary.
+ *
+ * @param path Path to get summary for.
+ * @return Future that will be completed when summary is received.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to create directories.
+ *
+ * @param path Path to create.
+ * @return Future for mkdirs operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get list of files in directory.
+ *
+ * @param path Path to list.
+ * @return Future for listFiles operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get directory listing.
+ *
+ * @param path Path to list.
+ * @return Future for listPaths operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Performs status request.
+ *
+ * @return Status response.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to open file for reading.
+ *
+ * @param path File path to open.
+ * @return Future for open operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to open file for reading.
+ *
+ * @param path File path to open.
+ * @return Future for open operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Command to create file and open it for output.
+ *
+ * @param path Path to file.
+ * @param overwrite If {@code true} then old file contents will be lost.
+ * @param colocate If {@code true} and called on data node, file will be written on that node.
+ * @param replication Replication factor.
+ * @param props File properties for creation.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+ int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Open file for output appending data to the end of a file.
+ *
+ * @param path Path to file.
+ * @param create If {@code true}, file will be created if does not exist.
+ * @param props File properties.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+ @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
new file mode 100644
index 0000000..ddfe35b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Communication exception indicating a problem between file system and IGFS instance.
+ */
+public class HadoopIgfsCommunicationException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates new exception with given throwable as a nested cause and
+ * source of error message.
+ *
+ * @param cause Non-null throwable cause.
+ */
+ public HadoopIgfsCommunicationException(Exception cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with given error message and optional nested cause exception.
+ *
+ * @param msg Error message.
+ */
+ public HadoopIgfsCommunicationException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new exception with given error message and optional nested cause exception.
+ *
+ * @param msg Error message.
+ * @param cause Cause.
+ */
+ public HadoopIgfsCommunicationException(String msg, Exception cause) {
+ super(msg, cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
new file mode 100644
index 0000000..2294134
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Extended IGFS server interface.
+ */
+public interface HadoopIgfsEx extends HadoopIgfs {
+ /**
+ * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+ * If connection is closed already, callback will be invoked synchronously inside this method.
+ *
+ * @param delegate Stream delegate.
+ * @param lsnr Event listener.
+ */
+ public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
+
+ /**
+ * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+ *
+ * @param delegate Stream delegate.
+ */
+ public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+ /**
+ * Asynchronously reads specified amount of bytes from opened input stream.
+ *
+ * @param delegate Stream delegate.
+ * @param pos Position to read from.
+ * @param len Data length to read.
+ * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining
+ * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will
+ * be the result of read future.
+ * @param outOff Output offset.
+ * @param outLen Output length.
+ * @return Read data.
+ */
+ public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+ @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+ /**
+ * Writes data to the stream with given streamId. This method does not return any future since
+ * no response to write request is sent.
+ *
+ * @param delegate Stream delegate.
+ * @param data Data to write.
+ * @param off Offset.
+ * @param len Length.
+ * @throws IOException If failed.
+ */
+ public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
+
+ /**
+ * Close server stream.
+ *
+ * @param delegate Stream delegate.
+ * @throws IOException If failed.
+ */
+ public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+ /**
+ * Flush output stream.
+ *
+ * @param delegate Stream delegate.
+ * @throws IOException If failed.
+ */
+ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+ /**
+ * The user this Igfs instance works on behalf of.
+ * @return the user name.
+ */
+ public String user();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
new file mode 100644
index 0000000..cfdd792
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS client future that holds response parse closure.
+ */
+public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Output buffer. */
+ private byte[] outBuf;
+
+ /** Output offset. */
+ private int outOff;
+
+ /** Output length. */
+ private int outLen;
+
+ /** Read future flag. */
+ private boolean read;
+
+ /**
+ * @return Output buffer.
+ */
+ public byte[] outputBuffer() {
+ return outBuf;
+ }
+
+ /**
+ * @param outBuf Output buffer.
+ */
+ public void outputBuffer(@Nullable byte[] outBuf) {
+ this.outBuf = outBuf;
+ }
+
+ /**
+ * @return Offset in output buffer to write from.
+ */
+ public int outputOffset() {
+ return outOff;
+ }
+
+ /**
+ * @param outOff Offset in output buffer to write from.
+ */
+ public void outputOffset(int outOff) {
+ this.outOff = outOff;
+ }
+
+ /**
+ * @return Length to write to output buffer.
+ */
+ public int outputLength() {
+ return outLen;
+ }
+
+ /**
+ * @param outLen Length to write to output buffer.
+ */
+ public void outputLength(int outLen) {
+ this.outLen = outLen;
+ }
+
+ /**
+ * @param read {@code True} if this is a read future.
+ */
+ public void read(boolean read) {
+ this.read = read;
+ }
+
+ /**
+ * @return {@code True} if this is a read future.
+ */
+ public boolean read() {
+ return read;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
new file mode 100644
index 0000000..8bdcc83
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Communication with grid in the same process.
+ */
+public class HadoopIgfsInProc implements HadoopIgfsEx {
+ /** Target IGFS. */
+ private final IgfsEx igfs;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Event listeners. */
+ private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> lsnrs =
+ new ConcurrentHashMap<>();
+
+ /** Logger. */
+ private final Log log;
+
+ /** The user this Igfs works on behalf of. */
+ private final String user;
+
+ /**
+ * Constructor.
+ *
+ * @param igfs Target IGFS.
+ * @param log Log.
+ */
+ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+ this.user = IgfsUtils.fixUserName(userName);
+
+ this.igfs = igfs;
+
+ this.log = log;
+
+ bufSize = igfs.configuration().getBlockSize() * 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsHandshakeResponse handshake(final String logDir) {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+ @Override public IgfsHandshakeResponse apply() {
+ igfs.clientLogDirectory(logDir);
+
+ return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+ igfs.globalSampling());
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(boolean force) {
+ // Perform cleanup.
+ for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
+ try {
+ lsnr.onClose();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to notify stream event listener", e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.info(path);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.update(path, props);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
+ try {
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.setTimes(path, accessTime, modificationTime);
+
+ return null;
+ }
+ });
+
+ return true;
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " +
+ path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
+ try {
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.rename(src, dest);
+
+ return null;
+ }
+ });
+
+ return true;
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+ @Override public Boolean apply() {
+ return igfs.delete(path, recursive);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+ @Override public IgfsStatus call() throws IgniteCheckedException {
+ return igfs.globalSpace();
+ }
+ });
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
+ "stopping.");
+ }
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new AssertionError("Must never go there.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+ @Override public Collection<IgfsPath> apply() {
+ return igfs.listPaths(path);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+ @Override public Collection<IgfsFile> apply() {
+ return igfs.listFiles(path);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
+ try {
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.mkdirs(path, props);
+
+ return null;
+ }
+ });
+
+ return true;
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " +
+ path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+ @Override public IgfsPathSummary apply() {
+ return igfs.summary(path);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " +
+ path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
+ throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+ @Override public Collection<IgfsBlockLocation> apply() {
+ return igfs.affinity(path, start, len);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStream stream = igfs.open(path, bufSize);
+
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+ throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+ final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+ colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+ final @Nullable Map<String, String> props) throws IgniteCheckedException {
+ try {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ catch (IllegalStateException e) {
+ throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+ @Nullable byte[] outBuf, int outOff, int outLen) {
+ IgfsInputStream stream = delegate.target();
+
+ try {
+ byte[] res = null;
+
+ if (outBuf != null) {
+ int outTailLen = outBuf.length - outOff;
+
+ if (len <= outTailLen)
+ stream.readFully(pos, outBuf, outOff, len);
+ else {
+ stream.readFully(pos, outBuf, outOff, outTailLen);
+
+ int remainderLen = len - outTailLen;
+
+ res = new byte[remainderLen];
+
+ stream.readFully(pos, res, 0, remainderLen);
+ }
+ } else {
+ res = new byte[len];
+
+ stream.readFully(pos, res, 0, len);
+ }
+
+ return new GridFinishedFuture<>(res);
+ }
+ catch (IllegalStateException | IOException e) {
+ HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
+
+ if (lsnr != null)
+ lsnr.onError(e.getMessage());
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len)
+ throws IOException {
+ try {
+ IgfsOutputStream stream = delegate.target();
+
+ stream.write(data, off, len);
+ }
+ catch (IllegalStateException | IOException e) {
+ HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
+
+ if (lsnr != null)
+ lsnr.onError(e.getMessage());
+
+ if (e instanceof IllegalStateException)
+ throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
+ else
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+ try {
+ IgfsOutputStream stream = delegate.target();
+
+ stream.flush();
+ }
+ catch (IllegalStateException | IOException e) {
+ HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
+
+ if (lsnr != null)
+ lsnr.onError(e.getMessage());
+
+ if (e instanceof IllegalStateException)
+ throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
+ else
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
+ Closeable closeable = desc.target();
+
+ try {
+ closeable.close();
+ }
+ catch (IllegalStateException e) {
+ throw new IOException("Failed to close IGFS stream because Grid is stopping.", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addEventListener(HadoopIgfsStreamDelegate delegate,
+ HadoopIgfsStreamEventListener lsnr) {
+ HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
+
+ assert lsnr0 == null || lsnr0 == lsnr;
+
+ if (log.isDebugEnabled())
+ log.debug("Added stream event listener [delegate=" + delegate + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) {
+ HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate);
+
+ if (lsnr0 != null && log.isDebugEnabled())
+ log.debug("Removed stream event listener [delegate=" + delegate + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return user;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
new file mode 100644
index 0000000..efc270b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * IGFS input stream wrapper for hadoop interfaces.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable,
+ HadoopIgfsStreamEventListener {
+ /** Minimum buffer size. */
+ private static final int MIN_BUF_SIZE = 4 * 1024;
+
+ /** Server stream delegate. */
+ private HadoopIgfsStreamDelegate delegate;
+
+ /** Stream ID used by logger. */
+ private long logStreamId;
+
+ /** Stream position. */
+ private long pos;
+
+ /** Stream read limit. */
+ private long limit;
+
+ /** Mark position. */
+ private long markPos = -1;
+
+ /** Prefetch buffer. */
+ private DoubleFetchBuffer buf = new DoubleFetchBuffer();
+
+ /** Buffer half size for double-buffering. */
+ private int bufHalfSize;
+
+ /** Closed flag. */
+ private volatile boolean closed;
+
+ /** Flag set if stream was closed due to connection breakage. */
+ private boolean connBroken;
+
+ /** Logger. */
+ private Log log;
+
+ /** Client logger. */
+ private IgfsLogger clientLog;
+
+ /** Read time. */
+ private long readTime;
+
+ /** User time. */
+ private long userTime;
+
+ /** Last timestamp. */
+ private long lastTs;
+
+ /** Amount of read bytes. */
+ private long total;
+
+ /**
+ * Creates input stream.
+ *
+ * @param delegate Server stream delegate.
+ * @param limit Read limit.
+ * @param bufSize Buffer size.
+ * @param log Log.
+ * @param clientLog Client logger.
+ */
+ public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log,
+ IgfsLogger clientLog, long logStreamId) {
+ assert limit >= 0;
+
+ this.delegate = delegate;
+ this.limit = limit;
+ this.log = log;
+ this.clientLog = clientLog;
+ this.logStreamId = logStreamId;
+
+ bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE);
+
+ lastTs = System.nanoTime();
+
+ delegate.hadoop().addEventListener(delegate, this);
+ }
+
+ /**
+ * Read start.
+ */
+ private void readStart() {
+ long now = System.nanoTime();
+
+ userTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /**
+ * Read end.
+ */
+ private void readEnd() {
+ long now = System.nanoTime();
+
+ readTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read() throws IOException {
+ checkClosed();
+
+ readStart();
+
+ try {
+ if (eof())
+ return -1;
+
+ buf.refreshAhead(pos);
+
+ int res = buf.atPosition(pos);
+
+ pos++;
+ total++;
+
+ buf.refreshAhead(pos);
+
+ return res;
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e);
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
+ checkClosed();
+
+ if (eof())
+ return -1;
+
+ readStart();
+
+ try {
+ long remaining = limit - pos;
+
+ int read = buf.flatten(b, pos, off, len);
+
+ pos += read;
+ total += read;
+ remaining -= read;
+
+ if (remaining > 0 && read != len) {
+ int readAmt = (int)Math.min(remaining, len - read);
+
+ delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();
+
+ read += readAmt;
+ pos += readAmt;
+ total += readAmt;
+ }
+
+ buf.refreshAhead(pos);
+
+ return read;
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e);
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long skip(long n) throws IOException {
+ checkClosed();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logSkip(logStreamId, n);
+
+ long oldPos = pos;
+
+ if (pos + n <= limit)
+ pos += n;
+ else
+ pos = limit;
+
+ buf.refreshAhead(pos);
+
+ return pos - oldPos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int available() throws IOException {
+ checkClosed();
+
+ int available = buf.available(pos);
+
+ assert available >= 0;
+
+ return available;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ if (!closed) {
+ readStart();
+
+ if (log.isDebugEnabled())
+ log.debug("Closing input stream: " + delegate);
+
+ delegate.hadoop().closeStream(delegate);
+
+ readEnd();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+
+ markClosed(false);
+
+ if (log.isDebugEnabled())
+ log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
+ ", userTime=" + userTime + ']');
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void mark(int readLimit) {
+ markPos = pos;
+
+ if (clientLog.isLogEnabled())
+ clientLog.logMark(logStreamId, readLimit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void reset() throws IOException {
+ checkClosed();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logReset(logStreamId);
+
+ if (markPos == -1)
+ throw new IOException("Stream was not marked.");
+
+ pos = markPos;
+
+ buf.refreshAhead(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean markSupported() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
+ long remaining = limit - position;
+
+ int read = (int)Math.min(len, remaining);
+
+ // Return -1 at EOF.
+ if (read == 0)
+ return -1;
+
+ readFully(position, buf, off, read);
+
+ return read;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
+ long remaining = limit - position;
+
+ checkClosed();
+
+ if (len > remaining)
+ throw new EOFException("End of stream reached before data was fully read.");
+
+ readStart();
+
+ try {
+ int read = this.buf.flatten(buf, position, off, len);
+
+ total += read;
+
+ if (read != len) {
+ int readAmt = len - read;
+
+ delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();
+
+ total += readAmt;
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRandomRead(logStreamId, position, len);
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e);
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long position, byte[] buf) throws IOException {
+ readFully(position, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void seek(long pos) throws IOException {
+ A.ensure(pos >= 0, "position must be non-negative");
+
+ checkClosed();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logSeek(logStreamId, pos);
+
+ if (pos > limit)
+ pos = limit;
+
+ if (log.isDebugEnabled())
+ log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');
+
+ this.pos = pos;
+
+ buf.refreshAhead(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long getPos() {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized boolean seekToNewSource(long targetPos) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ markClosed(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(String errMsg) {
+ // No-op.
+ }
+
+ /**
+ * Marks stream as closed.
+ *
+ * @param connBroken {@code True} if connection with server was lost.
+ */
+ private void markClosed(boolean connBroken) {
+ // It is ok to have race here.
+ if (!closed) {
+ closed = true;
+
+ this.connBroken = connBroken;
+
+ delegate.hadoop().removeEventListener(delegate);
+ }
+ }
+
+ /**
+ * @throws IOException If check failed.
+ */
+ private void checkClosed() throws IOException {
+ if (closed) {
+ if (connBroken)
+ throw new IOException("Server connection was lost.");
+ else
+ throw new IOException("Stream is closed.");
+ }
+ }
+
+ /**
+ * @return {@code True} if end of stream reached.
+ */
+ private boolean eof() {
+ return limit == pos;
+ }
+
+ /**
+ * Asynchronous prefetch buffer.
+ */
+ private static class FetchBufferPart {
+ /** Read future. */
+ private IgniteInternalFuture<byte[]> readFut;
+
+ /** Position of cached chunk in file. */
+ private long pos;
+
+ /** Prefetch length. Need to store as read future result might be not available yet. */
+ private int len;
+
+ /**
+ * Creates fetch buffer part.
+ *
+ * @param readFut Read future for this buffer.
+ * @param pos Read position.
+ * @param len Chunk length.
+ */
+ private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) {
+ this.readFut = readFut;
+ this.pos = pos;
+ this.len = len;
+ }
+
+ /**
+ * Copies cached data if specified position matches cached region.
+ *
+ * @param dst Destination buffer.
+ * @param pos Read position in file.
+ * @param dstOff Offset in destination buffer from which start writing.
+ * @param len Maximum number of bytes to copy.
+ * @return Number of bytes copied.
+ * @throws IgniteCheckedException If read future failed.
+ */
+ public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+ // If read start position is within cached boundaries.
+ if (contains(pos)) {
+ byte[] data = readFut.get();
+
+ int srcPos = (int)(pos - this.pos);
+ int cpLen = Math.min(len, data.length - srcPos);
+
+ U.arrayCopy(data, srcPos, dst, dstOff, cpLen);
+
+ return cpLen;
+ }
+
+ return 0;
+ }
+
+ /**
+ * @return {@code True} if data is ready to be read.
+ */
+ public boolean ready() {
+ return readFut.isDone();
+ }
+
+ /**
+ * Checks if current buffer part contains given position.
+ *
+ * @param pos Position to check.
+ * @return {@code True} if position matches buffer region.
+ */
+ public boolean contains(long pos) {
+ return this.pos <= pos && this.pos + len > pos;
+ }
+ }
+
+ private class DoubleFetchBuffer {
+ /** */
+ private FetchBufferPart first;
+
+ /** */
+ private FetchBufferPart second;
+
+ /**
+ * Copies fetched data from both buffers to destination array if cached region matched read position.
+ *
+ * @param dst Destination buffer.
+ * @param pos Read position in file.
+ * @param dstOff Destination buffer offset.
+ * @param len Maximum number of bytes to copy.
+ * @return Number of bytes copied.
+ * @throws IgniteCheckedException If any read operation failed.
+ */
+ public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+ assert dstOff >= 0;
+ assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
+ ", len=" + len + ']';
+
+ int bytesCopied = 0;
+
+ if (first != null) {
+ bytesCopied += first.flatten(dst, pos, dstOff, len);
+
+ if (bytesCopied != len && second != null) {
+ assert second.pos == first.pos + first.len;
+
+ bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
+ }
+ }
+
+ return bytesCopied;
+ }
+
+ /**
+ * Gets byte at specified position in buffer.
+ *
+ * @param pos Stream position.
+ * @return Read byte.
+ * @throws IgniteCheckedException If read failed.
+ */
+ public int atPosition(long pos) throws IgniteCheckedException {
+ // Should not reach here if stream contains no data.
+ assert first != null;
+
+ if (first.contains(pos)) {
+ byte[] bytes = first.readFut.get();
+
+ return bytes[((int)(pos - first.pos))] & 0xFF;
+ }
+ else {
+ assert second != null;
+ assert second.contains(pos);
+
+ byte[] bytes = second.readFut.get();
+
+ return bytes[((int)(pos - second.pos))] & 0xFF;
+ }
+ }
+
+ /**
+ * Starts asynchronous buffer refresh if needed, depending on current position.
+ *
+ * @param pos Current stream position.
+ */
+ public void refreshAhead(long pos) {
+ if (fullPrefetch(pos)) {
+ first = fetch(pos, bufHalfSize);
+ second = fetch(pos + bufHalfSize, bufHalfSize);
+ }
+ else if (needFlip(pos)) {
+ first = second;
+
+ second = fetch(first.pos + first.len, bufHalfSize);
+ }
+ }
+
+ /**
+ * @param pos Position from which read is expected.
+ * @return Number of bytes available to be read without blocking.
+ */
+ public int available(long pos) {
+ int available = 0;
+
+ if (first != null) {
+ if (first.contains(pos)) {
+ if (first.ready()) {
+ available += (pos - first.pos);
+
+ if (second != null && second.ready())
+ available += second.len;
+ }
+ }
+ else {
+ if (second != null && second.contains(pos) && second.ready())
+ available += (pos - second.pos);
+ }
+ }
+
+ return available;
+ }
+
+ /**
+ * Checks if position shifted enough to forget previous buffer.
+ *
+ * @param pos Current position.
+ * @return {@code True} if need flip buffers.
+ */
+ private boolean needFlip(long pos) {
+ // Return true if we read more then half of second buffer.
+ return second != null && second.contains(pos);
+ }
+
+ /**
+ * Determines if all cached bytes should be discarded and new region should be
+ * prefetched.
+ *
+ * @param curPos Current stream position.
+ * @return {@code True} if need to refresh both blocks.
+ */
+ private boolean fullPrefetch(long curPos) {
+ // If no data was prefetched yet, return true.
+ return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
+ }
+
+ /**
+ * Starts asynchronous fetch for given region.
+ *
+ * @param pos Position to read from.
+ * @param size Number of bytes to read.
+ * @return Fetch buffer part.
+ */
+ private FetchBufferPart fetch(long pos, int size) {
+ long remaining = limit - pos;
+
+ size = (int)Math.min(size, remaining);
+
+ return size <= 0 ? null :
+ new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
new file mode 100644
index 0000000..b8bcad9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IO abstraction layer for IGFS client. Two kind of messages are expected to be sent: requests with response
+ * and request without response.
+ */
+public interface HadoopIgfsIo {
+ /**
+ * Sends given IGFS client message and asynchronously awaits for response.
+ *
+ * @param msg Message to send.
+ * @return Future that will be completed.
+ * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+ */
+ public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException;
+
+ /**
+ * Sends given IGFS client message and asynchronously awaits for response. When IO detects response
+ * beginning for given message it stops reading data and passes input stream to closure which can read
+ * response in a specific way.
+ *
+ * @param msg Message to send.
+ * @param outBuf Output buffer. If {@code null}, the output buffer is not used.
+ * @param outOff Output buffer offset.
+ * @param outLen Output buffer length.
+ * @return Future that will be completed when response is returned from closure.
+ * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+ */
+ public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen)
+ throws IgniteCheckedException;
+
+ /**
+ * Sends given message and does not wait for response.
+ *
+ * @param msg Message to send.
+ * @throws IgniteCheckedException If send failed.
+ */
+ public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;
+
+ /**
+ * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+ * If connection is closed already, callback will be invoked synchronously inside this method.
+ *
+ * @param lsnr Event listener.
+ */
+ public void addEventListener(HadoopIgfsIpcIoListener lsnr);
+
+ /**
+ * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+ *
+ * @param lsnr Event listener.
+ */
+ public void removeEventListener(HadoopIgfsIpcIoListener lsnr);
+}
\ No newline at end of file