You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/23 09:25:04 UTC
incubator-ignite git commit: # some refactoring + comments.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-980 f27987549 -> 484de67c1
# some refactoring + comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/484de67c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/484de67c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/484de67c
Branch: refs/heads/ignite-980
Commit: 484de67c11bce70f61db49f737d7aa0e981fae06
Parents: f279875
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 23 10:24:38 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 23 10:24:38 2015 +0300
----------------------------------------------------------------------
.../hadoop/fs/HadoopFileSystemCache.java | 242 -------------------
.../hadoop/fs/HadoopFileSystemCacheUtil.java | 241 ++++++++++++++++++
.../processors/hadoop/v2/HadoopV2Job.java | 6 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 23 +-
.../hadoop/v2/HadoopV2TaskContext.java | 13 +-
5 files changed, 258 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
deleted file mode 100644
index 96b32db..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Static caches of file systems used by Map-Reduce tasks and jobs.
- * This class
- */
-public class HadoopFileSystemCache {
- /**
- * Creates HadoopLazyConcurrentMap.
- * @return a new HadoopLazyConcurrentMap.
- */
- public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
- return new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
- @Override public FileSystem createValue(FsCacheKey key) {
- try {
- assert key != null;
-
- // Explicitly disable FileSystem caching:
- URI uri = key.uri();
-
- String scheme = uri.getScheme();
-
- // Copy the configuration to avoid altering the external object.
- Configuration cfg = new Configuration(key.configuration());
-
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
-
- cfg.setBoolean(prop, true);
-
- return FileSystem.get(uri, cfg, key.user());
- }
- catch (IOException | InterruptedException ioe) {
- throw new IgniteException(ioe);
- }
- }
- }
- );
- }
-
- /**
- * Gets non-null user name as per the Hadoop viewpoint.
- * @param cfg the Hadoop job configuration, may be null.
- * @return the user name, never null.
- */
- private static String getMrHadoopUser(Configuration cfg) throws IOException {
- String user = cfg.get(MRJobConfig.USER_NAME);
-
- if (user == null)
- user = IgniteHadoopFileSystem.getFsHadoopUser();
-
- return user;
- }
-
- /**
- * Common method to get the V1 file system in MapRed engine.
- * It gets the filesystem for the user specified in the
- * configuration with {@link MRJobConfig#USER_NAME} property.
- * The file systems are created and cached in the given map upon first request.
- *
- * @param uri The file system uri.
- * @param cfg The configuration.
- * @param map The caching map.
- * @return The file system.
- * @throws IOException On error.
- */
- public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg,
- HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
- throws IOException {
- assert map != null;
- assert cfg != null;
-
- final String usr = getMrHadoopUser(cfg);
-
- assert usr != null;
-
- if (uri == null)
- uri = FileSystem.getDefaultUri(cfg);
-
- final FileSystem fs;
-
- try {
- final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
- fs = map.getOrCreate(key);
- }
- catch (IgniteException ie) {
- throw new IOException(ie);
- }
-
- assert fs != null;
- assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
- return fs;
- }
-
- /**
- * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
- * @param uri0 The uri.
- * @param cfg The cfg.
- * @return Correct URI.
- */
- private static URI fixUri(URI uri0, Configuration cfg) {
- if (uri0 == null)
- return FileSystem.getDefaultUri(cfg);
-
- String scheme = uri0.getScheme();
- String authority = uri0.getAuthority();
-
- if (authority == null) {
- URI dfltUri = FileSystem.getDefaultUri(cfg);
-
- if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
- return dfltUri;
- }
-
- return uri0;
- }
-
- /**
- * Note that configuration is not a part of the key.
- * It is used solely to initialize the first instance
- * that is created for the key.
- */
- public static final class FsCacheKey {
- /** */
- private final URI uri;
-
- /** */
- private final String usr;
-
- /** */
- private final String equalityKey;
-
- /** */
- private final Configuration cfg;
-
- /**
- * Constructor
- */
- public FsCacheKey(URI uri, String usr, Configuration cfg) {
- assert uri != null;
- assert usr != null;
- assert cfg != null;
-
- this.uri = fixUri(uri, cfg);
- this.usr = usr;
- this.cfg = cfg;
-
- this.equalityKey = createEqualityKey();
- }
-
- /**
- * Creates String key used for equality and hashing.
- */
- private String createEqualityKey() {
- GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
- if (uri.getScheme() != null)
- sb.a(uri.getScheme().toLowerCase());
-
- sb.a("://");
-
- if (uri.getAuthority() != null)
- sb.a(uri.getAuthority().toLowerCase());
-
- return sb.toString();
- }
-
- /**
- * The URI.
- */
- public URI uri() {
- return uri;
- }
-
- /**
- * The User.
- */
- public String user() {
- return usr;
- }
-
- /**
- * The Configuration.
- */
- public Configuration configuration() {
- return cfg;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean equals(Object obj) {
- if (obj == this)
- return true;
-
- if (obj == null || getClass() != obj.getClass())
- return false;
-
- return equalityKey.equals(((FsCacheKey)obj).equalityKey);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return equalityKey.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return equalityKey;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
new file mode 100644
index 0000000..397b13e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * File system cache utility methods used by Map-Reduce tasks and jobs.
+ */
+public class HadoopFileSystemCacheUtil {
+ /**
+ * A common static factory method. Creates new HadoopLazyConcurrentMap.
+ * @return a new HadoopLazyConcurrentMap.
+ */
+ public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
+ return new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+ @Override public FileSystem createValue(FsCacheKey key) {
+ try {
+ assert key != null;
+
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
+
+ String scheme = uri.getScheme();
+
+ // Copy the configuration to avoid altering the external object.
+ Configuration cfg = new Configuration(key.configuration());
+
+ String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+
+ cfg.setBoolean(prop, true);
+
+ return FileSystem.get(uri, cfg, key.user());
+ }
+ catch (IOException | InterruptedException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+ }
+
+ /**
+ * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, must not be null.
+ * @return the user name, never null.
+ */
+ private static String getMrHadoopUser(Configuration cfg) throws IOException {
+ String user = cfg.get(MRJobConfig.USER_NAME);
+
+ if (user == null)
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ return user;
+ }
+
+ /**
+ * Common method to get the V1 file system in MapRed engine.
+ * It gets the filesystem for the user specified in the
+ * configuration with {@link MRJobConfig#USER_NAME} property.
+ * The file systems are created and cached in the given map upon first request.
+ *
+ * @param uri The file system uri.
+ * @param cfg The configuration.
+ * @param map The caching map.
+ * @return The file system.
+ * @throws IOException On error.
+ */
+ public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
+ HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
+ throws IOException {
+ assert map != null;
+ assert cfg != null;
+
+ final String usr = getMrHadoopUser(cfg);
+
+ assert usr != null;
+
+ if (uri == null)
+ uri = FileSystem.getDefaultUri(cfg);
+
+ final FileSystem fs;
+
+ try {
+ final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+ fs = map.getOrCreate(key);
+ }
+ catch (IgniteException ie) {
+ throw new IOException(ie);
+ }
+
+ assert fs != null;
+ assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+ return fs;
+ }
+
+ /**
+ * Resolves the Fs URI using logic similar to that used in FileSystem#get(URI, Configuration, String).
+ * @param uri0 The uri.
+ * @param cfg The cfg.
+ * @return Correct URI.
+ */
+ private static URI fixUri(URI uri0, Configuration cfg) {
+ if (uri0 == null)
+ return FileSystem.getDefaultUri(cfg);
+
+ String scheme = uri0.getScheme();
+ String authority = uri0.getAuthority();
+
+ if (authority == null) {
+ URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+ if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+ return dfltUri;
+ }
+
+ return uri0;
+ }
+
+ /**
+ * Note that configuration is not a part of the key.
+ * It is used solely to initialize the first instance
+ * that is created for the key.
+ */
+ public static final class FsCacheKey {
+ /** */
+ private final URI uri;
+
+ /** */
+ private final String usr;
+
+ /** */
+ private final String equalityKey;
+
+ /** */
+ private final Configuration cfg;
+
+ /**
+ * Constructor
+ */
+ public FsCacheKey(URI uri, String usr, Configuration cfg) {
+ assert uri != null;
+ assert usr != null;
+ assert cfg != null;
+
+ this.uri = fixUri(uri, cfg);
+ this.usr = usr;
+ this.cfg = cfg;
+
+ this.equalityKey = createEqualityKey();
+ }
+
+ /**
+ * Creates String key used for equality and hashing.
+ */
+ private String createEqualityKey() {
+ GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+ if (uri.getScheme() != null)
+ sb.a(uri.getScheme().toLowerCase());
+
+ sb.a("://");
+
+ if (uri.getAuthority() != null)
+ sb.a(uri.getAuthority().toLowerCase());
+
+ return sb.toString();
+ }
+
+ /**
+ * The URI.
+ */
+ public URI uri() {
+ return uri;
+ }
+
+ /**
+ * The User.
+ */
+ public String user() {
+ return usr;
+ }
+
+ /**
+ * The Configuration.
+ */
+ public Configuration configuration() {
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return equalityKey.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return equalityKey;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 319640d..d1765a8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -43,7 +43,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
/**
* Hadoop job implementation for v2 API.
@@ -84,7 +84,7 @@ public class HadoopV2Job implements HadoopJob {
private volatile byte[] jobConfData;
/** File system cache map. */
- private final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap
+ private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
= createHadoopLazyConcurrentMap();
/** Disposal guard. */
@@ -399,6 +399,6 @@ public class HadoopV2Job implements HadoopJob {
* @throws IOException On error.
*/
public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
- return HadoopFileSystemCache.fileSystemForMrUser(uri, cfg, fsMap);
+ return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 55a31c6..97ad179 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -61,8 +61,8 @@ class HadoopV2JobResourceManager {
/** Staging directory to delivery job jar and config to the work nodes. */
private Path stagingDir;
- /** TODO */
- private final HadoopV2Job fsProvider;
+ /** The job. */
+ private final HadoopV2Job job;
/**
* Creates new instance.
@@ -70,11 +70,11 @@ class HadoopV2JobResourceManager {
* @param ctx Hadoop job context.
* @param log Logger.
*/
- public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job fsProvider) {
+ public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) {
this.jobId = jobId;
this.ctx = ctx;
this.log = log.getLogger(HadoopV2JobResourceManager.class);
- this.fsProvider = fsProvider;
+ this.job = job;
}
/**
@@ -119,10 +119,7 @@ class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
-// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-// .equals(HadoopClassLoader.nameForJob(locNodeId));
-
- FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), cfg);
+ FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg);
if (!fs.exists(stagingDir))
throw new IgniteCheckedException("Failed to find map-reduce submission " +
@@ -217,10 +214,7 @@ class HadoopV2JobResourceManager {
FileSystem dstFs = FileSystem.getLocal(cfg);
-// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-// .equals(HadoopClassLoader.nameForJob(locNodeId));
-
- FileSystem srcFs = fsProvider.fileSystem(srcPath.toUri(), cfg);
+ FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg);
if (extract) {
File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -303,10 +297,7 @@ class HadoopV2JobResourceManager {
public void cleanupStagingDirectory() {
try {
if (stagingDir != null) {
-// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-// .equals(HadoopClassLoader.nameForJob(locNodeId));
-
- FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), ctx.getJobConf());
+ FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf());
fs.delete(stagingDir, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index f007038..90b0e43 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -45,7 +45,7 @@ import java.security.*;
import java.util.*;
import java.util.concurrent.*;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -56,13 +56,15 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
/** */
private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
- /** Lazy per-user file system cache used by Hadoop tasks. */
- private static final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap
+ /** Lazy per-user file system cache used by the Hadoop task. */
+ private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
= createHadoopLazyConcurrentMap();
/**
* This method is called with reflection upon Job finish with class loader of each task.
* This will clean up all the Fs created for specific task.
+ * Each class loader uses its own instance of <code>fsMap</code> since the class loaders
+ * are different.
*
* @throws IgniteCheckedException On error.
*/
@@ -444,12 +446,11 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
try {
// This assertion uses .startsWith() instead of .equals() because task class loaders may
// be reused between tasks of the same job.
- assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+ assert ((HadoopClassLoader)getClass().getClassLoader()).name()
.startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
- // Task class loader.
// We also cache Fs there, all them will be cleared explicitly upon the Job end.
- fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), fsMap);
+ fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
}
catch (IOException e) {
throw new IgniteCheckedException(e);