You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/02/02 11:45:13 UTC
incubator-ignite git commit: # ignite-26
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-26 c1ac5a5e6 -> 367ab94bb
# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/367ab94b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/367ab94b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/367ab94b
Branch: refs/heads/ignite-26
Commit: 367ab94bbc34255c81de43b622b86c14de4b9ea9
Parents: c1ac5a5
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 2 13:43:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 2 13:43:02 2015 +0300
----------------------------------------------------------------------
.../fs/mapreduce/IgniteFsRecordResolver.java | 4 +-
.../ignite/fs/mapreduce/IgniteFsTask.java | 8 +-
.../IgniteFsByteDelimiterRecordResolver.java | 2 +-
.../IgniteFsFixedLengthRecordResolver.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 8 +-
.../internal/processors/fs/GridGgfsImpl.java | 3 +-
.../processors/fs/GridGgfsIpcHandler.java | 171 ++++++++++---------
.../internal/processors/fs/GridGgfsJobImpl.java | 8 +-
.../processors/fs/GridGgfsMetaManager.java | 112 ++++++------
.../processors/fs/IgniteFsProcessor.java | 4 +-
.../internal/visor/util/VisorTaskUtils.java | 2 +-
.../fs/hadoop/GridGgfsHadoopInProc.java | 43 +++++
.../fs/hadoop/GridGgfsHadoopWrapper.java | 10 +-
.../GridHadoopDefaultMapReducePlanner.java | 10 +-
14 files changed, 230 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
index 8d2c07e..c34c304 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java
@@ -44,13 +44,13 @@ public interface IgniteFsRecordResolver extends Serializable {
/**
* Adjusts record start offset and length.
*
- * @param ggfs GGFS instance to use.
+ * @param fs IgniteFs instance to use.
* @param stream Input stream for split file.
* @param suggestedRecord Suggested file system record.
* @return New adjusted record. If this method returns {@code null}, original record is ignored.
* @throws IgniteException If resolve failed.
* @throws IOException If resolve failed.
*/
- @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ @Nullable public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
IgniteFsFileRange suggestedRecord) throws IgniteException, IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
index fe57135..1005194 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java
@@ -88,7 +88,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
assert ignite != null;
assert args != null;
- IgniteFs ggfs = ignite.fileSystem(args.ggfsName());
+ IgniteFs fs = ignite.fileSystem(args.ggfsName());
IgniteFsProcessorAdapter ggfsProc = ((IgniteKernal) ignite).context().ggfs();
Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
@@ -96,7 +96,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
for (IgniteFsPath path : args.paths()) {
- IgniteFsFile file = ggfs.info(path);
+ IgniteFsFile file = fs.info(path);
if (file == null) {
if (args.skipNonExistentFiles())
@@ -105,7 +105,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
throw new IgniteException("Failed to process IgniteFs file because it doesn't exist: " + path);
}
- Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength());
+ Collection<IgniteFsBlockLocation> aff = fs.affinity(path, 0, file.length(), args.maxRangeLength());
long totalLen = 0;
@@ -126,7 +126,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
if (job != null) {
- ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(),
+ ComputeJob jobImpl = ggfsProc.createJob(job, fs.name(), file.path(), loc.start(),
loc.length(), args.recordResolver());
splitMap.put(jobImpl, node);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
index f959438..75228fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java
@@ -77,7 +77,7 @@ public class IgniteFsByteDelimiterRecordResolver implements IgniteFsRecordResolv
}
/** {@inheritDoc} */
- @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ @Override public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
IgniteFsFileRange suggestedRecord) throws IgniteException, IOException {
long suggestedStart = suggestedRecord.start();
long suggestedEnd = suggestedStart + suggestedRecord.length();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
index 76d9e84..e3c64d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java
@@ -52,7 +52,7 @@ public class IgniteFsFixedLengthRecordResolver implements IgniteFsRecordResolver
}
/** {@inheritDoc} */
- @Override public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream,
+ @Override public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
IgniteFsFileRange suggestedRecord)
throws IgniteException, IOException {
long suggestedEnd = suggestedRecord.start() + suggestedRecord.length();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index ab5f843..c0577b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3074,12 +3074,12 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit
guard();
try{
- IgniteFs ggfs = ctx.ggfs().ggfs(name);
+ IgniteFs fs = ctx.ggfs().ggfs(name);
- if (ggfs == null)
- throw new IllegalArgumentException("GGFS is not configured: " + name);
+ if (fs == null)
+ throw new IllegalArgumentException("IgniteFs is not configured: " + name);
- return ggfs;
+ return fs;
}
finally {
unguard();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
index 2dc2980..63f3e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsImpl.java
@@ -1171,7 +1171,6 @@ public final class GridGgfsImpl implements GridGgfsEx {
* @param props Properties.
* @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method.
* @return Output stream.
- * @throws IgniteCheckedException If file creation failed.
*/
private IgniteFsOutputStream create0(
final IgniteFsPath path,
@@ -1196,7 +1195,7 @@ public final class GridGgfsImpl implements GridGgfsEx {
GridGgfsFileWorkerBatch batch = null;
if (mode == PROXY)
- throw new IgniteCheckedException("PROXY mode cannot be used in GGFS directly: " + path);
+ throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path);
else if (mode != PRIMARY) {
assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
index dd9a13c..a4218a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
@@ -67,6 +67,9 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler {
/**
* Constructs GGFS IPC handler.
+ *
+ * @param ggfsCtx Context.
+ * @param mgmt Management connection flag.
*/
GridGgfsIpcHandler(GridGgfsContext ggfsCtx, boolean mgmt) {
assert ggfsCtx != null;
@@ -228,6 +231,7 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler {
* Processes status request.
*
* @return Status response.
+ * @throws IgniteCheckedException If failed.
*/
private GridGgfsMessage processStatusRequest() throws IgniteCheckedException {
GridGgfsStatus status = ggfs.globalSpace();
@@ -257,127 +261,132 @@ class GridGgfsIpcHandler implements GridGgfsServerHandler {
GridGgfsControlResponse res = new GridGgfsControlResponse();
- switch (cmd) {
- case EXISTS:
- res.response(ggfs.exists(req.path()));
+ try {
+ switch (cmd) {
+ case EXISTS:
+ res.response(ggfs.exists(req.path()));
- break;
+ break;
- case INFO:
- res.response(ggfs.info(req.path()));
+ case INFO:
+ res.response(ggfs.info(req.path()));
- break;
+ break;
- case PATH_SUMMARY:
- res.response(ggfs.summary(req.path()));
+ case PATH_SUMMARY:
+ res.response(ggfs.summary(req.path()));
- break;
+ break;
- case UPDATE:
- res.response(ggfs.update(req.path(), req.properties()));
+ case UPDATE:
+ res.response(ggfs.update(req.path(), req.properties()));
- break;
+ break;
- case RENAME:
- ggfs.rename(req.path(), req.destinationPath());
+ case RENAME:
+ ggfs.rename(req.path(), req.destinationPath());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case DELETE:
- res.response(ggfs.delete(req.path(), req.flag()));
+ case DELETE:
+ res.response(ggfs.delete(req.path(), req.flag()));
- break;
+ break;
- case MAKE_DIRECTORIES:
- ggfs.mkdirs(req.path(), req.properties());
+ case MAKE_DIRECTORIES:
+ ggfs.mkdirs(req.path(), req.properties());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case LIST_PATHS:
- res.paths(ggfs.listPaths(req.path()));
+ case LIST_PATHS:
+ res.paths(ggfs.listPaths(req.path()));
- break;
+ break;
- case LIST_FILES:
- res.files(ggfs.listFiles(req.path()));
+ case LIST_FILES:
+ res.files(ggfs.listFiles(req.path()));
- break;
+ break;
- case SET_TIMES:
- ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+ case SET_TIMES:
+ ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case AFFINITY:
- res.locations(ggfs.affinity(req.path(), req.start(), req.length()));
+ case AFFINITY:
+ res.locations(ggfs.affinity(req.path(), req.start(), req.length()));
- break;
+ break;
- case OPEN_READ: {
- GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) :
- ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+ case OPEN_READ: {
+ GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) :
+ ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
- long streamId = registerResource(ses, ggfsIn);
+ long streamId = registerResource(ses, ggfsIn);
- if (log.isDebugEnabled())
- log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null,
- ggfsIn.fileInfo().modificationTime());
+ GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null,
+ ggfsIn.fileInfo().modificationTime());
- res.response(new GridGgfsInputStreamDescriptor(streamId, info.length()));
+ res.response(new GridGgfsInputStreamDescriptor(streamId, info.length()));
- break;
- }
+ break;
+ }
- case OPEN_CREATE: {
- long streamId = registerResource(ses, ggfs.create(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Overwrite if exists.
- affinityKey(req), // Affinity key based on replication factor.
- req.replication(),// Replication factor.
- req.blockSize(), // Block size.
- req.properties() // File properties.
- ));
+ case OPEN_CREATE: {
+ long streamId = registerResource(ses, ggfs.create(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Overwrite if exists.
+ affinityKey(req), // Affinity key based on replication factor.
+ req.replication(),// Replication factor.
+ req.blockSize(), // Block size.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- case OPEN_APPEND: {
- long streamId = registerResource(ses, ggfs.append(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Create if absent.
- req.properties() // File properties.
- ));
+ case OPEN_APPEND: {
+ long streamId = registerResource(ses, ggfs.append(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Create if absent.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- default:
- assert false : "Unhandled path control request command: " + cmd;
+ default:
+ assert false : "Unhandled path control request command: " + cmd;
- break;
+ break;
+ }
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
index d1ff698..70ca713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
@@ -79,13 +79,13 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs
/** {@inheritDoc} */
@Override public Object execute() {
- IgniteFs ggfs = ignite.fileSystem(ggfsName);
+ IgniteFs fs = ignite.fileSystem(ggfsName);
- try (IgniteFsInputStream in = ggfs.open(path)) {
+ try (IgniteFsInputStream in = fs.open(path)) {
IgniteFsFileRange split = new IgniteFsFileRange(path, start, len);
if (rslvr != null) {
- split = rslvr.resolveRecords(ggfs, in, split);
+ split = rslvr.resolveRecords(fs, in, split);
if (split == null) {
log.warning("No data found for split on local node after resolver is applied " +
@@ -97,7 +97,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs
in.seek(split.start());
- return job.execute(ggfs, new IgniteFsFileRange(path, split.start(), split.length()), in);
+ return job.execute(fs, new IgniteFsFileRange(path, split.start(), split.length()), in);
}
catch (IOException e) {
throw new IgniteException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java
index b640703..dca0327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java
@@ -757,7 +757,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
IgniteUuid fileId = newFileInfo.id();
if (!id2InfoPrj.putxIfAbsent(fileId, newFileInfo))
- throw new IgniteFsException("Failed to add file details into cache: " + newFileInfo);
+ throw new IgniteCheckedException("Failed to add file details into cache: " + newFileInfo);
assert metaCache.get(parentId) != null;
@@ -974,7 +974,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert parentInfo.isDirectory();
if (!rmvLocked && fileInfo.lockId() != null)
- throw new IgniteFsException("Failed to remove file (file is opened for writing) [fileName=" +
+ throw new IgniteCheckedException("Failed to remove file (file is opened for writing) [fileName=" +
fileName + ", fileId=" + fileId + ", lockId=" + fileInfo.lockId() + ']');
// Validate own directory listing.
@@ -1478,15 +1478,15 @@ public class GridGgfsMetaManager extends GridGgfsManager {
GridGgfsFileInfo newInfo = c.apply(oldInfo);
if (newInfo == null)
- throw new IgniteFsException("Failed to update file info with null value" +
+ throw new IgniteCheckedException("Failed to update file info with null value" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
if (!oldInfo.id().equals(newInfo.id()))
- throw new IgniteFsException("Failed to update file info (file IDs differ)" +
+ throw new IgniteCheckedException("Failed to update file info (file IDs differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
if (oldInfo.isDirectory() != newInfo.isDirectory())
- throw new IgniteFsException("Failed to update file info (file types differ)" +
+ throw new IgniteCheckedException("Failed to update file info (file types differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
boolean b = metaCache.replace(fileId, oldInfo, newInfo);
@@ -1588,10 +1588,17 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @return Output stream descriptor.
* @throws IgniteCheckedException If file creation failed.
*/
- public GridGgfsSecondaryOutputStreamDescriptor createDual(final IgniteFsFileSystem fs, final IgniteFsPath path,
- final boolean simpleCreate, @Nullable final Map<String, String> props, final boolean overwrite, final int bufSize,
- final short replication, final long blockSize, final IgniteUuid affKey)
- throws IgniteCheckedException {
+ public GridGgfsSecondaryOutputStreamDescriptor createDual(final IgniteFsFileSystem fs,
+ final IgniteFsPath path,
+ final boolean simpleCreate,
+ @Nullable final Map<String, String> props,
+ final boolean overwrite,
+ final int bufSize,
+ final short replication,
+ final long blockSize,
+ final IgniteUuid affKey)
+ throws IgniteCheckedException
+ {
if (busyLock.enterBusy()) {
try {
assert fs != null;
@@ -1651,10 +1658,10 @@ public class GridGgfsMetaManager extends GridGgfsManager {
IgniteFsFile status = fs.info(path);
if (status == null)
- throw new IgniteFsException("Failed to open output stream to the file created in " +
+ throw new IgniteCheckedException("Failed to open output stream to the file created in " +
"the secondary file system because it no longer exists: " + path);
else if (status.isDirectory())
- throw new IgniteFsException("Failed to open output stream to the file created in " +
+ throw new IgniteCheckedException("Failed to open output stream to the file created in " +
"the secondary file system because the path points to a directory: " + path);
GridGgfsFileInfo newInfo = new GridGgfsFileInfo(status.blockSize(), status.length(), affKey,
@@ -1715,11 +1722,8 @@ public class GridGgfsMetaManager extends GridGgfsManager {
simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
- if (err instanceof IgniteFsException)
- throw (IgniteFsException)err;
- else
- throw new IgniteFsException("Failed to create the file due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -1765,7 +1769,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
GridGgfsFileInfo info = infos.get(path);
if (info.isDirectory())
- throw new IgniteFsException("Failed to open output stream to the file in the " +
+ throw new IgniteCheckedException("Failed to open output stream to the file in the " +
"secondary file system because the path points to a directory: " + path);
out = fs.append(path, bufSize, false, null);
@@ -1804,11 +1808,8 @@ public class GridGgfsMetaManager extends GridGgfsManager {
U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
- if (err instanceof IgniteFsException)
- throw (IgniteFsException)err;
- else
- throw new IgniteCheckedException("Failed to append to the file due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to append to the file due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -1869,11 +1870,8 @@ public class GridGgfsMetaManager extends GridGgfsManager {
U.error(log, "File open in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
- if (err instanceof IgniteFsException)
- throw (IgniteCheckedException)err;
- else
- throw new IgniteCheckedException("Failed to open the path due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to open the path due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -1917,11 +1915,8 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
@Override public GridGgfsFileInfo onFailure(@Nullable Exception err) throws IgniteCheckedException {
- if (err instanceof IgniteFsException)
- throw (IgniteCheckedException)err;
- else
- throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -2072,7 +2067,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
else {
// Move.
if (destInfo.isFile())
- throw new IgniteFsException("Failed to rename the path in the local file system " +
+ throw new IgniteCheckedException("Failed to rename the path in the local file system " +
"because destination path already exists and it is a file: " + dest);
else
moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), src.name(), destInfo.id());
@@ -2097,11 +2092,8 @@ public class GridGgfsMetaManager extends GridGgfsManager {
U.error(log, "Path rename in DUAL mode failed [source=" + src + ", destination=" + dest + ']',
err);
- if (err instanceof IgniteFsException)
- throw (IgniteCheckedException)err;
- else
- throw new IgniteCheckedException("Failed to rename the path due to secondary file system " +
- "exception: " + src, err);
+ throw new IgniteCheckedException("Failed to rename the path due to secondary file system " +
+ "exception: " + src, err);
}
};
@@ -2250,9 +2242,14 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @return File info of the end path.
* @throws IgniteCheckedException If failed.
*/
- private GridGgfsFileInfo synchronize(IgniteFsFileSystem fs, IgniteFsPath startPath, GridGgfsFileInfo startPathInfo,
- IgniteFsPath endPath, boolean strict, @Nullable Map<IgniteFsPath, GridGgfsFileInfo> created)
- throws IgniteCheckedException {
+ private GridGgfsFileInfo synchronize(IgniteFsFileSystem fs,
+ IgniteFsPath startPath,
+ GridGgfsFileInfo startPathInfo,
+ IgniteFsPath endPath,
+ boolean strict,
+ @Nullable Map<IgniteFsPath, GridGgfsFileInfo> created)
+ throws IgniteCheckedException
+ {
assert fs != null;
assert startPath != null && startPathInfo != null && endPath != null;
@@ -2272,7 +2269,14 @@ public class GridGgfsMetaManager extends GridGgfsManager {
parentInfo = created.get(curPath);
else {
// Get file status from the secondary file system.
- IgniteFsFile status = fs.info(curPath);
+ IgniteFsFile status;
+
+ try {
+ status = fs.info(curPath);
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException("Failed to get path information: " + e, e);
+ }
if (status != null) {
if (!status.isDirectory() && !curPath.equals(endPath))
@@ -2322,9 +2326,12 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @return Result of task execution.
* @throws IgniteCheckedException If failed.
*/
- private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgniteFsFileSystem fs, boolean strict,
+ private <T> T synchronizeAndExecute(SynchronizationTask<T> task,
+ IgniteFsFileSystem fs,
+ boolean strict,
IgniteFsPath... paths)
- throws IgniteCheckedException {
+ throws IgniteCheckedException
+ {
return synchronizeAndExecute(task, fs, strict, null, paths);
}
@@ -2340,8 +2347,13 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @return Result of task execution.
* @throws IgniteCheckedException If failed.
*/
- private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgniteFsFileSystem fs, boolean strict,
- @Nullable Collection<IgniteUuid> extraLockIds, IgniteFsPath... paths) throws IgniteCheckedException {
+ private <T> T synchronizeAndExecute(SynchronizationTask<T> task,
+ IgniteFsFileSystem fs,
+ boolean strict,
+ @Nullable Collection<IgniteUuid> extraLockIds,
+ IgniteFsPath... paths)
+ throws IgniteCheckedException
+ {
assert task != null;
assert fs != null;
assert paths != null && paths.length > 0;
@@ -2477,8 +2489,12 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert firstParentPath != null;
assert pathToId.get(firstParentPath) != null;
- GridGgfsFileInfo info = synchronize(fs, firstParentPath,
- idToInfo.get(pathToId.get(firstParentPath)), path, strict, created);
+ GridGgfsFileInfo info = synchronize(fs,
+ firstParentPath,
+ idToInfo.get(pathToId.get(firstParentPath)),
+ path,
+ strict,
+ created);
assert strict && info != null || !strict;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java
index 8087d11..707dc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java
@@ -230,8 +230,8 @@ public class IgniteFsProcessor extends IgniteFsProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path,
- long start, long length, IgniteFsRecordResolver recRslv) {
- return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv);
+ long start, long len, IgniteFsRecordResolver recRslv) {
+ return new GridGgfsJobImpl(job, ggfsName, path, start, len, recRslv);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 0b6b479..f43bea8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -616,7 +616,7 @@ public class VisorTaskUtils {
String logsDir;
if (ggfs instanceof GridGgfsEx)
- logsDir = ((GridGgfsEx) ggfs).clientLogDirectory();
+ logsDir = ((GridGgfsEx)ggfs).clientLogDirectory();
else if (ggfs == null)
throw new IgniteCheckedException("Failed to get profiler log folder (GGFS instance not found)");
else
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java
index 19ec955..b229b09 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopInProc.java
@@ -49,6 +49,7 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
* Constructor.
*
* @param ggfs Target GGFS.
+ * @param log Log.
*/
public GridGgfsHadoopInProc(GridGgfsEx ggfs, Log log) {
this.ggfs = ggfs;
@@ -84,6 +85,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.info(path);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path);
}
@@ -94,6 +98,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.update(path, props);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path);
}
@@ -106,6 +113,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return true;
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " +
path);
@@ -119,6 +129,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return true;
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src);
}
@@ -129,6 +142,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.delete(path, recursive);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path);
}
@@ -150,6 +166,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.listPaths(path);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path);
}
@@ -160,6 +179,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.listFiles(path);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path);
}
@@ -172,6 +194,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return true;
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " +
path);
@@ -183,6 +208,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.summary(path);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " +
path);
@@ -195,6 +223,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
try {
return ggfs.affinity(path, start, len);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path);
}
@@ -207,6 +238,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return new GridGgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
}
@@ -220,6 +254,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return new GridGgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
}
@@ -234,6 +271,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return new GridGgfsHadoopStreamDelegate(this, stream);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path);
}
@@ -247,6 +287,9 @@ public class GridGgfsHadoopInProc implements GridGgfsHadoopEx {
return new GridGgfsHadoopStreamDelegate(this, stream);
}
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
catch (IllegalStateException e) {
throw new GridGgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java
index 71a3d81..c597899 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopWrapper.java
@@ -339,7 +339,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop {
try {
Ignite ignite = G.ignite();
- ggfs = (GridGgfsEx) ignite.fileSystem(endpoint.ggfs());
+ ggfs = (GridGgfsEx)ignite.fileSystem(endpoint.ggfs());
}
catch (Exception e) {
err = e;
@@ -348,7 +348,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop {
else {
for (Ignite ignite : G.allGrids()) {
try {
- ggfs = (GridGgfsEx) ignite.fileSystem(endpoint.ggfs());
+ ggfs = (GridGgfsEx)ignite.fileSystem(endpoint.ggfs());
break;
}
@@ -401,9 +401,9 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop {
}
// 4. Try local TCP connection.
- boolean skipLocalTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+ boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
- if (!skipLocalTcp) {
+ if (!skipLocTcp) {
if (curDelegate == null) {
GridGgfsHadoopEx hadoop = null;
@@ -426,7 +426,7 @@ public class GridGgfsHadoopWrapper implements GridGgfsHadoop {
}
// 5. Try remote TCP connection.
- if (curDelegate == null && (skipLocalTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+ if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
GridGgfsHadoopEx hadoop = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/367ab94b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
index d98d8f4..fcb4bd6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
@@ -161,8 +161,14 @@ public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePla
ggfs = (GridGgfsEx)((IgniteEx)ignite).ggfsx(endpoint.ggfs());
if (ggfs != null && !ggfs.isProxy(split0.file())) {
- Collection<IgniteFsBlockLocation> blocks = ggfs.affinity(new IgniteFsPath(split0.file()),
- split0.start(), split0.length());
+ Collection<IgniteFsBlockLocation> blocks;
+
+ try {
+ blocks = ggfs.affinity(new IgniteFsPath(split0.file()), split0.start(), split0.length());
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
assert blocks != null;