You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/23 11:57:27 UTC
incubator-ignite git commit: # IG-980: corrections by review results.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-980 0544ef014 -> 4f98ee1b4
# IG-980: corrections by review results.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f98ee1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f98ee1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f98ee1b
Branch: refs/heads/ignite-980
Commit: 4f98ee1b49636fe434e721d423b81d1c5ad12411
Parents: 0544ef0
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 23 12:50:31 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 23 12:50:31 2015 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopJobInfo.java | 5 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 6 +-
.../hadoop/fs/HadoopFileSystemCacheUtil.java | 242 -------------------
.../hadoop/fs/HadoopFileSystemCacheUtils.java | 241 ++++++++++++++++++
.../hadoop/jobtracker/HadoopJobTracker.java | 4 +-
.../processors/hadoop/v2/HadoopV2Job.java | 41 ++--
.../hadoop/v2/HadoopV2TaskContext.java | 2 +-
.../collections/HadoopAbstractMapTest.java | 2 +-
8 files changed, 270 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 1be5a06..d3735d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -55,14 +55,13 @@ public interface HadoopJobInfo extends Serializable {
* This method will be called once for the same ID on one node, though it can be called on the same host
* multiple times from different processes (in case of multiple nodes on the same host or external execution).
*
+ * @param jobCls The job class.
* @param jobId Job ID.
* @param log Logger.
* @return Job.
* @throws IgniteCheckedException If failed.
*/
- // TODO: Docs
- // TODO: jobCls0 => jobCls
- public HadoopJob createJob(Class<? extends HadoopJob> jobCls0,
+ public HadoopJob createJob(Class<? extends HadoopJob> jobCls,
HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d8cb3ae..95e03c8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -78,12 +78,12 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls0,
+ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls,
HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
- assert jobCls0 != null;
+ assert jobCls != null;
try {
- Constructor<? extends HadoopJob> constructor = jobCls0.getConstructor(HadoopJobId.class,
+ Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
HadoopDefaultJobInfo.class, IgniteLogger.class);
return constructor.newInstance(jobId, this, log);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
deleted file mode 100644
index 4abb3c2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * File system cache utility methods used by Map-Reduce tasks and jobs.
- */
-// TODO: Util -> Utils
-public class HadoopFileSystemCacheUtil {
- /**
- * A common static factory method. Creates new HadoopLazyConcurrentMap.
- * @return a new HadoopLazyConcurrentMap.
- */
- public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
- return new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
- @Override public FileSystem createValue(FsCacheKey key) {
- try {
- assert key != null;
-
- // Explicitly disable FileSystem caching:
- URI uri = key.uri();
-
- String scheme = uri.getScheme();
-
- // Copy the configuration to avoid altering the external object.
- Configuration cfg = new Configuration(key.configuration());
-
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
-
- cfg.setBoolean(prop, true);
-
- return FileSystem.get(uri, cfg, key.user());
- }
- catch (IOException | InterruptedException ioe) {
- throw new IgniteException(ioe);
- }
- }
- }
- );
- }
-
- /**
- * Gets non-null user name as per the Hadoop viewpoint.
- * @param cfg the Hadoop job configuration, may be null.
- * @return the user name, never null.
- */
- private static String getMrHadoopUser(Configuration cfg) throws IOException {
- String user = cfg.get(MRJobConfig.USER_NAME);
-
- if (user == null)
- user = IgniteHadoopFileSystem.getFsHadoopUser();
-
- return user;
- }
-
- /**
- * Common method to get the V1 file system in MapRed engine.
- * It gets the filesystem for the user specified in the
- * configuration with {@link MRJobConfig#USER_NAME} property.
- * The file systems are created and cached in the given map upon first request.
- *
- * @param uri The file system uri.
- * @param cfg The configuration.
- * @param map The caching map.
- * @return The file system.
- * @throws IOException On error.
- */
- public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
- HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
- throws IOException {
- assert map != null;
- assert cfg != null;
-
- final String usr = getMrHadoopUser(cfg);
-
- assert usr != null;
-
- if (uri == null)
- uri = FileSystem.getDefaultUri(cfg);
-
- final FileSystem fs;
-
- try {
- final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
- fs = map.getOrCreate(key);
- }
- catch (IgniteException ie) {
- throw new IOException(ie);
- }
-
- assert fs != null;
- assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
- return fs;
- }
-
- /**
- * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
- * @param uri0 The uri.
- * @param cfg The cfg.
- * @return Correct URI.
- */
- private static URI fixUri(URI uri0, Configuration cfg) {
- if (uri0 == null)
- return FileSystem.getDefaultUri(cfg);
-
- String scheme = uri0.getScheme();
- String authority = uri0.getAuthority();
-
- if (authority == null) {
- URI dfltUri = FileSystem.getDefaultUri(cfg);
-
- if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
- return dfltUri;
- }
-
- return uri0;
- }
-
- /**
- * Note that configuration is not a part of the key.
- * It is used solely to initialize the first instance
- * that is created for the key.
- */
- public static final class FsCacheKey {
- /** */
- private final URI uri;
-
- /** */
- private final String usr;
-
- /** */
- private final String equalityKey;
-
- /** */
- private final Configuration cfg;
-
- /**
- * Constructor
- */
- public FsCacheKey(URI uri, String usr, Configuration cfg) {
- assert uri != null;
- assert usr != null;
- assert cfg != null;
-
- this.uri = fixUri(uri, cfg);
- this.usr = usr;
- this.cfg = cfg;
-
- this.equalityKey = createEqualityKey();
- }
-
- /**
- * Creates String key used for equality and hashing.
- */
- private String createEqualityKey() {
- GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
- if (uri.getScheme() != null)
- sb.a(uri.getScheme().toLowerCase());
-
- sb.a("://");
-
- if (uri.getAuthority() != null)
- sb.a(uri.getAuthority().toLowerCase());
-
- return sb.toString();
- }
-
- /**
- * The URI.
- */
- public URI uri() {
- return uri;
- }
-
- /**
- * The User.
- */
- public String user() {
- return usr;
- }
-
- /**
- * The Configuration.
- */
- public Configuration configuration() {
- return cfg;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean equals(Object obj) {
- if (obj == this)
- return true;
-
- if (obj == null || getClass() != obj.getClass())
- return false;
-
- return equalityKey.equals(((FsCacheKey)obj).equalityKey);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return equalityKey.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return equalityKey;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
new file mode 100644
index 0000000..f94b932
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * File system cache utility methods used by Map-Reduce tasks and jobs.
+ */
+public class HadoopFileSystemCacheUtils {
+ /**
+ * A common static factory method. Creates new HadoopLazyConcurrentMap.
+ * @return a new HadoopLazyConcurrentMap.
+ */
+ public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
+ return new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+ @Override public FileSystem createValue(FsCacheKey key) {
+ try {
+ assert key != null;
+
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
+
+ String scheme = uri.getScheme();
+
+ // Copy the configuration to avoid altering the external object.
+ Configuration cfg = new Configuration(key.configuration());
+
+ String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+
+ cfg.setBoolean(prop, true);
+
+ return FileSystem.get(uri, cfg, key.user());
+ }
+ catch (IOException | InterruptedException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+ }
+
+ /**
+ * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, must not be null (it is dereferenced unconditionally).
+ * @return the user name, never null.
+ */
+ private static String getMrHadoopUser(Configuration cfg) throws IOException {
+ String user = cfg.get(MRJobConfig.USER_NAME);
+
+ if (user == null)
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ return user;
+ }
+
+ /**
+ * Common method to get the V1 file system in MapRed engine.
+ * It gets the filesystem for the user specified in the
+ * configuration with {@link MRJobConfig#USER_NAME} property.
+ * The file systems are created and cached in the given map upon first request.
+ *
+ * @param uri The file system uri.
+ * @param cfg The configuration.
+ * @param map The caching map.
+ * @return The file system.
+ * @throws IOException On error.
+ */
+ public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
+ HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
+ throws IOException {
+ assert map != null;
+ assert cfg != null;
+
+ final String usr = getMrHadoopUser(cfg);
+
+ assert usr != null;
+
+ if (uri == null)
+ uri = FileSystem.getDefaultUri(cfg);
+
+ final FileSystem fs;
+
+ try {
+ final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+ fs = map.getOrCreate(key);
+ }
+ catch (IgniteException ie) {
+ throw new IOException(ie);
+ }
+
+ assert fs != null;
+ assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+ return fs;
+ }
+
+ /**
+ * Normalizes the file system URI using logic similar to that used in FileSystem#get(URI, Configuration, String).
+ * @param uri0 The uri.
+ * @param cfg The cfg.
+ * @return Correct URI.
+ */
+ private static URI fixUri(URI uri0, Configuration cfg) {
+ if (uri0 == null)
+ return FileSystem.getDefaultUri(cfg);
+
+ String scheme = uri0.getScheme();
+ String authority = uri0.getAuthority();
+
+ if (authority == null) {
+ URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+ if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+ return dfltUri;
+ }
+
+ return uri0;
+ }
+
+ /**
+ * Note that configuration is not a part of the key.
+ * It is used solely to initialize the first instance
+ * that is created for the key.
+ */
+ public static final class FsCacheKey {
+ /** */
+ private final URI uri;
+
+ /** */
+ private final String usr;
+
+ /** */
+ private final String equalityKey;
+
+ /** */
+ private final Configuration cfg;
+
+ /**
+ * Constructor
+ */
+ public FsCacheKey(URI uri, String usr, Configuration cfg) {
+ assert uri != null;
+ assert usr != null;
+ assert cfg != null;
+
+ this.uri = fixUri(uri, cfg);
+ this.usr = usr;
+ this.cfg = cfg;
+
+ this.equalityKey = createEqualityKey();
+ }
+
+ /**
+ * Creates String key used for equality and hashing.
+ */
+ private String createEqualityKey() {
+ GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+ if (uri.getScheme() != null)
+ sb.a(uri.getScheme().toLowerCase());
+
+ sb.a("://");
+
+ if (uri.getAuthority() != null)
+ sb.a(uri.getAuthority().toLowerCase());
+
+ return sb.toString();
+ }
+
+ /**
+ * The URI.
+ */
+ public URI uri() {
+ return uri;
+ }
+
+ /**
+ * The User.
+ */
+ public String user() {
+ return usr;
+ }
+
+ /**
+ * The Configuration.
+ */
+ public Configuration configuration() {
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return equalityKey.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return equalityKey;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index e809fde..194ae33 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -117,8 +117,8 @@ public class HadoopJobTracker extends HadoopComponent {
jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName());
}
catch (Exception ioe) {
- // TODO: IgniteCheckedException, message
- throw new IgniteException(ioe);
+ throw new IgniteCheckedException("Failed to load job class [class="
+ + HadoopV2Job.class.getName() + ']', ioe);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index ac0ba78..33a218d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -40,10 +40,9 @@ import java.net.*;
import java.util.*;
import java.util.Queue;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.*;
/**
* Hadoop job implementation for v2 API.
@@ -77,19 +76,15 @@ public class HadoopV2Job implements HadoopJob {
/** All created contexts. */
private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
+ /** File system cache map. */
+ private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
+
/** Local node ID */
private volatile UUID locNodeId;
/** Serialized JobConf. */
private volatile byte[] jobConfData;
- /** File system cache map. */
- private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
-
- /** Disposal guard. */
- // TODO: Remove.
- private final AtomicBoolean disposed = new AtomicBoolean();
-
/**
* @param jobId Job ID.
* @param jobInfo Job info.
@@ -276,12 +271,6 @@ public class HadoopV2Job implements HadoopJob {
/** {@inheritDoc} */
@SuppressWarnings("ThrowFromFinallyBlock")
@Override public void dispose(boolean external) throws IgniteCheckedException {
- // TODO: Remove.
- boolean dsp = disposed.compareAndSet(false, true);
-
- if (!dsp)
- return;
-
try {
if (rsrcMgr != null && !external) {
File jobLocDir = jobLocalDir(locNodeId, jobId);
@@ -306,12 +295,17 @@ public class HadoopV2Job implements HadoopJob {
try {
final ClassLoader ldr = cls.getClassLoader();
- // Stop Hadoop daemons for this *task*:
- stopHadoopFsDaemons(ldr);
+ try {
+ // Stop Hadoop daemons for this *task*:
+ stopHadoopFsDaemons(ldr);
+ }
+ catch (Exception e) {
+ if (err == null)
+ err = e;
+ }
// Also close all the FileSystems cached in
// HadoopLazyConcurrentMap for this *task* class loader:
- // TODO: Ensure it is called even in case of Exception from stopHadoopFsDaemons.
closeCachedTaskFileSystems(ldr);
}
catch (Throwable e) {
@@ -325,9 +319,14 @@ public class HadoopV2Job implements HadoopJob {
assert fullCtxClsQueue.isEmpty();
- // Close all cached file systems for this *Job*:
- // TODO: Wrap into try-catch.
- fsMap.close();
+ try {
+ // Close all cached file systems for this *Job*:
+ fsMap.close();
+ }
+ catch (Exception e) {
+ if (err == null)
+ err = e;
+ }
if (err != null)
throw U.cast(err);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 90b0e43..6eff475 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -45,7 +45,7 @@ import java.security.*;
import java.util.*;
import java.util.concurrent.*;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f98ee1b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index 3231134..b5e2ab5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -136,7 +136,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls0,
+ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls,
HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
assert false;