You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 09:44:54 UTC
[27/38] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/243e521e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/243e521e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/243e521e
Branch: refs/heads/master
Commit: 243e521ec2f9ab5a01860fe0ce983d91a9d683d2
Parents: 7cd638f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 11:26:58 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 11:26:58 2014 +0300
----------------------------------------------------------------------
.../examples/ggfs/GgfsMapReduceExample.java | 4 +-
.../grid/ggfs/mapreduce/GridGgfsTaskArgs.java | 74 ----
.../mapreduce/GridGgfsTaskNoReduceAdapter.java | 34 --
.../ggfs/mapreduce/IgniteFsRecordResolver.java | 9 +-
.../grid/ggfs/mapreduce/IgniteFsTask.java | 8 +-
.../grid/ggfs/mapreduce/IgniteFsTaskArgs.java | 74 ++++
.../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ++
.../GridGgfsByteDelimiterRecordResolver.java | 340 -------------------
.../GridGgfsFixedLengthRecordResolver.java | 79 -----
.../records/GridGgfsNewLineRecordResolver.java | 58 ----
.../GridGgfsStringDelimiterRecordResolver.java | 76 -----
.../IgniteFsByteDelimiterRecordResolver.java | 340 +++++++++++++++++++
.../IgniteFsFixedLengthRecordResolver.java | 79 +++++
.../records/IgniteFsNewLineRecordResolver.java | 58 ++++
.../IgniteFsStringDelimiterRecordResolver.java | 76 +++++
.../kernal/processors/ggfs/GridGgfsImpl.java | 4 +-
.../processors/ggfs/GridGgfsTaskArgsImpl.java | 127 -------
.../processors/ggfs/IgniteFsTaskArgsImpl.java | 127 +++++++
.../processors/ggfs/GridGgfsTaskSelfTest.java | 6 +-
...GgfsByteDelimiterRecordResolverSelfTest.java | 8 +-
...idGgfsFixedLengthRecordResolverSelfTest.java | 8 +-
...sNewLineDelimiterRecordResolverSelfTest.java | 10 +-
...fsStringDelimiterRecordResolverSelfTest.java | 8 +-
23 files changed, 820 insertions(+), 821 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
index edd3f70..454a54e 100644
--- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
+++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java
@@ -64,7 +64,7 @@ public class GgfsMapReduceExample {
writeFile(fs, fsPath, file);
- Collection<Line> lines = fs.execute(new GrepTask(), GridGgfsNewLineRecordResolver.NEW_LINE,
+ Collection<Line> lines = fs.execute(new GrepTask(), IgniteFsNewLineRecordResolver.NEW_LINE,
Collections.singleton(fsPath), regexStr);
if (lines.isEmpty()) {
@@ -122,7 +122,7 @@ public class GgfsMapReduceExample {
private static class GrepTask extends IgniteFsTask<String, Collection<Line>> {
/** {@inheritDoc} */
@Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
- GridGgfsTaskArgs<String> args) throws GridException {
+ IgniteFsTaskArgs<String> args) throws GridException {
return new GrepJob(args.userArgument());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
deleted file mode 100644
index caa0b44..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.gridgain.grid.ggfs.*;
-
-import java.util.*;
-
-/**
- * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
- * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
- * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
- * <p>
- * Task arguments encapsulates the following data:
- * <ul>
- * <li>GGFS name</li>
- * <li>File paths passed to {@code GridGgfs.execute()} method</li>
- * <li>{@link IgniteFsRecordResolver} for that task</li>
- * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
- * <li>User-defined task argument</li>
- * <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
- * </ul>
- */
-public interface GridGgfsTaskArgs<T> {
- /**
- * Gets GGFS name.
- *
- * @return GGFS name.
- */
- public String ggfsName();
-
- /**
- * Gets file paths to process.
- *
- * @return File paths to process.
- */
- public Collection<IgniteFsPath> paths();
-
- /**
- * Gets record resolver for the task.
- *
- * @return Record resolver.
- */
- public IgniteFsRecordResolver recordResolver();
-
- /**
- * Flag indicating whether to fail or simply skip non-existent files.
- *
- * @return {@code True} if non-existent files should be skipped.
- */
- public boolean skipNonExistentFiles();
-
- /**
- * User argument provided for task execution.
- *
- * @return User argument.
- */
- public T userArgument();
-
- /**
- * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including
- * all consecutive blocks will be used without any limitations.
- *
- * @return Maximum range length.
- */
- public long maxRangeLength();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
deleted file mode 100644
index 802b7a5..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce;
-
-import org.apache.ignite.compute.*;
-
-import java.util.*;
-
-/**
- * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
- * results returned by jobs.
- */
-public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Default implementation which will ignore all results sent from execution nodes.
- *
- * @param results Received results of broadcasted remote executions. Note that if task class has
- * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty.
- * @return Will always return {@code null}.
- */
- @Override public R reduce(List<ComputeJobResult> results) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
index fdddc06..e9d254f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.ggfs.mapreduce;
import org.apache.ignite.*;
import org.gridgain.grid.*;
import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -28,10 +27,10 @@ import java.io.*;
* <p>
* The following record resolvers are available out of the box:
* <ul>
- * <li>{@link GridGgfsFixedLengthRecordResolver}</li>
- * <li>{@link GridGgfsByteDelimiterRecordResolver}</li>
- * <li>{@link GridGgfsStringDelimiterRecordResolver}</li>
- * <li>{@link GridGgfsNewLineRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li>
+ * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li>
* </ul>
*/
public interface IgniteFsRecordResolver extends Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
index 0721d0b..edfdf03 100644
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java
@@ -26,7 +26,7 @@ import java.util.*;
* GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
* is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
* {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
- * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method.
+ * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
* <p>
* Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
* consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size
@@ -67,7 +67,7 @@ import java.util.*;
* }
* </pre>
*/
-public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> {
+public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> {
/** */
private static final long serialVersionUID = 0L;
@@ -77,7 +77,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTask
/** {@inheritDoc} */
@Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable GridGgfsTaskArgs<T> args) throws GridException {
+ @Nullable IgniteFsTaskArgs<T> args) throws GridException {
assert ignite != null;
assert args != null;
@@ -146,7 +146,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTask
* @throws GridException If job creation failed.
*/
@Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
- GridGgfsTaskArgs<T> args) throws GridException;
+ IgniteFsTaskArgs<T> args) throws GridException;
/**
* Maps list by node ID.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
new file mode 100644
index 0000000..4eb7757
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java
@@ -0,0 +1,74 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce;
+
+import org.gridgain.grid.ggfs.*;
+
+import java.util.*;
+
+/**
+ * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
+ * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
+ * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
+ * <p>
+ * Task arguments encapsulates the following data:
+ * <ul>
+ * <li>GGFS name</li>
+ * <li>File paths passed to {@code GridGgfs.execute()} method</li>
+ * <li>{@link IgniteFsRecordResolver} for that task</li>
+ * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
+ * <li>User-defined task argument</li>
+ * <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
+ * </ul>
+ */
+public interface IgniteFsTaskArgs<T> {
+ /**
+ * Gets GGFS name.
+ *
+ * @return GGFS name.
+ */
+ public String ggfsName();
+
+ /**
+ * Gets file paths to process.
+ *
+ * @return File paths to process.
+ */
+ public Collection<IgniteFsPath> paths();
+
+ /**
+ * Gets record resolver for the task.
+ *
+ * @return Record resolver.
+ */
+ public IgniteFsRecordResolver recordResolver();
+
+ /**
+ * Flag indicating whether to fail or simply skip non-existent files.
+ *
+ * @return {@code True} if non-existent files should be skipped.
+ */
+ public boolean skipNonExistentFiles();
+
+ /**
+ * User argument provided for task execution.
+ *
+ * @return User argument.
+ */
+ public T userArgument();
+
+ /**
+ * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including
+ * all consecutive blocks will be used without any limitations.
+ *
+ * @return Maximum range length.
+ */
+ public long maxRangeLength();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
new file mode 100644
index 0000000..180d7a4
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce;
+
+import org.apache.ignite.compute.*;
+
+import java.util.*;
+
+/**
+ * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in
+ * results returned by jobs.
+ */
+public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default implementation which will ignore all results sent from execution nodes.
+ *
+ * @param results Received results of broadcasted remote executions. Note that if task class has
+ * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty.
+ * @return Will always return {@code null}.
+ */
+ @Override public R reduce(List<ComputeJobResult> results) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
deleted file mode 100644
index 808092e..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce.records;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Record resolver which adjusts records based on provided delimiters. Both start position and length are
- * shifted to the right, based on delimiter positions.
- * <p>
- * Note that you can use {@link GridGgfsStringDelimiterRecordResolver} if your delimiter is a plain string.
- */
-public class GridGgfsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Delimiters. */
- private byte[][] delims;
-
- /** Maximum delimiter length. */
- @GridToStringExclude
- private int maxDelimLen;
-
- /**
- * Empty constructor required for {@link Externalizable} support.
- */
- public GridGgfsByteDelimiterRecordResolver() {
- // No-op.
- }
-
- /**
- * Creates delimiter-based record resolver.
- *
- * @param delims Delimiters.
- */
- public GridGgfsByteDelimiterRecordResolver(byte[]... delims) {
- if (delims == null || delims.length == 0)
- throw new IllegalArgumentException("Delimiters cannot be null or empty.");
-
- this.delims = delims;
-
- int maxDelimLen = 0;
-
- for (byte[] delim : delims) {
- if (delim == null)
- throw new IllegalArgumentException("Delimiter cannot be null.");
- else if (maxDelimLen < delim.length)
- maxDelimLen = delim.length;
- }
-
- this.maxDelimLen = maxDelimLen;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
- IgniteFsFileRange suggestedRecord) throws GridException, IOException {
- long suggestedStart = suggestedRecord.start();
- long suggestedEnd = suggestedStart + suggestedRecord.length();
-
- IgniteBiTuple<State, Delimiter> firstDelim = findFirstDelimiter(stream, suggestedStart);
-
- State state = firstDelim != null ? firstDelim.getKey() : new State();
-
- Delimiter curDelim = firstDelim.getValue();
-
- while (curDelim != null && curDelim.end < suggestedStart)
- curDelim = nextDelimiter(stream, state);
-
- if (curDelim != null && (curDelim.end >= suggestedStart && curDelim.end < suggestedEnd) ||
- suggestedStart == 0 ) {
- // We found start delimiter.
- long start = suggestedStart == 0 ? 0 : curDelim.end;
-
- if (curDelim == null || curDelim.end < suggestedEnd) {
- IgniteBiTuple<State, Delimiter> lastDelim = findFirstDelimiter(stream, suggestedEnd);
-
- state = lastDelim != null ? firstDelim.getKey() : new State();
-
- curDelim = lastDelim.getValue();
-
- while (curDelim != null && curDelim.end < suggestedEnd)
- curDelim = nextDelimiter(stream, state);
- }
-
- long end = curDelim != null ? curDelim.end : stream.position();
-
- return new IgniteFsFileRange(suggestedRecord.path(), start, end - start);
- }
- else
- // We failed to find any delimiters up to the EOS.
- return null;
- }
-
- /**
- * Calculate maximum delimiters length.
- *
- * @param delims Delimiters.
- * @return Maximum delimiter length.
- */
- private int maxDelimiterLength(byte[][] delims) {
- int maxDelimLen = 0;
-
- for (byte[] delim : delims) {
- if (delim == null)
- throw new IllegalArgumentException("Delimiter cannot be null.");
- else if (maxDelimLen < delim.length)
- maxDelimLen = delim.length;
- }
-
- return maxDelimLen;
- }
-
- /**
- * Find first delimiter. In order to achieve this we have to rewind the stream until we find the delimiter
- * which stands at least [maxDelimLen] from the start search position or until we faced stream start.
- * Otherwise we cannot be sure that delimiter position is determined correctly.
- *
- * @param stream GGFS input stream.
- * @param startPos Start search position.
- * @return The first found delimiter.
- * @throws IOException In case of IO exception.
- */
- @Nullable private IgniteBiTuple<State, Delimiter> findFirstDelimiter(IgniteFsInputStream stream, long startPos)
- throws IOException {
- State state;
- Delimiter delim;
-
- long curPos = Math.max(0, startPos - maxDelimLen);
-
- while (true) {
- stream.seek(curPos);
-
- state = new State();
-
- delim = nextDelimiter(stream, state);
-
- if (curPos == 0 || delim == null || delim.start - curPos > maxDelimLen - 1)
- break;
- else
- curPos = Math.max(0, curPos - maxDelimLen);
- }
-
- return F.t(state, delim);
- }
-
- /**
- * Resolve next delimiter.
- *
- * @param is GGFS input stream.
- * @param state Current state.
- * @return Next delimiter and updated map.
- * @throws IOException In case of exception.
- */
- private Delimiter nextDelimiter(IgniteFsInputStream is, State state) throws IOException {
- assert is != null;
- assert state != null;
-
- Map<Integer, Integer> parts = state.parts;
- LinkedList<Delimiter> delimQueue = state.delims;
-
- int nextByte = is.read();
-
- while (nextByte != -1) {
- // Process read byte.
- for (int idx = 0; idx < delims.length; idx++) {
- byte[] delim = delims[idx];
-
- int val = parts.containsKey(idx) ? parts.get(idx) : 0;
-
- if (delim[val] == nextByte) {
- if (val == delim.length - 1) {
- // Full delimiter is found.
- parts.remove(idx);
-
- Delimiter newDelim = new Delimiter(is.position() - delim.length, is.position());
-
- // Read queue from the end looking for the "inner" delimiters.
- boolean ignore = false;
-
- int replaceIdx = -1;
-
- for (int i = delimQueue.size() - 1; i >= 0; i--) {
- Delimiter prevDelim = delimQueue.get(i);
-
- if (prevDelim.start < newDelim.start) {
- if (prevDelim.end > newDelim.start) {
- // Ignore this delimiter.
- ignore = true;
-
- break;
- }
- }
- else if (prevDelim.start == newDelim.start) {
- // Ok, we found matching delimiter.
- replaceIdx = i;
-
- break;
- }
- }
-
- if (!ignore) {
- if (replaceIdx >= 0)
- delimQueue.removeAll(delimQueue.subList(replaceIdx, delimQueue.size()));
-
- delimQueue.add(newDelim);
- }
- }
- else
- parts.put(idx, ++val);
- }
- else if (val != 0) {
- if (delim[0] == nextByte) {
- boolean shift = true;
-
- for (int k = 1; k < val; k++) {
- if (delim[k] != nextByte) {
- shift = false;
-
- break;
- }
- }
-
- if (!shift)
- parts.put(idx, 1);
- }
- else
- // Delimiter sequence is totally broken.
- parts.remove(idx);
- }
- }
-
- // Check whether we can be sure that the first delimiter will not change.
- if (!delimQueue.isEmpty()) {
- Delimiter delim = delimQueue.get(0);
-
- if (is.position() - delim.end >= maxDelimLen)
- return delimQueue.poll();
- }
-
- nextByte = is.read();
- }
-
- return delimQueue.poll();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsByteDelimiterRecordResolver.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- if (delims != null) {
- out.writeBoolean(true);
-
- out.writeInt(delims.length);
-
- for (byte[] delim : delims)
- U.writeByteArray(out, delim);
- }
- else
- out.writeBoolean(false);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- if (in.readBoolean()) {
- int len = in.readInt();
-
- delims = new byte[len][];
-
- for (int i = 0; i < len; i++)
- delims[i] = U.readByteArray(in);
-
- maxDelimLen = maxDelimiterLength(delims);
- }
- }
-
- /**
- * Delimiter descriptor.
- */
- private static class Delimiter {
- /** Delimiter start position. */
- private final long start;
-
- /** Delimiter end position. */
- private final long end;
-
- /**
- * Constructor.
- *
- * @param start Delimiter start position.
- * @param end Delimiter end position.
- */
- private Delimiter(long start, long end) {
- assert start >= 0 && end >= 0 && start <= end;
-
- this.start = start;
- this.end = end;
- }
- }
-
- /**
- * Current resolution state.
- */
- private static class State {
- /** Partially resolved delimiters. */
- private final Map<Integer, Integer> parts;
-
- /** Resolved delimiters which could potentially be merged. */
- private final LinkedList<Delimiter> delims;
-
- /**
- * Constructor.
- */
- private State() {
- parts = new HashMap<>();
-
- delims = new LinkedList<>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
deleted file mode 100644
index 1edeb1a..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce.records;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the
- * nearest position so that {@code newStart % length == 0}.
- */
-public class GridGgfsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Record length. */
- private long recLen;
-
- /**
- * Empty constructor required for {@link Externalizable} support.
- */
- public GridGgfsFixedLengthRecordResolver() {
- // No-op.
- }
-
- /**
- * Creates fixed-length record resolver.
- *
- * @param recLen Record length.
- */
- public GridGgfsFixedLengthRecordResolver(long recLen) {
- this.recLen = recLen;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
- IgniteFsFileRange suggestedRecord)
- throws GridException, IOException {
- long suggestedEnd = suggestedRecord.start() + suggestedRecord.length();
-
- long startRem = suggestedRecord.start() % recLen;
- long endRem = suggestedEnd % recLen;
-
- long start = Math.min(suggestedRecord.start() + (startRem != 0 ? (recLen - startRem) : 0),
- stream.length());
- long end = Math.min(suggestedEnd + (endRem != 0 ? (recLen - endRem) : 0), stream.length());
-
- assert end >= start;
-
- return start != end ? new IgniteFsFileRange(suggestedRecord.path(), start, end - start) : null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsFixedLengthRecordResolver.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(recLen);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- recLen = in.readLong();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java
deleted file mode 100644
index 808759f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsNewLineRecordResolver.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce.records;
-
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Record resolver based on new line detection. This resolver can detect new lines based on '\n' or '\r\n' sequences.
- * <p>
- * Note that this resolver cannot be created and has one constant implementations: {@link #NEW_LINE}.
- */
-public class GridGgfsNewLineRecordResolver extends GridGgfsByteDelimiterRecordResolver {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Singleton new line resolver. This resolver will resolve records based on new lines
- * regardless if they have '\n' or '\r\n' patterns.
- */
- public static final GridGgfsNewLineRecordResolver NEW_LINE = new GridGgfsNewLineRecordResolver(true);
-
- /** CR symbol. */
- public static final byte SYM_CR = 0x0D;
-
- /** LF symbol. */
- public static final byte SYM_LF = 0x0A;
-
- /**
- * Empty constructor required for {@link Externalizable} support.
- */
- public GridGgfsNewLineRecordResolver() {
- // No-op.
- }
-
- /**
- * Creates new-line record resolver.
- *
- * @param b Artificial flag to differentiate from empty constructor.
- */
- @SuppressWarnings("UnusedParameters")
- private GridGgfsNewLineRecordResolver(boolean b) {
- super(new byte[] { SYM_CR, SYM_LF }, new byte[] { SYM_LF });
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsNewLineRecordResolver.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java
deleted file mode 100644
index 504b7e9..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsStringDelimiterRecordResolver.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.ggfs.mapreduce.records;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.charset.*;
-
-/**
- * Record resolver based on delimiters represented as strings. Works in the same way as
- * {@link GridGgfsByteDelimiterRecordResolver}, but uses strings as delimiters instead of byte arrays.
- */
-public class GridGgfsStringDelimiterRecordResolver extends GridGgfsByteDelimiterRecordResolver {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Converts string delimiters to byte delimiters.
- *
- * @param charset Charset.
- * @param delims String delimiters.
- * @return Byte delimiters.
- */
- @Nullable private static byte[][] toBytes(Charset charset, @Nullable String... delims) {
- byte[][] res = null;
-
- if (delims != null) {
- res = new byte[delims.length][];
-
- for (int i = 0; i < delims.length; i++)
- res[i] = delims[i].getBytes(charset);
- }
-
- return res;
- }
-
- /**
- * Empty constructor required for {@link Externalizable} support.
- */
- public GridGgfsStringDelimiterRecordResolver() {
- // No-op.
- }
-
- /**
- * Creates record resolver from given string and given charset.
- *
- * @param delims Delimiters.
- * @param charset Charset.
- */
- public GridGgfsStringDelimiterRecordResolver(Charset charset, String... delims) {
- super(toBytes(charset, delims));
- }
-
- /**
- * Creates record resolver based on given string with default charset.
- *
- * @param delims Delimiters.
- */
- public GridGgfsStringDelimiterRecordResolver(String... delims) {
- super(toBytes(Charset.defaultCharset(), delims));
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsStringDelimiterRecordResolver.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
new file mode 100644
index 0000000..79f928e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
@@ -0,0 +1,340 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce.records;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.ggfs.mapreduce.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Record resolver which adjusts records based on provided delimiters. Both start position and length are
+ * shifted to the right, based on delimiter positions.
+ * <p>
+ * Note that you can use {@link IgniteFsStringDelimiterRecordResolver} if your delimiter is a plain string.
+ */
+public class IgniteFsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Delimiters. */
+ private byte[][] delims;
+
+ /** Maximum delimiter length. */
+ @GridToStringExclude
+ private int maxDelimLen;
+
+ /**
+ * Empty constructor required for {@link Externalizable} support.
+ */
+ public IgniteFsByteDelimiterRecordResolver() {
+ // No-op.
+ }
+
+ /**
+ * Creates delimiter-based record resolver.
+ *
+ * @param delims Delimiters.
+ */
+ public IgniteFsByteDelimiterRecordResolver(byte[]... delims) {
+ if (delims == null || delims.length == 0)
+ throw new IllegalArgumentException("Delimiters cannot be null or empty.");
+
+ this.delims = delims;
+
+ int maxDelimLen = 0;
+
+ for (byte[] delim : delims) {
+ if (delim == null)
+ throw new IllegalArgumentException("Delimiter cannot be null.");
+ else if (maxDelimLen < delim.length)
+ maxDelimLen = delim.length;
+ }
+
+ this.maxDelimLen = maxDelimLen;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ IgniteFsFileRange suggestedRecord) throws GridException, IOException {
+ long suggestedStart = suggestedRecord.start();
+ long suggestedEnd = suggestedStart + suggestedRecord.length();
+
+ IgniteBiTuple<State, Delimiter> firstDelim = findFirstDelimiter(stream, suggestedStart);
+
+ State state = firstDelim != null ? firstDelim.getKey() : new State();
+
+ Delimiter curDelim = firstDelim.getValue();
+
+ while (curDelim != null && curDelim.end < suggestedStart)
+ curDelim = nextDelimiter(stream, state);
+
+ if (curDelim != null && (curDelim.end >= suggestedStart && curDelim.end < suggestedEnd) ||
+ suggestedStart == 0 ) {
+ // We found start delimiter.
+ long start = suggestedStart == 0 ? 0 : curDelim.end;
+
+ if (curDelim == null || curDelim.end < suggestedEnd) {
+ IgniteBiTuple<State, Delimiter> lastDelim = findFirstDelimiter(stream, suggestedEnd);
+
+ state = lastDelim != null ? firstDelim.getKey() : new State();
+
+ curDelim = lastDelim.getValue();
+
+ while (curDelim != null && curDelim.end < suggestedEnd)
+ curDelim = nextDelimiter(stream, state);
+ }
+
+ long end = curDelim != null ? curDelim.end : stream.position();
+
+ return new IgniteFsFileRange(suggestedRecord.path(), start, end - start);
+ }
+ else
+ // We failed to find any delimiters up to the EOS.
+ return null;
+ }
+
+ /**
+ * Calculate maximum delimiters length.
+ *
+ * @param delims Delimiters.
+ * @return Maximum delimiter length.
+ */
+ private int maxDelimiterLength(byte[][] delims) {
+ int maxDelimLen = 0;
+
+ for (byte[] delim : delims) {
+ if (delim == null)
+ throw new IllegalArgumentException("Delimiter cannot be null.");
+ else if (maxDelimLen < delim.length)
+ maxDelimLen = delim.length;
+ }
+
+ return maxDelimLen;
+ }
+
+ /**
+ * Find first delimiter. In order to achieve this we have to rewind the stream until we find the delimiter
+ * which stands at least [maxDelimLen] from the start search position or until we faced stream start.
+ * Otherwise we cannot be sure that delimiter position is determined correctly.
+ *
+ * @param stream GGFS input stream.
+ * @param startPos Start search position.
+ * @return The first found delimiter.
+ * @throws IOException In case of IO exception.
+ */
+ @Nullable private IgniteBiTuple<State, Delimiter> findFirstDelimiter(IgniteFsInputStream stream, long startPos)
+ throws IOException {
+ State state;
+ Delimiter delim;
+
+ long curPos = Math.max(0, startPos - maxDelimLen);
+
+ while (true) {
+ stream.seek(curPos);
+
+ state = new State();
+
+ delim = nextDelimiter(stream, state);
+
+ if (curPos == 0 || delim == null || delim.start - curPos > maxDelimLen - 1)
+ break;
+ else
+ curPos = Math.max(0, curPos - maxDelimLen);
+ }
+
+ return F.t(state, delim);
+ }
+
+ /**
+ * Resolve next delimiter.
+ *
+ * @param is GGFS input stream.
+ * @param state Current state.
+ * @return Next delimiter and updated map.
+ * @throws IOException In case of exception.
+ */
+ private Delimiter nextDelimiter(IgniteFsInputStream is, State state) throws IOException {
+ assert is != null;
+ assert state != null;
+
+ Map<Integer, Integer> parts = state.parts;
+ LinkedList<Delimiter> delimQueue = state.delims;
+
+ int nextByte = is.read();
+
+ while (nextByte != -1) {
+ // Process read byte.
+ for (int idx = 0; idx < delims.length; idx++) {
+ byte[] delim = delims[idx];
+
+ int val = parts.containsKey(idx) ? parts.get(idx) : 0;
+
+ if (delim[val] == nextByte) {
+ if (val == delim.length - 1) {
+ // Full delimiter is found.
+ parts.remove(idx);
+
+ Delimiter newDelim = new Delimiter(is.position() - delim.length, is.position());
+
+ // Read queue from the end looking for the "inner" delimiters.
+ boolean ignore = false;
+
+ int replaceIdx = -1;
+
+ for (int i = delimQueue.size() - 1; i >= 0; i--) {
+ Delimiter prevDelim = delimQueue.get(i);
+
+ if (prevDelim.start < newDelim.start) {
+ if (prevDelim.end > newDelim.start) {
+ // Ignore this delimiter.
+ ignore = true;
+
+ break;
+ }
+ }
+ else if (prevDelim.start == newDelim.start) {
+ // Ok, we found matching delimiter.
+ replaceIdx = i;
+
+ break;
+ }
+ }
+
+ if (!ignore) {
+ if (replaceIdx >= 0)
+ delimQueue.removeAll(delimQueue.subList(replaceIdx, delimQueue.size()));
+
+ delimQueue.add(newDelim);
+ }
+ }
+ else
+ parts.put(idx, ++val);
+ }
+ else if (val != 0) {
+ if (delim[0] == nextByte) {
+ boolean shift = true;
+
+ for (int k = 1; k < val; k++) {
+ if (delim[k] != nextByte) {
+ shift = false;
+
+ break;
+ }
+ }
+
+ if (!shift)
+ parts.put(idx, 1);
+ }
+ else
+ // Delimiter sequence is totally broken.
+ parts.remove(idx);
+ }
+ }
+
+ // Check whether we can be sure that the first delimiter will not change.
+ if (!delimQueue.isEmpty()) {
+ Delimiter delim = delimQueue.get(0);
+
+ if (is.position() - delim.end >= maxDelimLen)
+ return delimQueue.poll();
+ }
+
+ nextByte = is.read();
+ }
+
+ return delimQueue.poll();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsByteDelimiterRecordResolver.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ if (delims != null) {
+ out.writeBoolean(true);
+
+ out.writeInt(delims.length);
+
+ for (byte[] delim : delims)
+ U.writeByteArray(out, delim);
+ }
+ else
+ out.writeBoolean(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ if (in.readBoolean()) {
+ int len = in.readInt();
+
+ delims = new byte[len][];
+
+ for (int i = 0; i < len; i++)
+ delims[i] = U.readByteArray(in);
+
+ maxDelimLen = maxDelimiterLength(delims);
+ }
+ }
+
+ /**
+ * Delimiter descriptor.
+ */
+ private static class Delimiter {
+ /** Delimiter start position. */
+ private final long start;
+
+ /** Delimiter end position. */
+ private final long end;
+
+ /**
+ * Constructor.
+ *
+ * @param start Delimiter start position.
+ * @param end Delimiter end position.
+ */
+ private Delimiter(long start, long end) {
+ assert start >= 0 && end >= 0 && start <= end;
+
+ this.start = start;
+ this.end = end;
+ }
+ }
+
+ /**
+ * Current resolution state.
+ */
+ private static class State {
+ /** Partially resolved delimiters. */
+ private final Map<Integer, Integer> parts;
+
+ /** Resolved delimiters which could potentially be merged. */
+ private final LinkedList<Delimiter> delims;
+
+ /**
+ * Constructor.
+ */
+ private State() {
+ parts = new HashMap<>();
+
+ delims = new LinkedList<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
new file mode 100644
index 0000000..6190207
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
@@ -0,0 +1,79 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce.records;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.ggfs.mapreduce.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the
+ * nearest position so that {@code newStart % length == 0}.
+ */
+public class IgniteFsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Record length. */
+ private long recLen;
+
+ /**
+ * Empty constructor required for {@link Externalizable} support.
+ */
+ public IgniteFsFixedLengthRecordResolver() {
+ // No-op.
+ }
+
+ /**
+ * Creates fixed-length record resolver.
+ *
+ * @param recLen Record length.
+ */
+ public IgniteFsFixedLengthRecordResolver(long recLen) {
+ this.recLen = recLen;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ IgniteFsFileRange suggestedRecord)
+ throws GridException, IOException {
+ long suggestedEnd = suggestedRecord.start() + suggestedRecord.length();
+
+ long startRem = suggestedRecord.start() % recLen;
+ long endRem = suggestedEnd % recLen;
+
+ long start = Math.min(suggestedRecord.start() + (startRem != 0 ? (recLen - startRem) : 0),
+ stream.length());
+ long end = Math.min(suggestedEnd + (endRem != 0 ? (recLen - endRem) : 0), stream.length());
+
+ assert end >= start;
+
+ return start != end ? new IgniteFsFileRange(suggestedRecord.path(), start, end - start) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsFixedLengthRecordResolver.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(recLen);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ recLen = in.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java
new file mode 100644
index 0000000..81f359e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsNewLineRecordResolver.java
@@ -0,0 +1,58 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce.records;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Record resolver based on new line detection. This resolver can detect new lines based on '\n' or '\r\n' sequences.
+ * <p>
+ * Note that this resolver cannot be created and has one constant implementations: {@link #NEW_LINE}.
+ */
+public class IgniteFsNewLineRecordResolver extends IgniteFsByteDelimiterRecordResolver {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Singleton new line resolver. This resolver will resolve records based on new lines
+ * regardless if they have '\n' or '\r\n' patterns.
+ */
+ public static final IgniteFsNewLineRecordResolver NEW_LINE = new IgniteFsNewLineRecordResolver(true);
+
+ /** CR symbol. */
+ public static final byte SYM_CR = 0x0D;
+
+ /** LF symbol. */
+ public static final byte SYM_LF = 0x0A;
+
+ /**
+ * Empty constructor required for {@link Externalizable} support.
+ */
+ public IgniteFsNewLineRecordResolver() {
+ // No-op.
+ }
+
+ /**
+ * Creates new-line record resolver.
+ *
+ * @param b Artificial flag to differentiate from empty constructor.
+ */
+ @SuppressWarnings("UnusedParameters")
+ private IgniteFsNewLineRecordResolver(boolean b) {
+ super(new byte[] { SYM_CR, SYM_LF }, new byte[] { SYM_LF });
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsNewLineRecordResolver.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java
new file mode 100644
index 0000000..3a42333
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsStringDelimiterRecordResolver.java
@@ -0,0 +1,76 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.ggfs.mapreduce.records;
+
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.charset.*;
+
+/**
+ * Record resolver based on delimiters represented as strings. Works in the same way as
+ * {@link IgniteFsByteDelimiterRecordResolver}, but uses strings as delimiters instead of byte arrays.
+ */
+public class IgniteFsStringDelimiterRecordResolver extends IgniteFsByteDelimiterRecordResolver {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Converts string delimiters to byte delimiters.
+ *
+ * @param charset Charset.
+ * @param delims String delimiters.
+ * @return Byte delimiters.
+ */
+ @Nullable private static byte[][] toBytes(Charset charset, @Nullable String... delims) {
+ byte[][] res = null;
+
+ if (delims != null) {
+ res = new byte[delims.length][];
+
+ for (int i = 0; i < delims.length; i++)
+ res[i] = delims[i].getBytes(charset);
+ }
+
+ return res;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable} support.
+ */
+ public IgniteFsStringDelimiterRecordResolver() {
+ // No-op.
+ }
+
+ /**
+ * Creates record resolver from given string and given charset.
+ *
+ * @param delims Delimiters.
+ * @param charset Charset.
+ */
+ public IgniteFsStringDelimiterRecordResolver(Charset charset, String... delims) {
+ super(toBytes(charset, delims));
+ }
+
+ /**
+ * Creates record resolver based on given string with default charset.
+ *
+ * @param delims Delimiters.
+ */
+ public IgniteFsStringDelimiterRecordResolver(String... delims) {
+ super(toBytes(Charset.defaultCharset(), delims));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsStringDelimiterRecordResolver.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
index dff827e..5d4f4ad 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java
@@ -1722,7 +1722,7 @@ public final class GridGgfsImpl implements GridGgfsEx {
*/
<T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
- return ggfsCtx.kernalContext().task().execute(task, new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr,
+ return ggfsCtx.kernalContext().task().execute(task, new IgniteFsTaskArgsImpl<>(cfg.getName(), paths, rslvr,
skipNonExistentFiles, maxRangeLen, arg));
}
@@ -1757,7 +1757,7 @@ public final class GridGgfsImpl implements GridGgfsEx {
@Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) {
return ggfsCtx.kernalContext().task().execute((Class<IgniteFsTask<T, R>>)taskCls,
- new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg));
+ new IgniteFsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
deleted file mode 100644
index 44cac9d..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.ggfs.mapreduce.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * GGFS task arguments implementation.
- */
-public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** GGFS name. */
- private String ggfsName;
-
- /** Paths. */
- private Collection<IgniteFsPath> paths;
-
- /** Record resolver. */
- private IgniteFsRecordResolver recRslvr;
-
- /** Skip non existent files flag. */
- private boolean skipNonExistentFiles;
-
- /** Maximum range length. */
- private long maxRangeLen;
-
- /** User argument. */
- private T usrArg;
-
- /**
- * {@link Externalizable} support.
- */
- public GridGgfsTaskArgsImpl() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param ggfsName GGFS name.
- * @param paths Paths.
- * @param recRslvr Record resolver.
- * @param skipNonExistentFiles Skip non existent files flag.
- * @param maxRangeLen Maximum range length.
- * @param usrArg User argument.
- */
- public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr,
- boolean skipNonExistentFiles, long maxRangeLen, T usrArg) {
- this.ggfsName = ggfsName;
- this.paths = paths;
- this.recRslvr = recRslvr;
- this.skipNonExistentFiles = skipNonExistentFiles;
- this.maxRangeLen = maxRangeLen;
- this.usrArg = usrArg;
- }
-
- /** {@inheritDoc} */
- @Override public String ggfsName() {
- return ggfsName;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgniteFsPath> paths() {
- return paths;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFsRecordResolver recordResolver() {
- return recRslvr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipNonExistentFiles() {
- return skipNonExistentFiles;
- }
-
- /** {@inheritDoc} */
- @Override public long maxRangeLength() {
- return maxRangeLen;
- }
-
- /** {@inheritDoc} */
- @Override public T userArgument() {
- return usrArg;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsTaskArgsImpl.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, ggfsName);
- U.writeCollection(out, paths);
-
- out.writeObject(recRslvr);
- out.writeBoolean(skipNonExistentFiles);
- out.writeLong(maxRangeLen);
- out.writeObject(usrArg);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- ggfsName = U.readString(in);
- paths = U.readCollection(in);
-
- recRslvr = (IgniteFsRecordResolver)in.readObject();
- skipNonExistentFiles = in.readBoolean();
- maxRangeLen = in.readLong();
- usrArg = (T)in.readObject();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
new file mode 100644
index 0000000..1edaac8
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java
@@ -0,0 +1,127 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.ggfs;
+
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.ggfs.mapreduce.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * GGFS task arguments implementation.
+ */
+public class IgniteFsTaskArgsImpl<T> implements IgniteFsTaskArgs<T>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** GGFS name. */
+ private String ggfsName;
+
+ /** Paths. */
+ private Collection<IgniteFsPath> paths;
+
+ /** Record resolver. */
+ private IgniteFsRecordResolver recRslvr;
+
+ /** Skip non existent files flag. */
+ private boolean skipNonExistentFiles;
+
+ /** Maximum range length. */
+ private long maxRangeLen;
+
+ /** User argument. */
+ private T usrArg;
+
+ /**
+ * {@link Externalizable} support.
+ */
+ public IgniteFsTaskArgsImpl() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ggfsName GGFS name.
+ * @param paths Paths.
+ * @param recRslvr Record resolver.
+ * @param skipNonExistentFiles Skip non existent files flag.
+ * @param maxRangeLen Maximum range length.
+ * @param usrArg User argument.
+ */
+ public IgniteFsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr,
+ boolean skipNonExistentFiles, long maxRangeLen, T usrArg) {
+ this.ggfsName = ggfsName;
+ this.paths = paths;
+ this.recRslvr = recRslvr;
+ this.skipNonExistentFiles = skipNonExistentFiles;
+ this.maxRangeLen = maxRangeLen;
+ this.usrArg = usrArg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String ggfsName() {
+ return ggfsName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteFsPath> paths() {
+ return paths;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFsRecordResolver recordResolver() {
+ return recRslvr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipNonExistentFiles() {
+ return skipNonExistentFiles;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long maxRangeLength() {
+ return maxRangeLen;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T userArgument() {
+ return usrArg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsTaskArgsImpl.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, ggfsName);
+ U.writeCollection(out, paths);
+
+ out.writeObject(recRslvr);
+ out.writeBoolean(skipNonExistentFiles);
+ out.writeLong(maxRangeLen);
+ out.writeObject(usrArg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ ggfsName = U.readString(in);
+ paths = U.readCollection(in);
+
+ recRslvr = (IgniteFsRecordResolver)in.readObject();
+ skipNonExistentFiles = in.readBoolean();
+ maxRangeLen = in.readLong();
+ usrArg = (T)in.readObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
index 9443ee7..d962109 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java
@@ -148,7 +148,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
Long genLen = ggfs.info(FILE).length();
IgniteBiTuple<Long, Integer> taskRes = ggfs.execute(new Task(),
- new GridGgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg);
+ new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg);
assert F.eq(genLen, taskRes.getKey());
assert F.eq(TOTAL_WORDS, taskRes.getValue());
@@ -176,7 +176,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
Long genLen = ggfs.info(FILE).length();
assertNull(ggfsAsync.execute(
- new Task(), new GridGgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg));
+ new Task(), new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg));
IgniteFuture<IgniteBiTuple<Long, Integer>> fut = ggfsAsync.future();
@@ -231,7 +231,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest {
private static class Task extends IgniteFsTask<String, IgniteBiTuple<Long, Integer>> {
/** {@inheritDoc} */
@Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
- GridGgfsTaskArgs<String> args) throws GridException {
+ IgniteFsTaskArgs<String> args) throws GridException {
return new Job();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
index b88ccc2..0c20431 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
@@ -278,7 +278,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac
byte[]... delims) throws Exception {
write(data);
- GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims);
+ IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims);
IgniteFsFileRange split;
@@ -304,7 +304,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac
throws Exception {
write(data);
- GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims);
+ IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims);
IgniteFsFileRange split;
@@ -321,7 +321,7 @@ public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstrac
* @param delims Delimiters.
* @return Resolver.
*/
- private GridGgfsByteDelimiterRecordResolver resolver(byte[]... delims) {
- return new GridGgfsByteDelimiterRecordResolver(delims);
+ private IgniteFsByteDelimiterRecordResolver resolver(byte[]... delims) {
+ return new IgniteFsByteDelimiterRecordResolver(delims);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
index 287e119..98b0abc 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
@@ -90,7 +90,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR
throws Exception {
write(data);
- GridGgfsFixedLengthRecordResolver rslvr = resolver(len);
+ IgniteFsFixedLengthRecordResolver rslvr = resolver(len);
IgniteFsFileRange split;
@@ -116,7 +116,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR
throws Exception {
write(data);
- GridGgfsFixedLengthRecordResolver rslvr = resolver(len);
+ IgniteFsFixedLengthRecordResolver rslvr = resolver(len);
IgniteFsFileRange split;
@@ -133,7 +133,7 @@ public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractR
* @param len Length.
* @return Resolver.
*/
- private GridGgfsFixedLengthRecordResolver resolver(int len) {
- return new GridGgfsFixedLengthRecordResolver(len);
+ private IgniteFsFixedLengthRecordResolver resolver(int len) {
+ return new IgniteFsFixedLengthRecordResolver(len);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
index 49328bb..4d3aae2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java
@@ -14,7 +14,7 @@ import org.gridgain.grid.ggfs.mapreduce.*;
import org.gridgain.grid.ggfs.mapreduce.records.*;
import org.gridgain.grid.util.typedef.*;
-import static org.gridgain.grid.ggfs.mapreduce.records.GridGgfsNewLineRecordResolver.*;
+import static org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver.*;
/**
* New line split resolver self test.
@@ -74,7 +74,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst
throws Exception {
write(data);
- GridGgfsNewLineRecordResolver rslvr = resolver();
+ IgniteFsNewLineRecordResolver rslvr = resolver();
IgniteFsFileRange split;
@@ -99,7 +99,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst
throws Exception {
write(data);
- GridGgfsNewLineRecordResolver rslvr = resolver();
+ IgniteFsNewLineRecordResolver rslvr = resolver();
IgniteFsFileRange split;
@@ -115,7 +115,7 @@ public class GridGgfsNewLineDelimiterRecordResolverSelfTest extends GridGgfsAbst
*
* @return Resolver.
*/
- private GridGgfsNewLineRecordResolver resolver() {
- return GridGgfsNewLineRecordResolver.NEW_LINE;
+ private IgniteFsNewLineRecordResolver resolver() {
+ return IgniteFsNewLineRecordResolver.NEW_LINE;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/243e521e/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
index bf31792..51dbf5f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java
@@ -80,7 +80,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr
String... delims) throws Exception {
write(data);
- GridGgfsByteDelimiterRecordResolver rslvr = resolver(delims);
+ IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims);
IgniteFsFileRange split;
@@ -106,7 +106,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr
throws Exception {
write(data);
- GridGgfsStringDelimiterRecordResolver rslvr = resolver(delims);
+ IgniteFsStringDelimiterRecordResolver rslvr = resolver(delims);
IgniteFsFileRange split;
@@ -123,7 +123,7 @@ public class GridGgfsStringDelimiterRecordResolverSelfTest extends GridGgfsAbstr
* @param delims Delimiters.
* @return Resolver.
*/
- private GridGgfsStringDelimiterRecordResolver resolver(String... delims) {
- return new GridGgfsStringDelimiterRecordResolver(UTF8, delims);
+ private IgniteFsStringDelimiterRecordResolver resolver(String... delims) {
+ return new IgniteFsStringDelimiterRecordResolver(UTF8, delims);
}
}