You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 09:44:53 UTC
[26/38] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7cd638fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7cd638fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7cd638fe
Branch: refs/heads/master
Commit: 7cd638fe62ddf8518c8e9a41f2b54ef22cf59dab
Parents: b7479ed
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 11:24:58 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 11:24:58 2014 +0300
----------------------------------------------------------------------
.../examples/ggfs/GgfsMapReduceExample.java | 6 +-
.../main/java/org/apache/ignite/IgniteFs.java | 12 +-
.../grid/ggfs/IgniteFsConfiguration.java | 3 +-
.../mapreduce/GridGgfsRangeInputStream.java | 189 -------------------
.../ggfs/mapreduce/GridGgfsRecordResolver.java | 50 -----
.../grid/ggfs/mapreduce/GridGgfsTask.java | 165 ----------------
.../grid/ggfs/mapreduce/GridGgfsTaskArgs.java | 6 +-
.../mapreduce/GridGgfsTaskNoReduceAdapter.java | 4 +-
.../IgniteFsInputStreamJobAdapter.java | 4 +-
.../grid/ggfs/mapreduce/IgniteFsJob.java | 8 +-
.../mapreduce/IgniteFsRangeInputStream.java | 189 +++++++++++++++++++
.../ggfs/mapreduce/IgniteFsRecordResolver.java | 50 +++++
.../grid/ggfs/mapreduce/IgniteFsTask.java | 165 ++++++++++++++++
.../GridGgfsByteDelimiterRecordResolver.java | 2 +-
.../GridGgfsFixedLengthRecordResolver.java | 2 +-
.../processors/ggfs/GridGgfsAsyncImpl.java | 12 +-
.../kernal/processors/ggfs/GridGgfsImpl.java | 26 +--
.../kernal/processors/ggfs/GridGgfsJobImpl.java | 4 +-
.../processors/ggfs/GridGgfsProcessor.java | 2 +-
.../ggfs/GridGgfsProcessorAdapter.java | 2 +-
.../processors/ggfs/GridGgfsTaskArgsImpl.java | 8 +-
.../processors/ggfs/GridNoopGgfsProcessor.java | 2 +-
.../processors/ggfs/GridGgfsTaskSelfTest.java | 4 +-
...idHadoopDefaultMapReducePlannerSelfTest.java | 12 +-
24 files changed, 463 insertions(+), 464 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
index 85f103d..edd3f70 100644
--- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
+++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
@@ -20,7 +20,7 @@ import java.io.*;
import java.util.*;
/**
- * Example that shows how to use {@link GridGgfsTask} to find lines matching particular pattern in the file in pretty
+ * Example that shows how to use {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask} to find lines matching particular pattern in the file in pretty
* the same way as {@code grep} command does.
* <p>
* Remote nodes should always be started with configuration file which includes
@@ -119,7 +119,7 @@ public class GgfsMapReduceExample {
/**
* Grep task.
*/
- private static class GrepTask extends GridGgfsTask<String, Collection<Line>> {
+ private static class GrepTask extends IgniteFsTask<String, Collection<Line>> {
/** {@inheritDoc} */
@Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
GridGgfsTaskArgs<String> args) throws GridException {
@@ -166,7 +166,7 @@ public class GgfsMapReduceExample {
}
/** {@inheritDoc} */
- @Override public Object execute(IgniteFs ggfs, GridGgfsRangeInputStream in) throws GridException, IOException {
+ @Override public Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException {
Collection<Line> res = null;
long start = in.startOffset();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
index 241ab3e..fd3c88c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java
@@ -269,7 +269,7 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport {
* @return Task result.
* @throws GridException If execution failed.
*/
- public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException;
/**
@@ -289,7 +289,7 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport {
* @return Task result.
* @throws GridException If execution failed.
*/
- public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws GridException;
@@ -305,8 +305,8 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport {
* @return Task result.
* @throws GridException If execution failed.
*/
- public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException;
+ public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException;
/**
* Executes GGFS task with overridden maximum range length (see
@@ -324,8 +324,8 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport {
* @return Task result.
* @throws GridException If execution failed.
*/
- public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+ public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) throws GridException;
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java
index 287e93c..45a009a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java
@@ -9,7 +9,6 @@
package org.gridgain.grid.ggfs;
-import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -762,7 +761,7 @@ public class IgniteFsConfiguration {
* length is smaller than file block size. In this case maximum task range size will be overridden and set to file
* block size.
* <p>
- * Note that this parameter is applied when task is split into jobs before {@link GridGgfsRecordResolver} is
+ * Note that this parameter is applied when task is split into jobs before {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsRecordResolver} is
* applied. Therefore, final file ranges being assigned to particular jobs could be greater than value of this
* parameter depending on file data layout and selected resolver type.
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java
deleted file mode 100644
index ac96c68..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Decorator for regular {@link org.gridgain.grid.ggfs.IgniteFsInputStream} which streams only data within the given range.
- * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
- * jobs which will be working only with the assigned range. You can also use it explicitly when
- * working with {@link IgniteFsJob} directly.
- */
-public final class GridGgfsRangeInputStream extends IgniteFsInputStream {
- /** Base input stream. */
- private final IgniteFsInputStream is;
-
- /** Start position. */
- private final long start;
-
- /** Maximum stream length. */
- private final long maxLen;
-
- /** Current position within the stream. */
- private long pos;
-
- /**
- * Constructor.
- *
- * @param is Base input stream.
- * @param start Start position.
- * @param maxLen Maximum stream length.
- * @throws IOException In case of exception.
- */
- public GridGgfsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
- if (is == null)
- throw new IllegalArgumentException("Input stream cannot be null.");
-
- if (start < 0)
- throw new IllegalArgumentException("Start position cannot be negative.");
-
- if (start >= is.length())
- throw new IllegalArgumentException("Start position cannot be greater that file length.");
-
- if (maxLen < 0)
- throw new IllegalArgumentException("Length cannot be negative.");
-
- if (start + maxLen > is.length())
- throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length.");
-
- this.is = is;
- this.start = start;
- this.maxLen = maxLen;
-
- is.seek(start);
- }
-
- /** {@inheritDoc} */
- @Override public long length() {
- return is.length();
- }
-
- /**
- * Constructor.
- *
- * @param is Base input stream.
- * @param range File range.
- * @throws IOException In case of exception.
- */
- public GridGgfsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
- this(is, range.start(), range.length());
- }
-
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- if (pos < maxLen) {
- int res = is.read();
-
- if (res != -1)
- pos++;
-
- return res;
- }
- else
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
- if (pos < maxLen) {
- len = (int)Math.min(len, maxLen - pos);
-
- int res = is.read(b, off, len);
-
- if (res != -1)
- pos += res;
-
- return res;
- }
- else
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
- seek(pos);
-
- return read(buf, off, len);
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf) throws IOException {
- readFully(pos, buf, 0, buf.length);
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
- seek(pos);
-
- for (int readBytes = 0; readBytes < len;) {
- int read = read(buf, off + readBytes, len - readBytes);
-
- if (read == -1)
- throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos +
- ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
-
- readBytes += read;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void seek(long pos) throws IOException {
- if (pos < 0)
- throw new IOException("Seek position cannot be negative: " + pos);
-
- is.seek(start + pos);
-
- this.pos = pos;
- }
-
- /** {@inheritDoc} */
- @Override public long position() {
- return pos;
- }
-
- /**
- * Since range input stream represents a part of larger file stream, there is an offset at which this
- * range input stream starts in original input stream. This method returns start offset of this input
- * stream relative to original input stream.
- *
- * @return Start offset in original input stream.
- */
- public long startOffset() {
- return start;
- }
-
- /** {@inheritDoc} */
- @Override public int available() {
- long l = maxLen - pos;
-
- if (l < 0)
- return 0;
-
- if (l > Integer.MAX_VALUE)
- return Integer.MAX_VALUE;
-
- return (int)l;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- is.close();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsRangeInputStream.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java
deleted file mode 100644
index e128776..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.records.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * GGFS record resolver. When {@link GridGgfsTask} is split into {@link IgniteFsJob}s each produced job will obtain
- * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
- * execution in order to adjust record boundaries in a way consistent with user data.
- * <p>
- * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file.
- * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver
- * you can adjust job range so that it covers the whole line(s).
- * <p>
- * The following record resolvers are available out of the box:
- * <ul>
- * <li>{@link GridGgfsFixedLengthRecordResolver}</li>
- * <li>{@link GridGgfsByteDelimiterRecordResolver}</li>
- * <li>{@link GridGgfsStringDelimiterRecordResolver}</li>
- * <li>{@link GridGgfsNewLineRecordResolver}</li>
- * </ul>
- */
-public interface GridGgfsRecordResolver extends Serializable {
- /**
- * Adjusts record start offset and length.
- *
- * @param ggfs GGFS instance to use.
- * @param stream Input stream for split file.
- * @param suggestedRecord Suggested file system record.
- * @return New adjusted record. If this method returns {@code null}, original record is ignored.
- * @throws GridException If resolve failed.
- * @throws IOException If resolve failed.
- */
- @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
- IgniteFsFileRange suggestedRecord) throws GridException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java
deleted file mode 100644
index 35e16de..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.resources.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.processors.ggfs.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
- * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
- * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
- * {@link GridGgfsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
- * <p>
- * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
- * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size
- * is provided (either through {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
- * argument), then ranges could be further divided into smaller chunks.
- * <p>
- * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a
- * {@link IgniteFsJob}.
- * <p>
- * Finally all generated jobs are sent to Grid nodes for execution.
- * <p>
- * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step.
- * <p>
- * Here is an example of such a task:
- * <pre name="code" class="java">
- * public class WordCountTask extends GridGgfsTask<String, Integer> {
- * @Override
- * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException {
- * // New job will be created for each range within each file.
- * // We pass user-provided argument (which is essentially a word to look for) to that job.
- * return new WordCountJob(args.userArgument());
- * }
- *
- * // Aggregate results into one compound result.
- * public Integer reduce(List<GridComputeJobResult> results) throws GridException {
- * Integer total = 0;
- *
- * for (GridComputeJobResult res : results) {
- * Integer cnt = res.getData();
- *
- * // Null can be returned for non-existent file in case we decide to ignore such situations.
- * if (cnt != null)
- * total += cnt;
- * }
- *
- * return total;
- * }
- * }
- * </pre>
- */
-public abstract class GridGgfsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Injected grid. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable GridGgfsTaskArgs<T> args) throws GridException {
- assert ignite != null;
- assert args != null;
-
- IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
- GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs();
-
- Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
-
- Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
-
- for (IgniteFsPath path : args.paths()) {
- IgniteFsFile file = ggfs.info(path);
-
- if (file == null) {
- if (args.skipNonExistentFiles())
- continue;
- else
- throw new GridException("Failed to process GGFS file because it doesn't exist: " + path);
- }
-
- Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
-
- long totalLen = 0;
-
- for (IgniteFsBlockLocation loc : aff) {
- ClusterNode node = null;
-
- for (UUID nodeId : loc.nodeIds()) {
- node = nodes.get(nodeId);
-
- if (node != null)
- break;
- }
-
- if (node == null)
- throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
- ", subgrid=" + subgrid + ']');
-
- IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
-
- if (job != null) {
- ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
- loc.length(), args.recordResolver());
-
- splitMap.put(jobImpl, node);
- }
-
- totalLen += loc.length();
- }
-
- assert totalLen == file.length();
- }
-
- return splitMap;
- }
-
- /**
- * Callback invoked during task map procedure to create job that will process specified split
- * for GGFS file.
- *
- * @param path Path.
- * @param range File range based on consecutive blocks. This range will be further
- * realigned to record boundaries on destination node.
- * @param args Task argument.
- * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
- * @throws GridException If job creation failed.
- */
- @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
- GridGgfsTaskArgs<T> args) throws GridException;
-
- /**
- * Maps list by node ID.
- *
- * @param subgrid Subgrid.
- * @return Map.
- */
- private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) {
- Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size());
-
- for (ClusterNode node : subgrid)
- res.put(node.id(), node);
-
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
index fd36ffe..caa0b44 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
@@ -16,13 +16,13 @@ import java.util.*;
/**
* GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
* all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
- * passed to {@link GridGgfsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
+ * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
* <p>
* Task arguments encapsulates the following data:
* <ul>
* <li>GGFS name</li>
* <li>File paths passed to {@code GridGgfs.execute()} method</li>
- * <li>{@link GridGgfsRecordResolver} for that task</li>
+ * <li>{@link IgniteFsRecordResolver} for that task</li>
* <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
* <li>User-defined task argument</li>
* <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
@@ -48,7 +48,7 @@ public interface GridGgfsTaskArgs<T> {
*
* @return Record resolver.
*/
- public GridGgfsRecordResolver recordResolver();
+ public IgniteFsRecordResolver recordResolver();
/**
* Flag indicating whether to fail or simply skip non-existent files.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
index 9131b09..802b7a5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
@@ -14,10 +14,10 @@ import org.apache.ignite.compute.*;
import java.util.*;
/**
- * Convenient {@link GridGgfsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
+ * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
* results returned by jobs.
*/
-public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends GridGgfsTask<T, R> {
+public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
index ddc04b3..28c0890 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java
@@ -29,7 +29,7 @@ public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
throws GridException, IOException {
in.seek(range.start());
- return execute(ggfs, new GridGgfsRangeInputStream(in, range));
+ return execute(ggfs, new IgniteFsRangeInputStream(in, range));
}
/**
@@ -41,5 +41,5 @@ public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
* @throws GridException If execution failed.
* @throws IOException If IO exception encountered while working with stream.
*/
- public abstract Object execute(IgniteFs ggfs, GridGgfsRangeInputStream in) throws GridException, IOException;
+ public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
index 4aa33ec..cfd69a9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java
@@ -16,8 +16,8 @@ import org.gridgain.grid.ggfs.*;
import java.io.*;
/**
- * Defines executable unit for {@link GridGgfsTask}. Before this job is executed, it is assigned one of the
- * ranges provided by the {@link GridGgfsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
+ * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the
+ * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
* <p>
* {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.gridgain.grid.ggfs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this
* job is expected to operate on, and already opened {@link org.gridgain.grid.ggfs.IgniteFsInputStream} for the file this range belongs to.
@@ -28,8 +28,8 @@ import java.io.*;
* <p>
* In majority of the cases, when you want to process only provided range, you should explicitly control amount
* of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates
- * on {@link GridGgfsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
- * {@link GridGgfsRangeInputStream}.
+ * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
+ * {@link IgniteFsRangeInputStream}.
* <p>
* You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
new file mode 100644
index 0000000..8687798
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java
@@ -0,0 +1,189 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce;
+
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Decorator for regular {@link org.gridgain.grid.ggfs.IgniteFsInputStream} which streams only data within the given range.
+ * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
+ * jobs which will be working only with the assigned range. You can also use it explicitly when
+ * working with {@link IgniteFsJob} directly.
+ */
+public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
+ /** Base input stream. */
+ private final IgniteFsInputStream is;
+
+ /** Start position. */
+ private final long start;
+
+ /** Maximum stream length. */
+ private final long maxLen;
+
+ /** Current position within the stream. */
+ private long pos;
+
+ /**
+ * Constructor.
+ *
+ * @param is Base input stream.
+ * @param start Start position.
+ * @param maxLen Maximum stream length.
+ * @throws IOException In case of exception.
+ */
+ public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
+ if (is == null)
+ throw new IllegalArgumentException("Input stream cannot be null.");
+
+ if (start < 0)
+ throw new IllegalArgumentException("Start position cannot be negative.");
+
+ if (start >= is.length())
+ throw new IllegalArgumentException("Start position cannot be greater that file length.");
+
+ if (maxLen < 0)
+ throw new IllegalArgumentException("Length cannot be negative.");
+
+ if (start + maxLen > is.length())
+ throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length.");
+
+ this.is = is;
+ this.start = start;
+ this.maxLen = maxLen;
+
+ is.seek(start);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long length() {
+ return is.length();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param is Base input stream.
+ * @param range File range.
+ * @throws IOException In case of exception.
+ */
+ public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
+ this(is, range.start(), range.length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ if (pos < maxLen) {
+ int res = is.read();
+
+ if (res != -1)
+ pos++;
+
+ return res;
+ }
+ else
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
+ if (pos < maxLen) {
+ len = (int)Math.min(len, maxLen - pos);
+
+ int res = is.read(b, off, len);
+
+ if (res != -1)
+ pos += res;
+
+ return res;
+ }
+ else
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+ seek(pos);
+
+ return read(buf, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf) throws IOException {
+ readFully(pos, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ seek(pos);
+
+ for (int readBytes = 0; readBytes < len;) {
+ int read = read(buf, off + readBytes, len - readBytes);
+
+ if (read == -1)
+ throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos +
+ ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
+
+ readBytes += read;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void seek(long pos) throws IOException {
+ if (pos < 0)
+ throw new IOException("Seek position cannot be negative: " + pos);
+
+ is.seek(start + pos);
+
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long position() {
+ return pos;
+ }
+
+ /**
+ * Since range input stream represents a part of larger file stream, there is an offset at which this
+ * range input stream starts in original input stream. This method returns start offset of this input
+ * stream relative to original input stream.
+ *
+ * @return Start offset in original input stream.
+ */
+ public long startOffset() {
+ return start;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int available() {
+ long l = maxLen - pos;
+
+ if (l < 0)
+ return 0;
+
+ if (l > Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+
+ return (int)l;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ is.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsRangeInputStream.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
new file mode 100644
index 0000000..fdddc06
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
@@ -0,0 +1,50 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.ggfs.mapreduce.records.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain
+ * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
+ * execution in order to adjust record boundaries in a way consistent with user data.
+ * <p>
+ * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file.
+ * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver
+ * you can adjust job range so that it covers the whole line(s).
+ * <p>
+ * The following record resolvers are available out of the box:
+ * <ul>
+ * <li>{@link GridGgfsFixedLengthRecordResolver}</li>
+ * <li>{@link GridGgfsByteDelimiterRecordResolver}</li>
+ * <li>{@link GridGgfsStringDelimiterRecordResolver}</li>
+ * <li>{@link GridGgfsNewLineRecordResolver}</li>
+ * </ul>
+ */
+public interface IgniteFsRecordResolver extends Serializable {
+ /**
+ * Adjusts record start offset and length.
+ *
+ * @param ggfs GGFS instance to use.
+ * @param stream Input stream for split file.
+ * @param suggestedRecord Suggested file system record.
+ * @return New adjusted record. If this method returns {@code null}, original record is ignored.
+ * @throws GridException If resolve failed.
+ * @throws IOException If resolve failed.
+ */
+ @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ IgniteFsFileRange suggestedRecord) throws GridException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
new file mode 100644
index 0000000..0721d0b
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
@@ -0,0 +1,165 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.resources.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.ggfs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
+ * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
+ * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
+ * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
+ * <p>
+ * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
+ * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size
+ * is provided (either through {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
+ * argument), then ranges could be further divided into smaller chunks.
+ * <p>
+ * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a
+ * {@link IgniteFsJob}.
+ * <p>
+ * Finally all generated jobs are sent to Grid nodes for execution.
+ * <p>
+ * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step.
+ * <p>
+ * Here is an example of such a task:
+ * <pre name="code" class="java">
+ * public class WordCountTask extends GridGgfsTask<String, Integer> {
+ * @Override
+ * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException {
+ * // New job will be created for each range within each file.
+ * // We pass user-provided argument (which is essentially a word to look for) to that job.
+ * return new WordCountJob(args.userArgument());
+ * }
+ *
+ * // Aggregate results into one compound result.
+ * public Integer reduce(List<GridComputeJobResult> results) throws GridException {
+ * Integer total = 0;
+ *
+ * for (GridComputeJobResult res : results) {
+ * Integer cnt = res.getData();
+ *
+ * // Null can be returned for non-existent file in case we decide to ignore such situations.
+ * if (cnt != null)
+ * total += cnt;
+ * }
+ *
+ * return total;
+ * }
+ * }
+ * </pre>
+ */
+public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Injected grid. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable GridGgfsTaskArgs<T> args) throws GridException {
+ assert ignite != null;
+ assert args != null;
+
+ IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
+ GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs();
+
+ Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
+
+ Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
+
+ for (IgniteFsPath path : args.paths()) {
+ IgniteFsFile file = ggfs.info(path);
+
+ if (file == null) {
+ if (args.skipNonExistentFiles())
+ continue;
+ else
+ throw new GridException("Failed to process GGFS file because it doesn't exist: " + path);
+ }
+
+ Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
+
+ long totalLen = 0;
+
+ for (IgniteFsBlockLocation loc : aff) {
+ ClusterNode node = null;
+
+ for (UUID nodeId : loc.nodeIds()) {
+ node = nodes.get(nodeId);
+
+ if (node != null)
+ break;
+ }
+
+ if (node == null)
+ throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
+ ", subgrid=" + subgrid + ']');
+
+ IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
+
+ if (job != null) {
+ ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
+ loc.length(), args.recordResolver());
+
+ splitMap.put(jobImpl, node);
+ }
+
+ totalLen += loc.length();
+ }
+
+ assert totalLen == file.length();
+ }
+
+ return splitMap;
+ }
+
+ /**
+ * Callback invoked during task map procedure to create job that will process specified split
+ * for GGFS file.
+ *
+ * @param path Path.
+ * @param range File range based on consecutive blocks. This range will be further
+ * realigned to record boundaries on destination node.
+ * @param args Task argument.
+ * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
+ * @throws GridException If job creation failed.
+ */
+ @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
+ GridGgfsTaskArgs<T> args) throws GridException;
+
+ /**
+ * Maps list by node ID.
+ *
+ * @param subgrid Subgrid.
+ * @return Map.
+ */
+ private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) {
+ Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size());
+
+ for (ClusterNode node : subgrid)
+ res.put(node.id(), node);
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
index eb9de8d..808092e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
@@ -28,7 +28,7 @@ import java.util.*;
* <p>
* Note that you can use {@link GridGgfsStringDelimiterRecordResolver} if your delimiter is a plain string.
*/
-public class GridGgfsByteDelimiterRecordResolver implements GridGgfsRecordResolver, Externalizable {
+public class GridGgfsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
index 3230062..1edeb1a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
@@ -21,7 +21,7 @@ import java.io.*;
* Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the
* nearest position so that {@code newStart % length == 0}.
*/
-public class GridGgfsFixedLengthRecordResolver implements GridGgfsRecordResolver, Externalizable {
+public class GridGgfsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
index 6e4ad66..407249b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java
@@ -46,27 +46,27 @@ public class GridGgfsAsyncImpl extends IgniteAsyncSupportAdapter implements Grid
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg));
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws GridException {
return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg));
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) throws GridException {
return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
index 5145abd..dff827e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
@@ -1667,27 +1667,27 @@ public final class GridGgfsImpl implements GridGgfsEx {
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
return executeAsync(task, rslvr, paths, arg).get();
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws GridException {
return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get();
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
return executeAsync(taskCls, rslvr, paths, arg).get();
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeSize, @Nullable T arg) throws GridException {
return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get();
}
@@ -1701,7 +1701,7 @@ public final class GridGgfsImpl implements GridGgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteFuture<R> executeAsync(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ <T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) {
return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
}
@@ -1720,7 +1720,7 @@ public final class GridGgfsImpl implements GridGgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteFuture<R> executeAsync(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ <T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
return ggfsCtx.kernalContext().task().execute(task, new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr,
skipNonExistentFiles, maxRangeLen, arg));
@@ -1735,8 +1735,8 @@ public final class GridGgfsImpl implements GridGgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteFuture<R> executeAsync(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) {
+ <T, R> IgniteFuture<R> executeAsync(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) {
return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
}
@@ -1753,10 +1753,10 @@ public final class GridGgfsImpl implements GridGgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteFuture<R> executeAsync(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+ <T, R> IgniteFuture<R> executeAsync(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) {
- return ggfsCtx.kernalContext().task().execute((Class<GridGgfsTask<T, R>>)taskCls,
+ return ggfsCtx.kernalContext().task().execute((Class<IgniteFsTask<T, R>>)taskCls,
new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
index 2abbef7..4869103 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java
@@ -42,7 +42,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs
private long len;
/** Split resolver. */
- private GridGgfsRecordResolver rslvr;
+ private IgniteFsRecordResolver rslvr;
/** Injected grid. */
@IgniteInstanceResource
@@ -61,7 +61,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs
* @param rslvr GGFS split resolver.
*/
public GridGgfsJobImpl(IgniteFsJob job, String ggfsName, IgniteFsPath path, long start, long len,
- GridGgfsRecordResolver rslvr) {
+ IgniteFsRecordResolver rslvr) {
this.job = job;
this.ggfsName = ggfsName;
this.path = path;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
index 8a73665..99f817a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
@@ -223,7 +223,7 @@ public class GridGgfsProcessor extends GridGgfsProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path,
- long start, long length, GridGgfsRecordResolver recRslv) {
+ long start, long length, IgniteFsRecordResolver recRslv) {
return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
index 85ef324..066cd5c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java
@@ -68,5 +68,5 @@ public abstract class GridGgfsProcessorAdapter extends GridProcessorAdapter {
* @return Compute job.
*/
@Nullable public abstract ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path,
- long start, long length, GridGgfsRecordResolver recRslv);
+ long start, long length, IgniteFsRecordResolver recRslv);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
index 516e5a4..44cac9d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
@@ -30,7 +30,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz
private Collection<IgniteFsPath> paths;
/** Record resolver. */
- private GridGgfsRecordResolver recRslvr;
+ private IgniteFsRecordResolver recRslvr;
/** Skip non existent files flag. */
private boolean skipNonExistentFiles;
@@ -58,7 +58,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz
* @param maxRangeLen Maximum range length.
* @param usrArg User argument.
*/
- public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, GridGgfsRecordResolver recRslvr,
+ public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr,
boolean skipNonExistentFiles, long maxRangeLen, T usrArg) {
this.ggfsName = ggfsName;
this.paths = paths;
@@ -79,7 +79,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz
}
/** {@inheritDoc} */
- @Override public GridGgfsRecordResolver recordResolver() {
+ @Override public IgniteFsRecordResolver recordResolver() {
return recRslvr;
}
@@ -119,7 +119,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz
ggfsName = U.readString(in);
paths = U.readCollection(in);
- recRslvr = (GridGgfsRecordResolver)in.readObject();
+ recRslvr = (IgniteFsRecordResolver)in.readObject();
skipNonExistentFiles = in.readBoolean();
maxRangeLen = in.readLong();
usrArg = (T)in.readObject();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
index 5654a95..e73779a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java
@@ -57,7 +57,7 @@ public class GridNoopGgfsProcessor extends GridGgfsProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path,
- long start, long length, GridGgfsRecordResolver recRslv) {
+ long start, long length, IgniteFsRecordResolver recRslv) {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
index 9e3838a..9443ee7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
@@ -35,7 +35,7 @@ import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
- * Tests for {@link GridGgfsTask}.
+ * Tests for {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask}.
*/
public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
/** Predefined words dictionary. */
@@ -228,7 +228,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
/**
* Task.
*/
- private static class Task extends GridGgfsTask<String, IgniteBiTuple<Long, Integer>> {
+ private static class Task extends IgniteFsTask<String, IgniteBiTuple<Long, Integer>> {
/** {@inheritDoc} */
@Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
GridGgfsTaskArgs<String> args) throws GridException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
index b131f20..eca987f 100644
--- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
@@ -872,28 +872,28 @@ public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstrac
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException {
return null;
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr,
+ @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws GridException {
return null;
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg)
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg)
throws GridException {
return null;
}
/** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls,
- @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+ @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+ @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) throws GridException {
return null;
}