You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 09:45:00 UTC
[33/38] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bf07cfae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bf07cfae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bf07cfae
Branch: refs/heads/master
Commit: bf07cfaea33a59d6bfd310ace13b562f4c086097
Parents: 81e0195
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 11:32:07 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 11:32:07 2014 +0300
----------------------------------------------------------------------
.../examples/ggfs/GgfsMapReduceExample.java | 4 +-
.../main/java/org/apache/ignite/IgniteFs.java | 2 +-
.../apache/ignite/fs/IgniteFsConfiguration.java | 4 +-
.../ignite/fs/mapreduce/IgniteFsFileRange.java | 72 +++++++
.../IgniteFsInputStreamJobAdapter.java | 45 +++++
.../apache/ignite/fs/mapreduce/IgniteFsJob.java | 62 ++++++
.../ignite/fs/mapreduce/IgniteFsJobAdapter.java | 20 ++
.../fs/mapreduce/IgniteFsRangeInputStream.java | 189 +++++++++++++++++++
.../fs/mapreduce/IgniteFsRecordResolver.java | 49 +++++
.../ignite/fs/mapreduce/IgniteFsTask.java | 165 ++++++++++++++++
.../ignite/fs/mapreduce/IgniteFsTaskArgs.java | 74 ++++++++
.../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ++++
.../org/apache/ignite/fs/mapreduce/package.html | 15 ++
.../grid/ggfs/mapreduce/IgniteFsFileRange.java | 72 -------
.../IgniteFsInputStreamJobAdapter.java | 45 -----
.../grid/ggfs/mapreduce/IgniteFsJob.java | 62 ------
.../grid/ggfs/mapreduce/IgniteFsJobAdapter.java | 20 --
.../mapreduce/IgniteFsRangeInputStream.java | 189 -------------------
.../ggfs/mapreduce/IgniteFsRecordResolver.java | 49 -----
.../grid/ggfs/mapreduce/IgniteFsTask.java | 165 ----------------
.../grid/ggfs/mapreduce/IgniteFsTaskArgs.java | 74 --------
.../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ----
.../gridgain/grid/ggfs/mapreduce/package.html | 15 --
.../IgniteFsByteDelimiterRecordResolver.java | 2 +-
.../IgniteFsFixedLengthRecordResolver.java | 2 +-
.../processors/ggfs/GridGgfsAsyncImpl.java | 2 +-
.../kernal/processors/ggfs/GridGgfsImpl.java | 2 +-
.../kernal/processors/ggfs/GridGgfsJobImpl.java | 2 +-
.../processors/ggfs/GridGgfsProcessor.java | 2 +-
.../ggfs/GridGgfsProcessorAdapter.java | 2 +-
.../processors/ggfs/GridNoopGgfsProcessor.java | 2 +-
.../processors/ggfs/IgniteFsTaskArgsImpl.java | 2 +-
.../processors/ggfs/GridGgfsTaskSelfTest.java | 4 +-
.../GridGgfsAbstractRecordResolverSelfTest.java | 2 +-
...GgfsByteDelimiterRecordResolverSelfTest.java | 2 +-
...idGgfsFixedLengthRecordResolverSelfTest.java | 2 +-
...sNewLineDelimiterRecordResolverSelfTest.java | 2 +-
...fsStringDelimiterRecordResolverSelfTest.java | 2 +-
...idHadoopDefaultMapReducePlannerSelfTest.java | 2 +-
39 files changed, 747 insertions(+), 747 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
index ec9c52a..10913b0 100644
--- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
+++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
@@ -12,15 +12,15 @@ package org.gridgain.examples.ggfs;
import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import java.io.*;
import java.util.*;
/**
- * Example that shows how to use {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask} to find lines matching particular pattern in the file in pretty
+ * Example that shows how to use {@link org.apache.ignite.fs.mapreduce.IgniteFsTask} to find lines matching particular pattern in the file in pretty
* the same way as {@code grep} command does.
* <p>
* Remote nodes should always be started with configuration file which includes
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
index 323a778..11e4732 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
@@ -10,9 +10,9 @@
package org.apache.ignite;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.jetbrains.annotations.*;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java
index 13e6746..f98b92b 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java
@@ -753,13 +753,13 @@ public class IgniteFsConfiguration {
/**
* Get maximum default range size of a file being split during GGFS task execution. When GGFS task is about to
- * be executed, it requests file block locations first. Each location is defined as {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsFileRange} which
+ * be executed, it requests file block locations first. Each location is defined as {@link org.apache.ignite.fs.mapreduce.IgniteFsFileRange} which
* has length. In case this parameter is set to positive value, then GGFS will split single file range into smaller
* ranges with length not greater that this parameter. The only exception to this case is when maximum task range
* length is smaller than file block size. In this case maximum task range size will be overridden and set to file
* block size.
* <p>
- * Note that this parameter is applied when task is split into jobs before {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsRecordResolver} is
+ * Note that this parameter is applied when task is split into jobs before {@link org.apache.ignite.fs.mapreduce.IgniteFsRecordResolver} is
* applied. Therefore, final file ranges being assigned to particular jobs could be greater than value of this
* parameter depending on file data layout and selected resolver type.
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java
new file mode 100644
index 0000000..9e49cb1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java
@@ -0,0 +1,72 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.fs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Entity representing part of GGFS file identified by file path, start position, and length.
+ */
+public class IgniteFsFileRange {
+ /** File path. */
+ private IgniteFsPath path;
+
+ /** Start position. */
+ private long start;
+
+ /** Length. */
+ private long len;
+
+ /**
+ * Creates file range.
+ *
+ * @param path File path.
+ * @param start Start position.
+ * @param len Length.
+ */
+ public IgniteFsFileRange(IgniteFsPath path, long start, long len) {
+ this.path = path;
+ this.start = start;
+ this.len = len;
+ }
+
+ /**
+ * Gets file path.
+ *
+ * @return File path.
+ */
+ public IgniteFsPath path() {
+ return path;
+ }
+
+ /**
+ * Gets range start position.
+ *
+ * @return Start position.
+ */
+ public long start() {
+ return start;
+ }
+
+ /**
+ * Gets range length.
+ *
+ * @return Length.
+ */
+ public long length() {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsFileRange.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java
new file mode 100644
index 0000000..23449be
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.fs.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.*;
+
+import java.io.*;
+
+/**
+ * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.fs.IgniteFsInputStream} to bytes within
+ * the {@link IgniteFsFileRange} assigned to the job.
+ * <p>
+ * Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps in into
+ * {@link GridFixedSizeInputStream} limited to range length.
+ */
+public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
+ /** {@inheritDoc} */
+ @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in)
+ throws GridException, IOException {
+ in.seek(range.start());
+
+ return execute(ggfs, new IgniteFsRangeInputStream(in, range));
+ }
+
+ /**
+ * Executes this job.
+ *
+ * @param ggfs GGFS instance.
+ * @param in Input stream.
+ * @return Execution result.
+ * @throws GridException If execution failed.
+ * @throws IOException If IO exception encountered while working with stream.
+ */
+ public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java
new file mode 100644
index 0000000..b4921eb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java
@@ -0,0 +1,62 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.fs.*;
+import org.gridgain.grid.*;
+
+import java.io.*;
+
+/**
+ * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the
+ * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
+ * <p>
+ * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.fs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this
+ * job is expected to operate on, and already opened {@link org.apache.ignite.fs.IgniteFsInputStream} for the file this range belongs to.
+ * <p>
+ * Note that provided input stream has position already adjusted to range start. However, it will not
+ * automatically stop on range end. This is done to provide capability in some cases to look beyond
+ * the range end or seek position before the reange start.
+ * <p>
+ * In majority of the cases, when you want to process only provided range, you should explicitly control amount
+ * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates
+ * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
+ * {@link IgniteFsRangeInputStream}.
+ * <p>
+ * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations.
+ */
+public interface IgniteFsJob {
+ /**
+ * Executes this job.
+ *
+ * @param ggfs GGFS instance.
+ * @param range File range aligned to record boundaries.
+ * @param in Input stream for split file. This input stream is not aligned to range and points to file start
+ * by default.
+ * @return Execution result.
+ * @throws GridException If execution failed.
+ * @throws IOException If file system operation resulted in IO exception.
+ */
+ public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws GridException,
+ IOException;
+
+ /**
+ * This method is called when system detects that completion of this
+ * job can no longer alter the overall outcome (for example, when parent task
+ * has already reduced the results). Job is also cancelled when
+ * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called.
+ * <p>
+ * Note that job cancellation is only a hint, and just like with
+ * {@link Thread#interrupt()} method, it is really up to the actual job
+ * instance to gracefully finish execution and exit.
+ */
+ public void cancel();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java
new file mode 100644
index 0000000..af20038
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java
@@ -0,0 +1,20 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+/**
+ * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method.
+ */
+public abstract class IgniteFsJobAdapter implements IgniteFsJob {
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java
new file mode 100644
index 0000000..f59a7d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java
@@ -0,0 +1,189 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.fs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Decorator for regular {@link org.apache.ignite.fs.IgniteFsInputStream} which streams only data within the given range.
+ * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
+ * jobs which will be working only with the assigned range. You can also use it explicitly when
+ * working with {@link IgniteFsJob} directly.
+ */
+public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
+ /** Base input stream. */
+ private final IgniteFsInputStream is;
+
+ /** Start position. */
+ private final long start;
+
+ /** Maximum stream length. */
+ private final long maxLen;
+
+ /** Current position within the stream. */
+ private long pos;
+
+ /**
+ * Constructor.
+ *
+ * @param is Base input stream.
+ * @param start Start position.
+ * @param maxLen Maximum stream length.
+ * @throws IOException In case of exception.
+ */
+ public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
+ if (is == null)
+ throw new IllegalArgumentException("Input stream cannot be null.");
+
+ if (start < 0)
+ throw new IllegalArgumentException("Start position cannot be negative.");
+
+ if (start >= is.length())
+ throw new IllegalArgumentException("Start position cannot be greater that file length.");
+
+ if (maxLen < 0)
+ throw new IllegalArgumentException("Length cannot be negative.");
+
+ if (start + maxLen > is.length())
+ throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length.");
+
+ this.is = is;
+ this.start = start;
+ this.maxLen = maxLen;
+
+ is.seek(start);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long length() {
+ return is.length();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param is Base input stream.
+ * @param range File range.
+ * @throws IOException In case of exception.
+ */
+ public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
+ this(is, range.start(), range.length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ if (pos < maxLen) {
+ int res = is.read();
+
+ if (res != -1)
+ pos++;
+
+ return res;
+ }
+ else
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
+ if (pos < maxLen) {
+ len = (int)Math.min(len, maxLen - pos);
+
+ int res = is.read(b, off, len);
+
+ if (res != -1)
+ pos += res;
+
+ return res;
+ }
+ else
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+ seek(pos);
+
+ return read(buf, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf) throws IOException {
+ readFully(pos, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ seek(pos);
+
+ for (int readBytes = 0; readBytes < len;) {
+ int read = read(buf, off + readBytes, len - readBytes);
+
+ if (read == -1)
+ throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos +
+ ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
+
+ readBytes += read;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void seek(long pos) throws IOException {
+ if (pos < 0)
+ throw new IOException("Seek position cannot be negative: " + pos);
+
+ is.seek(start + pos);
+
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long position() {
+ return pos;
+ }
+
+ /**
+ * Since range input stream represents a part of larger file stream, there is an offset at which this
+ * range input stream starts in original input stream. This method returns start offset of this input
+ * stream relative to original input stream.
+ *
+ * @return Start offset in original input stream.
+ */
+ public long startOffset() {
+ return start;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int available() {
+ long l = maxLen - pos;
+
+ if (l < 0)
+ return 0;
+
+ if (l > Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+
+ return (int)l;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ is.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsRangeInputStream.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
new file mode 100644
index 0000000..3e347b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
@@ -0,0 +1,49 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.fs.*;
+import org.gridgain.grid.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain
+ * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
+ * execution in order to adjust record boundaries in a way consistent with user data.
+ * <p>
+ * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file.
+ * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver
+ * you can adjust job range so that it covers the whole line(s).
+ * <p>
+ * The following record resolvers are available out of the box:
+ * <ul>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li>
+ * </ul>
+ */
+public interface IgniteFsRecordResolver extends Serializable {
+ /**
+ * Adjusts record start offset and length.
+ *
+ * @param ggfs GGFS instance to use.
+ * @param stream Input stream for split file.
+ * @param suggestedRecord Suggested file system record.
+ * @return New adjusted record. If this method returns {@code null}, original record is ignored.
+ * @throws GridException If resolve failed.
+ * @throws IOException If resolve failed.
+ */
+ @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ IgniteFsFileRange suggestedRecord) throws GridException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
new file mode 100644
index 0000000..c1383bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
@@ -0,0 +1,165 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.fs.*;
+import org.apache.ignite.resources.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.ggfs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
+ * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
+ * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
+ * {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
+ * <p>
+ * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
+ * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size
+ * is provided (either through {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
+ * argument), then ranges could be further divided into smaller chunks.
+ * <p>
+ * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a
+ * {@link IgniteFsJob}.
+ * <p>
+ * Finally all generated jobs are sent to Grid nodes for execution.
+ * <p>
+ * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step.
+ * <p>
+ * Here is an example of such a task:
+ * <pre name="code" class="java">
+ * public class WordCountTask extends GridGgfsTask<String, Integer> {
+ * @Override
+ * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException {
+ * // New job will be created for each range within each file.
+ * // We pass user-provided argument (which is essentially a word to look for) to that job.
+ * return new WordCountJob(args.userArgument());
+ * }
+ *
+ * // Aggregate results into one compound result.
+ * public Integer reduce(List<GridComputeJobResult> results) throws GridException {
+ * Integer total = 0;
+ *
+ * for (GridComputeJobResult res : results) {
+ * Integer cnt = res.getData();
+ *
+ * // Null can be returned for non-existent file in case we decide to ignore such situations.
+ * if (cnt != null)
+ * total += cnt;
+ * }
+ *
+ * return total;
+ * }
+ * }
+ * </pre>
+ */
+public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Injected grid. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable IgniteFsTaskArgs<T> args) throws GridException {
+ assert ignite != null;
+ assert args != null;
+
+ IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
+ GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs();
+
+ Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
+
+ Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
+
+ for (IgniteFsPath path : args.paths()) {
+ IgniteFsFile file = ggfs.info(path);
+
+ if (file == null) {
+ if (args.skipNonExistentFiles())
+ continue;
+ else
+ throw new GridException("Failed to process GGFS file because it doesn't exist: " + path);
+ }
+
+ Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
+
+ long totalLen = 0;
+
+ for (IgniteFsBlockLocation loc : aff) {
+ ClusterNode node = null;
+
+ for (UUID nodeId : loc.nodeIds()) {
+ node = nodes.get(nodeId);
+
+ if (node != null)
+ break;
+ }
+
+ if (node == null)
+ throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
+ ", subgrid=" + subgrid + ']');
+
+ IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
+
+ if (job != null) {
+ ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
+ loc.length(), args.recordResolver());
+
+ splitMap.put(jobImpl, node);
+ }
+
+ totalLen += loc.length();
+ }
+
+ assert totalLen == file.length();
+ }
+
+ return splitMap;
+ }
+
+ /**
+ * Callback invoked during task map procedure to create job that will process specified split
+ * for GGFS file.
+ *
+ * @param path Path.
+ * @param range File range based on consecutive blocks. This range will be further
+ * realigned to record boundaries on destination node.
+ * @param args Task argument.
+ * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
+ * @throws GridException If job creation failed.
+ */
+ @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
+ IgniteFsTaskArgs<T> args) throws GridException;
+
+ /**
+ * Maps list by node ID.
+ *
+ * @param subgrid Subgrid.
+ * @return Map.
+ */
+ private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) {
+ Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size());
+
+ for (ClusterNode node : subgrid)
+ res.put(node.id(), node);
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java
new file mode 100644
index 0000000..3880fd3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java
@@ -0,0 +1,74 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.fs.*;
+
+import java.util.*;
+
+/**
+ * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
+ * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
+ * passed to {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
+ * <p>
+ * Task arguments encapsulates the following data:
+ * <ul>
+ * <li>GGFS name</li>
+ * <li>File paths passed to {@code GridGgfs.execute()} method</li>
+ * <li>{@link IgniteFsRecordResolver} for that task</li>
+ * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
+ * <li>User-defined task argument</li>
+ * <li>Maximum file range length for that task (see {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
+ * </ul>
+ */
+public interface IgniteFsTaskArgs<T> {
+ /**
+ * Gets GGFS name.
+ *
+ * @return GGFS name.
+ */
+ public String ggfsName();
+
+ /**
+ * Gets file paths to process.
+ *
+ * @return File paths to process.
+ */
+ public Collection<IgniteFsPath> paths();
+
+ /**
+ * Gets record resolver for the task.
+ *
+ * @return Record resolver.
+ */
+ public IgniteFsRecordResolver recordResolver();
+
+ /**
+ * Flag indicating whether to fail or simply skip non-existent files.
+ *
+ * @return {@code True} if non-existent files should be skipped.
+ */
+ public boolean skipNonExistentFiles();
+
+ /**
+ * User argument provided for task execution.
+ *
+ * @return User argument.
+ */
+ public T userArgument();
+
+ /**
+ * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including
+ * all consecutive blocks will be used without any limitations.
+ *
+ * @return Maximum range length.
+ */
+ public long maxRangeLength();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java
new file mode 100644
index 0000000..45c6e89
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.fs.mapreduce;
+
+import org.apache.ignite.compute.*;
+
+import java.util.*;
+
+/**
+ * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
+ * results returned by jobs.
+ */
+public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default implementation which will ignore all results sent from execution nodes.
+ *
+ * @param results Received results of broadcasted remote executions. Note that if task class has
+ * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty.
+ * @return Will always return {@code null}.
+ */
+ @Override public R reduce(List<ComputeJobResult> results) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html
new file mode 100644
index 0000000..cde37cf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+ @html.file.header
+ _________ _____ __________________ _____
+ __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+-->
+<html>
+<body>
+ <!-- Package description. -->
+ Contains APIs for In-Memory MapReduce over GGFS.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java
deleted file mode 100644
index 57ca491..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.fs.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Entity representing part of GGFS file identified by file path, start position, and length.
- */
-public class IgniteFsFileRange {
- /** File path. */
- private IgniteFsPath path;
-
- /** Start position. */
- private long start;
-
- /** Length. */
- private long len;
-
- /**
- * Creates file range.
- *
- * @param path File path.
- * @param start Start position.
- * @param len Length.
- */
- public IgniteFsFileRange(IgniteFsPath path, long start, long len) {
- this.path = path;
- this.start = start;
- this.len = len;
- }
-
- /**
- * Gets file path.
- *
- * @return File path.
- */
- public IgniteFsPath path() {
- return path;
- }
-
- /**
- * Gets range start position.
- *
- * @return Start position.
- */
- public long start() {
- return start;
- }
-
- /**
- * Gets range length.
- *
- * @return Length.
- */
- public long length() {
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteFsFileRange.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
deleted file mode 100644
index 0432046..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.apache.ignite.fs.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.util.*;
-
-import java.io.*;
-
-/**
- * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.fs.IgniteFsInputStream} to bytes within
- * the {@link IgniteFsFileRange} assigned to the job.
- * <p>
- * Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps in into
- * {@link GridFixedSizeInputStream} limited to range length.
- */
-public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
- /** {@inheritDoc} */
- @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in)
- throws GridException, IOException {
- in.seek(range.start());
-
- return execute(ggfs, new IgniteFsRangeInputStream(in, range));
- }
-
- /**
- * Executes this job.
- *
- * @param ggfs GGFS instance.
- * @param in Input stream.
- * @return Execution result.
- * @throws GridException If execution failed.
- * @throws IOException If IO exception encountered while working with stream.
- */
- public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
deleted file mode 100644
index 5bcf6b3..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.apache.ignite.fs.*;
-import org.gridgain.grid.*;
-
-import java.io.*;
-
-/**
- * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the
- * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
- * <p>
- * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.fs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this
- * job is expected to operate on, and already opened {@link org.apache.ignite.fs.IgniteFsInputStream} for the file this range belongs to.
- * <p>
- * Note that provided input stream has position already adjusted to range start. However, it will not
- * automatically stop on range end. This is done to provide capability in some cases to look beyond
- * the range end or seek position before the reange start.
- * <p>
- * In majority of the cases, when you want to process only provided range, you should explicitly control amount
- * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates
- * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
- * {@link IgniteFsRangeInputStream}.
- * <p>
- * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations.
- */
-public interface IgniteFsJob {
- /**
- * Executes this job.
- *
- * @param ggfs GGFS instance.
- * @param range File range aligned to record boundaries.
- * @param in Input stream for split file. This input stream is not aligned to range and points to file start
- * by default.
- * @return Execution result.
- * @throws GridException If execution failed.
- * @throws IOException If file system operation resulted in IO exception.
- */
- public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws GridException,
- IOException;
-
- /**
- * This method is called when system detects that completion of this
- * job can no longer alter the overall outcome (for example, when parent task
- * has already reduced the results). Job is also cancelled when
- * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called.
- * <p>
- * Note that job cancellation is only a hint, and just like with
- * {@link Thread#interrupt()} method, it is really up to the actual job
- * instance to gracefully finish execution and exit.
- */
- public void cancel();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java
deleted file mode 100644
index e8aa1ea..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-/**
- * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method.
- */
-public abstract class IgniteFsJobAdapter implements IgniteFsJob {
- /** {@inheritDoc} */
- @Override public void cancel() {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
deleted file mode 100644
index 6c123eb..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.fs.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Decorator for regular {@link org.apache.ignite.fs.IgniteFsInputStream} which streams only data within the given range.
- * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
- * jobs which will be working only with the assigned range. You can also use it explicitly when
- * working with {@link IgniteFsJob} directly.
- */
-public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
- /** Base input stream. */
- private final IgniteFsInputStream is;
-
- /** Start position. */
- private final long start;
-
- /** Maximum stream length. */
- private final long maxLen;
-
- /** Current position within the stream. */
- private long pos;
-
- /**
- * Constructor.
- *
- * @param is Base input stream.
- * @param start Start position.
- * @param maxLen Maximum stream length.
- * @throws IOException In case of exception.
- */
- public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
- if (is == null)
- throw new IllegalArgumentException("Input stream cannot be null.");
-
- if (start < 0)
- throw new IllegalArgumentException("Start position cannot be negative.");
-
- if (start >= is.length())
- throw new IllegalArgumentException("Start position cannot be greater that file length.");
-
- if (maxLen < 0)
- throw new IllegalArgumentException("Length cannot be negative.");
-
- if (start + maxLen > is.length())
- throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length.");
-
- this.is = is;
- this.start = start;
- this.maxLen = maxLen;
-
- is.seek(start);
- }
-
- /** {@inheritDoc} */
- @Override public long length() {
- return is.length();
- }
-
- /**
- * Constructor.
- *
- * @param is Base input stream.
- * @param range File range.
- * @throws IOException In case of exception.
- */
- public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
- this(is, range.start(), range.length());
- }
-
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- if (pos < maxLen) {
- int res = is.read();
-
- if (res != -1)
- pos++;
-
- return res;
- }
- else
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
- if (pos < maxLen) {
- len = (int)Math.min(len, maxLen - pos);
-
- int res = is.read(b, off, len);
-
- if (res != -1)
- pos += res;
-
- return res;
- }
- else
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
- seek(pos);
-
- return read(buf, off, len);
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf) throws IOException {
- readFully(pos, buf, 0, buf.length);
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
- seek(pos);
-
- for (int readBytes = 0; readBytes < len;) {
- int read = read(buf, off + readBytes, len - readBytes);
-
- if (read == -1)
- throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos +
- ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
-
- readBytes += read;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void seek(long pos) throws IOException {
- if (pos < 0)
- throw new IOException("Seek position cannot be negative: " + pos);
-
- is.seek(start + pos);
-
- this.pos = pos;
- }
-
- /** {@inheritDoc} */
- @Override public long position() {
- return pos;
- }
-
- /**
- * Since range input stream represents a part of larger file stream, there is an offset at which this
- * range input stream starts in original input stream. This method returns start offset of this input
- * stream relative to original input stream.
- *
- * @return Start offset in original input stream.
- */
- public long startOffset() {
- return start;
- }
-
- /** {@inheritDoc} */
- @Override public int available() {
- long l = maxLen - pos;
-
- if (l < 0)
- return 0;
-
- if (l > Integer.MAX_VALUE)
- return Integer.MAX_VALUE;
-
- return (int)l;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- is.close();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteFsRangeInputStream.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
deleted file mode 100644
index 697f575..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.apache.ignite.fs.*;
-import org.gridgain.grid.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain
- * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
- * execution in order to adjust record boundaries in a way consistent with user data.
- * <p>
- * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file.
- * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver
- * you can adjust job range so that it covers the whole line(s).
- * <p>
- * The following record resolvers are available out of the box:
- * <ul>
- * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li>
- * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li>
- * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li>
- * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li>
- * </ul>
- */
-public interface IgniteFsRecordResolver extends Serializable {
- /**
- * Adjusts record start offset and length.
- *
- * @param ggfs GGFS instance to use.
- * @param stream Input stream for split file.
- * @param suggestedRecord Suggested file system record.
- * @return New adjusted record. If this method returns {@code null}, original record is ignored.
- * @throws GridException If resolve failed.
- * @throws IOException If resolve failed.
- */
- @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
- IgniteFsFileRange suggestedRecord) throws GridException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
deleted file mode 100644
index bfde75c..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.fs.*;
-import org.apache.ignite.resources.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.processors.ggfs.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
- * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
- * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
- * {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
- * <p>
- * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
- * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size
- * is provided (either through {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
- * argument), then ranges could be further divided into smaller chunks.
- * <p>
- * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a
- * {@link IgniteFsJob}.
- * <p>
- * Finally all generated jobs are sent to Grid nodes for execution.
- * <p>
- * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step.
- * <p>
- * Here is an example of such a task:
- * <pre name="code" class="java">
- * public class WordCountTask extends GridGgfsTask<String, Integer> {
- * @Override
- * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException {
- * // New job will be created for each range within each file.
- * // We pass user-provided argument (which is essentially a word to look for) to that job.
- * return new WordCountJob(args.userArgument());
- * }
- *
- * // Aggregate results into one compound result.
- * public Integer reduce(List<GridComputeJobResult> results) throws GridException {
- * Integer total = 0;
- *
- * for (GridComputeJobResult res : results) {
- * Integer cnt = res.getData();
- *
- * // Null can be returned for non-existent file in case we decide to ignore such situations.
- * if (cnt != null)
- * total += cnt;
- * }
- *
- * return total;
- * }
- * }
- * </pre>
- */
-public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Injected grid. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable IgniteFsTaskArgs<T> args) throws GridException {
- assert ignite != null;
- assert args != null;
-
- IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
- GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs();
-
- Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
-
- Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
-
- for (IgniteFsPath path : args.paths()) {
- IgniteFsFile file = ggfs.info(path);
-
- if (file == null) {
- if (args.skipNonExistentFiles())
- continue;
- else
- throw new GridException("Failed to process GGFS file because it doesn't exist: " + path);
- }
-
- Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
-
- long totalLen = 0;
-
- for (IgniteFsBlockLocation loc : aff) {
- ClusterNode node = null;
-
- for (UUID nodeId : loc.nodeIds()) {
- node = nodes.get(nodeId);
-
- if (node != null)
- break;
- }
-
- if (node == null)
- throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
- ", subgrid=" + subgrid + ']');
-
- IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
-
- if (job != null) {
- ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
- loc.length(), args.recordResolver());
-
- splitMap.put(jobImpl, node);
- }
-
- totalLen += loc.length();
- }
-
- assert totalLen == file.length();
- }
-
- return splitMap;
- }
-
- /**
- * Callback invoked during task map procedure to create job that will process specified split
- * for GGFS file.
- *
- * @param path Path.
- * @param range File range based on consecutive blocks. This range will be further
- * realigned to record boundaries on destination node.
- * @param args Task argument.
- * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
- * @throws GridException If job creation failed.
- */
- @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
- IgniteFsTaskArgs<T> args) throws GridException;
-
- /**
- * Maps list by node ID.
- *
- * @param subgrid Subgrid.
- * @return Map.
- */
- private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) {
- Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size());
-
- for (ClusterNode node : subgrid)
- res.put(node.id(), node);
-
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
deleted file mode 100644
index d6622ef..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.fs.*;
-
-import java.util.*;
-
-/**
- * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
- * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
- * passed to {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
- * <p>
- * Task arguments encapsulates the following data:
- * <ul>
- * <li>GGFS name</li>
- * <li>File paths passed to {@code GridGgfs.execute()} method</li>
- * <li>{@link IgniteFsRecordResolver} for that task</li>
- * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
- * <li>User-defined task argument</li>
- * <li>Maximum file range length for that task (see {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
- * </ul>
- */
-public interface IgniteFsTaskArgs<T> {
- /**
- * Gets GGFS name.
- *
- * @return GGFS name.
- */
- public String ggfsName();
-
- /**
- * Gets file paths to process.
- *
- * @return File paths to process.
- */
- public Collection<IgniteFsPath> paths();
-
- /**
- * Gets record resolver for the task.
- *
- * @return Record resolver.
- */
- public IgniteFsRecordResolver recordResolver();
-
- /**
- * Flag indicating whether to fail or simply skip non-existent files.
- *
- * @return {@code True} if non-existent files should be skipped.
- */
- public boolean skipNonExistentFiles();
-
- /**
- * User argument provided for task execution.
- *
- * @return User argument.
- */
- public T userArgument();
-
- /**
- * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including
- * all consecutive blocks will be used without any limitations.
- *
- * @return Maximum range length.
- */
- public long maxRangeLength();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
deleted file mode 100644
index 180d7a4..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.compute.*;
-
-import java.util.*;
-
-/**
- * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
- * results returned by jobs.
- */
-public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Default implementation which will ignore all results sent from execution nodes.
- *
- * @param results Received results of broadcasted remote executions. Note that if task class has
- * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty.
- * @return Will always return {@code null}.
- */
- @Override public R reduce(List<ComputeJobResult> results) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html
deleted file mode 100644
index cde37cf..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html
+++ /dev/null
@@ -1,15 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!--
- @html.file.header
- _________ _____ __________________ _____
- __ ____/___________(_)______ /__ ____/______ ____(_)_______
- _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
--->
-<html>
-<body>
- <!-- Package description. -->
- Contains APIs for In-Memory MapReduce over GGFS.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
index 9ca7b54..480e574 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
@@ -11,9 +11,9 @@ package org.gridgain.grid.ggfs.mapreduce.records;
import org.apache.ignite.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.tostring.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
index ddfd402..b360483 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
@@ -11,8 +11,8 @@ package org.gridgain.grid.ggfs.mapreduce.records;
import org.apache.ignite.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.util.typedef.internal.*;
import java.io.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
index 0351392..24f0a5c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
@@ -11,9 +11,9 @@ package org.gridgain.grid.kernal.processors.ggfs;
import org.apache.ignite.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.jetbrains.annotations.*;
import java.net.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
index a54515c..2f00e69 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
@@ -14,13 +14,13 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.eviction.*;
import org.gridgain.grid.cache.eviction.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
index bdd8e66..93aff1a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
@@ -12,9 +12,9 @@ package org.gridgain.grid.kernal.processors.ggfs;
import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.kernal.*;
import java.io.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
index 5ba2e40..335b05c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
@@ -14,11 +14,11 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.license.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
index a3bc423..ab4c8af 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
@@ -12,7 +12,7 @@ package org.gridgain.grid.kernal.processors.ggfs;
import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.*;
import org.gridgain.grid.util.ipc.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
index d1f9d6e..1f2875b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
@@ -12,7 +12,7 @@ package org.gridgain.grid.kernal.processors.ggfs;
import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.util.ipc.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
index 78ecc7d..fe73fe4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
@@ -10,7 +10,7 @@
package org.gridgain.grid.kernal.processors.ggfs;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.util.typedef.internal.*;
import java.io.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
index 02aa9ef..c6d6f24 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
@@ -13,11 +13,11 @@ import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
@@ -35,7 +35,7 @@ import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
import static org.apache.ignite.fs.IgniteFsMode.*;
/**
- * Tests for {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask}.
+ * Tests for {@link org.apache.ignite.fs.mapreduce.IgniteFsTask}.
*/
public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
/** Predefined words dictionary. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java
index 7347f0f..905410c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java
@@ -12,8 +12,8 @@ package org.gridgain.grid.kernal.processors.ggfs.split;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
index b9f6b8d..bd9712a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
@@ -10,7 +10,7 @@
package org.gridgain.grid.kernal.processors.ggfs.split;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
index e9d3357..7bebc96 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
@@ -10,7 +10,7 @@
package org.gridgain.grid.kernal.processors.ggfs.split;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
index b1efc24..93d7286 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
@@ -10,7 +10,7 @@
package org.gridgain.grid.kernal.processors.ggfs.split;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
index bea2eff..efa044a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
@@ -10,7 +10,7 @@
package org.gridgain.grid.kernal.processors.ggfs.split;
import org.apache.ignite.fs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
index 51424f4..e0b444d 100644
--- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
@@ -12,10 +12,10 @@ package org.gridgain.grid.kernal.processors.hadoop;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.hadoop.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;