You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/06 07:27:23 UTC
[1/6] ignite git commit: IGNITE-2258: IGFS: now default path modes
could be optionally disabled using
FileSystemConfiguration.isInitializeDefaultPathModes() property.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5.2 [created] 1d7fb5702
IGNITE-2258: IGFS: now default path modes could be optionally disabled using FileSystemConfiguration.isInitializeDefaultPathModes() property.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e64d0d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e64d0d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e64d0d7
Branch: refs/heads/ignite-1.5.2
Commit: 2e64d0d7cc51552fffc231cbc850cd615076fb85
Parents: cca90c7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Dec 29 09:31:58 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 6 09:23:17 2016 +0400
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 54 ++++-
.../internal/processors/igfs/IgfsImpl.java | 12 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 7 +
...condaryFileSystemInitializationSelfTest.java | 214 +++++++++++++++++++
.../testsuites/IgniteHadoopTestSuite.java | 2 +
5 files changed, 278 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e64d0d7/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 888f927..3a9e55e 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -80,6 +80,9 @@ public class FileSystemConfiguration {
/** Default IPC endpoint enabled flag. */
public static final boolean DFLT_IPC_ENDPOINT_ENABLED = true;
+ /** Default value of whether to initialize default path modes. */
+ public static final boolean DFLT_INIT_DFLT_PATH_MODES = true;
+
/** IGFS instance name. */
private String name;
@@ -158,6 +161,9 @@ public class FileSystemConfiguration {
/** Maximum range length. */
private long maxTaskRangeLen;
+ /** Whether to initialize default path modes. */
+ private boolean initDfltPathModes = DFLT_INIT_DFLT_PATH_MODES;
+
/**
* Constructs default configuration.
*/
@@ -189,6 +195,7 @@ public class FileSystemConfiguration {
fragmentizerThrottlingBlockLen = cfg.getFragmentizerThrottlingBlockLength();
fragmentizerThrottlingDelay = cfg.getFragmentizerThrottlingDelay();
secondaryFs = cfg.getSecondaryFileSystem();
+ initDfltPathModes = cfg.isInitializeDefaultPathsModes();
ipcEndpointCfg = cfg.getIpcEndpointConfiguration();
ipcEndpointEnabled = cfg.isIpcEndpointEnabled();
maxSpace = cfg.getMaxSpaceSize();
@@ -507,7 +514,7 @@ public class FileSystemConfiguration {
* Sets the secondary file system. Secondary file system is provided for pass-through, write-through,
* and read-through purposes.
*
- * @param fileSystem
+ * @param fileSystem Secondary file system.
*/
public void setSecondaryFileSystem(IgfsSecondaryFileSystem fileSystem) {
secondaryFs = fileSystem;
@@ -519,13 +526,14 @@ public class FileSystemConfiguration {
* If path doesn't correspond to any specified prefix or mappings are not provided, then
* {@link #getDefaultMode()} is used.
* <p>
- * Several folders under {@code '/apache/ignite'} folder have predefined mappings which cannot be overridden.
- * <li>{@code /apache/ignite/primary} and all it's sub-folders will always work in {@code PRIMARY} mode.</li>
+ * If {@link #isInitializeDefaultPathsModes()} is set to {@code true}, the following path modes will be created
+ * by default:
+ * <li>{@code /ignite/primary} and all it's sub-folders will always work in {@code PRIMARY} mode.</li>
* <p>
* And in case secondary file system URI is provided:
- * <li>{@code /apache/ignite/proxy} and all it's sub-folders will always work in {@code PROXY} mode.</li>
- * <li>{@code /apache/ignite/sync} and all it's sub-folders will always work in {@code DUAL_SYNC} mode.</li>
- * <li>{@code /apache/ignite/async} and all it's sub-folders will always work in {@code DUAL_ASYNC} mode.</li>
+ * <li>{@code /ignite/proxy} and all it's sub-folders will always work in {@code PROXY} mode.</li>
+ * <li>{@code /ignite/sync} and all it's sub-folders will always work in {@code DUAL_SYNC} mode.</li>
+ * <li>{@code /ignite/async} and all it's sub-folders will always work in {@code DUAL_ASYNC} mode.</li>
*
* @return Map of paths to {@code IGFS} modes.
*/
@@ -788,6 +796,40 @@ public class FileSystemConfiguration {
this.maxTaskRangeLen = maxTaskRangeLen;
}
+ /**
+ * Get whether to initialize default path modes.
+ * <p>
+ * When set to {@code true} Ignite will automatically create the following path modes:
+ * <ul>
+ * <li>{@code /ignite/primary} - will work in {@link IgfsMode#PRIMARY} mode;</li>
+ * <li>{@code /ignite/sync} - will work in {@link IgfsMode#DUAL_SYNC} mode (only if secondary file system
+ * is set);</li>
+ * <li>{@code /ignite/async} - will work in {@link IgfsMode#DUAL_ASYNC} mode (only if secondary file system
+ * is set);</li>
+ * <li>{@code /ignite/proxy} - will work in {@link IgfsMode#PROXY} mode (only if secondary file system
+ * is set).</li>
+ * </ul>
+ * See {@link #getPathModes()} for more information about path modes.
+ * <p>
+ * Defaults to {@link #DFLT_INIT_DFLT_PATH_MODES}.
+ *
+ * @return {@code True} if default path modes will be initialized.
+ */
+ public boolean isInitializeDefaultPathsModes() {
+ return initDfltPathModes;
+ }
+
+ /**
+ * Set whether to initialize default path modes.
+ * <p>
+ * See {@link #isInitializeDefaultPathsModes()} for more information.
+ *
+ * @param initDfltPathModes Whether to initialize default path modes.
+ */
+ public void setInitializeDefaultPathsModes(boolean initDfltPathModes) {
+ this.initDfltPathModes = initDfltPathModes;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(FileSystemConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e64d0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 0d5cda3..8534513 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -215,12 +215,14 @@ public final class IgfsImpl implements IgfsEx {
Map<String, IgfsMode> cfgModes = new LinkedHashMap<>();
Map<String, IgfsMode> dfltModes = new LinkedHashMap<>(4, 1.0f);
- dfltModes.put("/ignite/primary", PRIMARY);
+ if (cfg.isInitializeDefaultPathsModes()) {
+ dfltModes.put("/ignite/primary", PRIMARY);
- if (secondaryFs != null) {
- dfltModes.put("/ignite/proxy", PROXY);
- dfltModes.put("/ignite/sync", DUAL_SYNC);
- dfltModes.put("/ignite/async", DUAL_ASYNC);
+ if (secondaryFs != null) {
+ dfltModes.put("/ignite/proxy", PROXY);
+ dfltModes.put("/ignite/sync", DUAL_SYNC);
+ dfltModes.put("/ignite/async", DUAL_ASYNC);
+ }
}
cfgModes.putAll(dfltModes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e64d0d7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 778792a..5dce67f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -1100,6 +1100,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
+ * @return {@code true} If secondary file system is initialized.
+ */
+ public boolean hasSecondaryFileSystem() {
+ return secondaryFs != null;
+ }
+
+ /**
* Convert the given path to path acceptable by the primary file system.
*
* @param path Path.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e64d0d7/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
new file mode 100644
index 0000000..511c4bb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+import java.net.URI;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+
+/**
+ * Ensures correct modes resolution for SECONDARY paths.
+ */
+public class IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest extends IgfsCommonAbstractTest {
+ /** File system. */
+ private IgniteHadoopFileSystem fs;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ U.closeQuiet(fs);
+
+ fs = null;
+
+ G.stopAll(true);
+ }
+
+ /**
+ * Perform initial startup.
+ *
+ * @param initDfltPathModes Whether to initialize default path modes.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"NullableProblems", "unchecked"})
+ private void startUp(boolean initDfltPathModes) throws Exception {
+ startUpSecondary();
+
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("partitioned");
+ igfsCfg.setMetaCacheName("replicated");
+ igfsCfg.setName("igfs");
+ igfsCfg.setBlockSize(512 * 1024);
+ igfsCfg.setInitializeDefaultPathsModes(initDfltPathModes);
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setPort(10500);
+
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+
+ igfsCfg.setManagementPort(-1);
+ igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+ "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/",
+ "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
+
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName("partitioned");
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setNearConfiguration(null);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+ cacheCfg.setBackups(0);
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("replicated");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName("igfs-grid");
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ G.start(cfg);
+
+ Configuration fsCfg = new Configuration();
+
+ fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml"));
+
+ fsCfg.setBoolean("fs.igfs.impl.disable.cache", true);
+
+ fs = (IgniteHadoopFileSystem)FileSystem.get(new URI("igfs://igfs:igfs-grid@/"), fsCfg);
+ }
+
+ /**
+ * Startup secondary file system.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void startUpSecondary() throws Exception {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("partitioned");
+ igfsCfg.setMetaCacheName("replicated");
+ igfsCfg.setName("igfs-secondary");
+ igfsCfg.setBlockSize(512 * 1024);
+ igfsCfg.setDefaultMode(PRIMARY);
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setPort(11500);
+
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName("partitioned");
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setNearConfiguration(null);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+ cacheCfg.setBackups(0);
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("replicated");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName("igfs-grid-secondary");
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ G.start(cfg);
+ }
+
+ /**
+ * Test scenario when defaults are initialized.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDefaultsInitialized() throws Exception {
+ check(true);
+ }
+
+ /**
+ * Test scenario when defaults are not initialized.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDefaultsNotInitialized() throws Exception {
+ check(false);
+ }
+
+ /**
+ * Actual check.
+ *
+ * @param initDfltPathModes Whether to initialize default path modes.
+ * @throws Exception If failed.
+ */
+ private void check(boolean initDfltPathModes) throws Exception {
+ startUp(initDfltPathModes);
+
+ assertEquals(initDfltPathModes, fs.hasSecondaryFileSystem());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e64d0d7/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 0216f4b..6641bc8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -52,6 +52,7 @@ import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualAsyncSel
import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest;
import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest;
import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest;
+import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest;
import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryModeSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopCommandLineTest;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultMapReducePlannerSelfTest;
@@ -112,6 +113,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryModeSelfTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemClientSelfTest.class.getName())));
[2/6] ignite git commit: IGNITE-2258: Fixed typo in getter/setter.
Posted by vo...@apache.org.
IGNITE-2258: Fixed typo in getter/setter.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4cd3b3dc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4cd3b3dc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4cd3b3dc
Branch: refs/heads/ignite-1.5.2
Commit: 4cd3b3dc2f1fa0f1a9cceb6bf544dd8fb505d7f5
Parents: 2e64d0d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Dec 29 12:52:00 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 6 09:23:33 2016 +0400
----------------------------------------------------------------------
.../ignite/configuration/FileSystemConfiguration.java | 10 +++++-----
.../apache/ignite/internal/processors/igfs/IgfsImpl.java | 2 +-
...leSystemSecondaryFileSystemInitializationSelfTest.java | 2 +-
3 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd3b3dc/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 3a9e55e..1a9c0fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -195,7 +195,7 @@ public class FileSystemConfiguration {
fragmentizerThrottlingBlockLen = cfg.getFragmentizerThrottlingBlockLength();
fragmentizerThrottlingDelay = cfg.getFragmentizerThrottlingDelay();
secondaryFs = cfg.getSecondaryFileSystem();
- initDfltPathModes = cfg.isInitializeDefaultPathsModes();
+ initDfltPathModes = cfg.isInitializeDefaultPathModes();
ipcEndpointCfg = cfg.getIpcEndpointConfiguration();
ipcEndpointEnabled = cfg.isIpcEndpointEnabled();
maxSpace = cfg.getMaxSpaceSize();
@@ -526,7 +526,7 @@ public class FileSystemConfiguration {
* If path doesn't correspond to any specified prefix or mappings are not provided, then
* {@link #getDefaultMode()} is used.
* <p>
- * If {@link #isInitializeDefaultPathsModes()} is set to {@code true}, the following path modes will be created
+ * If {@link #isInitializeDefaultPathModes()} is set to {@code true}, the following path modes will be created
* by default:
* <li>{@code /ignite/primary} and all it's sub-folders will always work in {@code PRIMARY} mode.</li>
* <p>
@@ -815,18 +815,18 @@ public class FileSystemConfiguration {
*
* @return {@code True} if default path modes will be initialized.
*/
- public boolean isInitializeDefaultPathsModes() {
+ public boolean isInitializeDefaultPathModes() {
return initDfltPathModes;
}
/**
* Set whether to initialize default path modes.
* <p>
- * See {@link #isInitializeDefaultPathsModes()} for more information.
+ * See {@link #isInitializeDefaultPathModes()} for more information.
*
* @param initDfltPathModes Whether to initialize default path modes.
*/
- public void setInitializeDefaultPathsModes(boolean initDfltPathModes) {
+ public void setInitializeDefaultPathModes(boolean initDfltPathModes) {
this.initDfltPathModes = initDfltPathModes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd3b3dc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 8534513..680e660 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -215,7 +215,7 @@ public final class IgfsImpl implements IgfsEx {
Map<String, IgfsMode> cfgModes = new LinkedHashMap<>();
Map<String, IgfsMode> dfltModes = new LinkedHashMap<>(4, 1.0f);
- if (cfg.isInitializeDefaultPathsModes()) {
+ if (cfg.isInitializeDefaultPathModes()) {
dfltModes.put("/ignite/primary", PRIMARY);
if (secondaryFs != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd3b3dc/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
index 511c4bb..1b48870 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java
@@ -70,7 +70,7 @@ public class IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest ext
igfsCfg.setMetaCacheName("replicated");
igfsCfg.setName("igfs");
igfsCfg.setBlockSize(512 * 1024);
- igfsCfg.setInitializeDefaultPathsModes(initDfltPathModes);
+ igfsCfg.setInitializeDefaultPathModes(initDfltPathModes);
IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
[4/6] ignite git commit: IGNITE-2218: Fixed a problem with native
Hadoop libraries load. This closes #378.
Posted by vo...@apache.org.
IGNITE-2218: Fixed a problem with native Hadoop libraries load. This closes #378.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83a19179
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83a19179
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83a19179
Branch: refs/heads/ignite-1.5.2
Commit: 83a19179cee2bb15adc36c2265dd0a3c794b60bb
Parents: 5d58fcb
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jan 4 12:14:58 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 6 09:26:37 2016 +0400
----------------------------------------------------------------------
.../processors/hadoop/HadoopClassLoader.java | 71 ++++++++++---
.../hadoop/v2/HadoopNativeCodeLoader.java | 74 --------------
.../hadoop/HadoopAbstractWordCountTest.java | 46 +++++++--
.../hadoop/HadoopMapReduceEmbeddedSelfTest.java | 2 +-
.../processors/hadoop/HadoopMapReduceTest.java | 15 ++-
.../hadoop/HadoopSnappyFullMapReduceTest.java | 28 +++++
.../processors/hadoop/HadoopSnappyTest.java | 102 +++++++++++++++++++
.../processors/hadoop/HadoopTasksV2Test.java | 2 +-
.../hadoop/examples/HadoopWordCount1Reduce.java | 1 +
.../hadoop/examples/HadoopWordCount2.java | 18 +++-
.../examples/HadoopWordCount2Reducer.java | 1 +
.../testsuites/IgniteHadoopTestSuite.java | 18 +++-
12 files changed, 279 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 735133f..270b31d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -30,13 +30,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
+import java.util.Vector;
+import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopNativeCodeLoader;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.objectweb.asm.AnnotationVisitor;
@@ -69,6 +70,9 @@ public class HadoopClassLoader extends URLClassLoader {
/** Name of the Hadoop daemon class. */
public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon";
+ /** Name of libhadoop library. */
+ private static final String LIBHADOOP = "hadoop.";
+
/** */
private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
@@ -119,6 +123,51 @@ public class HadoopClassLoader extends URLClassLoader {
assert !(getParent() instanceof HadoopClassLoader);
this.name = name;
+
+ initializeNativeLibraries();
+ }
+
+ /**
+ * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different
+ * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries
+ * to load the same native library over and over again.
+ * <p>
+ * To fix the problem, we force native library load in parent class loader and then "link" handle to this native
+ * library to our class loader. As a result, our class loader will think that the library is already loaded and will
+ * be able to link native methods.
+ *
+ * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
+ * JNI specification</a>
+ */
+ private void initializeNativeLibraries() {
+ try {
+ // This must trigger native library load.
+ Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR);
+
+ final Vector<Object> curVector = U.field(this, "nativeLibraries");
+
+ ClassLoader ldr = APP_CLS_LDR;
+
+ while (ldr != null) {
+ Vector vector = U.field(ldr, "nativeLibraries");
+
+ for (Object lib : vector) {
+ String libName = U.field(lib, "name");
+
+ if (libName.contains(LIBHADOOP)) {
+ curVector.add(lib);
+
+ return;
+ }
+ }
+
+ ldr = ldr.getParent();
+ }
+ }
+ catch (Exception e) {
+ U.quietAndWarn(null, "Failed to initialize Hadoop native library " +
+ "(native Hadoop methods might not work properly): " + e);
+ }
}
/**
@@ -152,8 +201,6 @@ public class HadoopClassLoader extends URLClassLoader {
if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
- else if (name.endsWith(".util.NativeCodeLoader"))
- return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
// We replace this in order to be able to forcibly stop some daemon threads
// that otherwise never stop (e.g. PeerCache runnables):
@@ -274,7 +321,7 @@ public class HadoopClassLoader extends URLClassLoader {
/**
* Check whether class has external dependencies on Hadoop.
- *
+ *
* @param clsName Class name.
* @return {@code True} if class has external dependencies.
*/
@@ -285,15 +332,15 @@ public class HadoopClassLoader extends URLClassLoader {
ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
-
+
return hasExternalDependencies(clsName, ctx);
}
-
+
/**
* Check whether class has external dependencies on Hadoop.
- *
+ *
* @param clsName Class name.
- * @param ctx Context.
+ * @param ctx Context.
* @return {@code true} If the class has external dependencies.
*/
boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
@@ -519,7 +566,7 @@ public class HadoopClassLoader extends URLClassLoader {
/** Field visitor. */
private FieldVisitor fldVisitor;
-
+
/** Class visitor. */
private ClassVisitor clsVisitor;
@@ -627,7 +674,7 @@ public class HadoopClassLoader extends URLClassLoader {
onType(t);
}
}
- }
+ }
/**
* Annotation visitor.
@@ -638,7 +685,7 @@ public class HadoopClassLoader extends URLClassLoader {
/**
* Annotation visitor.
- *
+ *
* @param ctx The collector.
*/
CollectingAnnotationVisitor(CollectingContext ctx) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
deleted file mode 100644
index 4c4840d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HadoopNativeCodeLoader {
- /**
- * Check if native-hadoop code is loaded for this platform.
- *
- * @return <code>true</code> if native-hadoop is loaded,
- * else <code>false</code>
- */
- public static boolean isNativeCodeLoaded() {
- return false;
- }
-
- /**
- * Returns true only if this build was compiled with support for snappy.
- */
- public static boolean buildSupportsSnappy() {
- return false;
- }
-
- /**
- * @return Library name.
- */
- public static String getLibraryName() {
- throw new IllegalStateException();
- }
-
- /**
- * Return if native hadoop libraries, if present, can be used for this job.
- * @param conf configuration
- *
- * @return <code>true</code> if native hadoop libraries, if present, can be
- * used for this job; <code>false</code> otherwise.
- */
- public boolean getLoadNativeLibraries(Configuration conf) {
- return false;
- }
-
- /**
- * Set if native hadoop libraries, if present, can be used for this job.
- *
- * @param conf configuration
- * @param loadNativeLibraries can native hadoop libraries be loaded
- */
- public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
index a47eaf6..e45c127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -26,6 +27,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
@@ -118,21 +124,49 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
}
/**
+ * Read w/o decoding (default).
+ *
+ * @param fileName The file.
+ * @return The file contents, human-readable.
+ * @throws Exception On error.
+ */
+ protected String readAndSortFile(String fileName) throws Exception {
+ return readAndSortFile(fileName, null);
+ }
+
+ /**
* Reads whole text file into String.
*
* @param fileName Name of the file to read.
* @return Content of the file as String value.
* @throws Exception If could not read the file.
*/
- protected String readAndSortFile(String fileName) throws Exception {
- BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
+ protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+ final List<String> list = new ArrayList<>();
+
+ final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+ if (snappyDecode) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(new Path(fileName)))) {
+ Text key = new Text();
- List<String> list = new ArrayList<>();
+ IntWritable val = new IntWritable();
- String line;
+ while (reader.next(key, val))
+ list.add(key + "\t" + val);
+ }
+ }
+ else {
+ try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+ String line;
- while ((line = reader.readLine()) != null)
- list.add(line);
+ while ((line = reader.readLine()) != null)
+ list.add(line);
+ }
+ }
Collections.sort(list);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
index c0eff48..25ef382 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -106,7 +106,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
Job job = Job.getInstance(jobConf);
- HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+ HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
if (useNewAPI) {
job.setPartitionerClass(CustomV2Partitioner.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index d0bd92b..7fd8272 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -183,7 +183,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
Job job = Job.getInstance(jobConf);
- HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+ HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
@@ -207,18 +207,29 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
checkOwner(new IgfsPath(outFile));
+ String actual = readAndSortFile(outFile, job.getConfiguration());
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
useNewReducer,
"blue\t" + blue + "\n" +
"green\t" + green + "\n" +
"red\t" + red + "\n" +
"yellow\t" + yellow + "\n",
- readAndSortFile(outFile)
+ actual
);
}
}
/**
+ * Gets if to compress output data with Snappy.
+ *
+ * @return If to compress output data with Snappy.
+ */
+ protected boolean compressOutputSnappy() {
+ return false;
+ }
+
+ /**
* Simple test job statistics.
*
* @param jobId Job id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..22d33a5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Same test as HadoopMapReduceTest, but with enabled Snappy output compression.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override protected boolean compressOutputSnappy() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
new file mode 100644
index 0000000..014ff1e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+ /** Length of data. */
+ private static final int BYTE_SIZE = 1024 * 50;
+
+ /**
+ * Checks Snappy codec usage.
+ *
+ * @throws Exception On error.
+ */
+ public void testSnappy() throws Throwable {
+ // Run Snappy test in default class loader:
+ checkSnappy();
+
+ // Run the same in several more class loaders simulating jobs and tasks:
+ for (int i = 0; i < 2; i++) {
+ ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i);
+
+ Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+ assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+ U.invoke(cls, null, "checkSnappy");
+ }
+ }
+
+ /**
+ * Internal check routine.
+ *
+ * @throws Throwable If failed.
+ */
+ public static void checkSnappy() throws Throwable {
+ try {
+ byte[] expBytes = new byte[BYTE_SIZE];
+ byte[] actualBytes = new byte[BYTE_SIZE];
+
+ for (int i = 0; i < expBytes.length ; i++)
+ expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+ SnappyCodec codec = new SnappyCodec();
+
+ codec.setConf(new Configuration());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+ cos.write(expBytes);
+ cos.flush();
+ }
+
+ try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+ int read = cis.read(actualBytes, 0, actualBytes.length);
+
+ assert read == actualBytes.length;
+ }
+
+ assert Arrays.equals(expBytes, actualBytes);
+ }
+ catch (Throwable e) {
+ System.out.println("Snappy check failed:");
+ System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
+ System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
+
+ throw e;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index 3a964d6..d125deb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -48,7 +48,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- HadoopWordCount2.setTasksClasses(job, true, true, true);
+ HadoopWordCount2.setTasksClasses(job, true, true, true, false);
Configuration conf = job.getConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
index 120ac19..2335911 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -47,6 +47,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
output.collect(key, new IntWritable(sum));
}
+ /** {@inheritDoc} */
@Override public void configure(JobConf job) {
super.configure(job);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 942a908..4b508ca 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
@@ -62,7 +65,7 @@ public class HadoopWordCount2 {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- setTasksClasses(job, true, true, true);
+ setTasksClasses(job, true, true, true, false);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
@@ -80,7 +83,8 @@ public class HadoopWordCount2 {
* @param setCombiner Option to set combiner class.
* @param setReducer Option to set reducer and output format classes.
*/
- public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+ public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer,
+ boolean outputCompression) {
if (setMapper) {
job.setMapperClass(HadoopWordCount2Mapper.class);
job.setInputFormatClass(TextInputFormat.class);
@@ -93,5 +97,15 @@ public class HadoopWordCount2 {
job.setReducerClass(HadoopWordCount2Reducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
}
+
+ if (outputCompression) {
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
+
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+
+ job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
index b2be53e..63a9d95 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -55,6 +55,7 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
/** {@inheritDoc} */
@Override protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
+
wasSetUp = true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83a19179/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 1831085..6c542b5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
import junit.framework.TestSuite;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test;
import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test;
import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
@@ -96,6 +100,9 @@ public class IgniteHadoopTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName())));
@@ -192,7 +199,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
X.println("Will use Hadoop version: " + ver);
- String downloadPath = "hadoop/common/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
+ String downloadPath = "hadoop/core/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
download("Hadoop", "HADOOP_HOME", downloadPath, "hadoop-" + ver);
}
@@ -217,6 +224,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
}
List<String> urls = F.asList(
+ "http://archive.apache.org/dist/",
"http://apache-mirror.rbc.ru/pub/apache/",
"http://www.eu.apache.org/dist/",
"http://www.us.apache.org/dist/");
@@ -273,6 +281,14 @@ public class IgniteHadoopTestSuite extends TestSuite {
if (!dest.mkdirs())
throw new IllegalStateException();
}
+ else if (entry.isSymbolicLink()) {
+ // Important: in Hadoop installation there are symlinks, we need to create them:
+ Path theLinkItself = Paths.get(install.getAbsolutePath(), entry.getName());
+
+ Path linkTarget = Paths.get(entry.getLinkName());
+
+ Files.createSymbolicLink(theLinkItself, linkTarget);
+ }
else {
File parent = dest.getParentFile();
[6/6] ignite git commit: IGNITE-2206: Hadoop file system creation is
now abstracted out using factory interface.
Posted by vo...@apache.org.
IGNITE-2206: Hadoop file system creation is now abstracted out using factory interface.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d7fb570
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d7fb570
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d7fb570
Branch: refs/heads/ignite-1.5.2
Commit: 1d7fb5702fa33cf395e797161f3a86a9600921a7
Parents: 83a1917
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jan 5 10:59:31 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 6 09:26:53 2016 +0400
----------------------------------------------------------------------
.../org/apache/ignite/igfs/IgfsUserContext.java | 16 +-
.../igfs/secondary/IgfsSecondaryFileSystem.java | 14 -
.../processors/hadoop/HadoopPayloadAware.java | 28 ++
.../ignite/internal/processors/igfs/IgfsEx.java | 13 -
.../internal/processors/igfs/IgfsImpl.java | 16 +-
.../internal/processors/igfs/IgfsPaths.java | 62 +++-
.../igfs/IgfsSecondaryFileSystemImpl.java | 11 -
.../visor/node/VisorIgfsConfiguration.java | 43 ---
.../processors/igfs/IgfsAbstractSelfTest.java | 8 +-
.../igfs/IgfsExUniversalFileSystemAdapter.java | 11 +-
.../igfs/UniversalFileSystemAdapter.java | 5 +-
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 209 ++++++++++++
.../fs/CachingHadoopFileSystemFactory.java | 86 +++++
.../hadoop/fs/HadoopFileSystemFactory.java | 52 +++
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 264 +++++++--------
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 144 +++++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 115 ++++---
.../hadoop/SecondaryFileSystemProvider.java | 139 --------
.../hadoop/fs/HadoopFileSystemCacheUtils.java | 8 +-
.../hadoop/fs/HadoopLazyConcurrentMap.java | 5 +-
.../ignite/igfs/Hadoop1DualAbstractTest.java | 14 +-
.../igfs/HadoopFIleSystemFactorySelfTest.java | 326 +++++++++++++++++++
...oopFileSystemUniversalFileSystemAdapter.java | 53 +--
...oopSecondaryFileSystemConfigurationTest.java | 27 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 71 ++--
.../testsuites/IgniteHadoopTestSuite.java | 3 +
26 files changed, 1191 insertions(+), 552 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 8db4e23..1e1cd31 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -34,24 +34,24 @@ public abstract class IgfsUserContext {
* The main contract of this method is that {@link #currentUser()} method invoked
* inside closure always returns 'user' this callable executed with.
* @param user the user name to invoke closure on behalf of.
- * @param clo the closure to execute
+ * @param c the closure to execute
* @param <T> The type of closure result.
* @return the result of closure execution.
* @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/
- public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+ public static <T> T doAs(String user, final IgniteOutClosure<T> c) {
if (F.isEmpty(user))
throw new IllegalArgumentException("Failed to use null or empty user name.");
final String ctxUser = userStackThreadLocal.get();
if (F.eq(ctxUser, user))
- return clo.apply(); // correct context is already there
+ return c.apply(); // correct context is already there
userStackThreadLocal.set(user);
try {
- return clo.apply();
+ return c.apply();
}
finally {
userStackThreadLocal.set(ctxUser);
@@ -81,24 +81,24 @@ public abstract class IgfsUserContext {
* }
* </pre>
* @param user the user name to invoke closure on behalf of.
- * @param clbl the Callable to execute
+ * @param c the Callable to execute
* @param <T> The type of callable result.
* @return the result of closure execution.
* @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/
- public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+ public static <T> T doAs(String user, final Callable<T> c) throws Exception {
if (F.isEmpty(user))
throw new IllegalArgumentException("Failed to use null or empty user name.");
final String ctxUser = userStackThreadLocal.get();
if (F.eq(ctxUser, user))
- return clbl.call(); // correct context is already there
+ return c.call(); // correct context is already there
userStackThreadLocal.set(user);
try {
- return clbl.call();
+ return c.call();
}
finally {
userStackThreadLocal.set(ctxUser);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index ca6ecb7..3f124eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -192,18 +192,4 @@ public interface IgfsSecondaryFileSystem {
* @throws IgniteException In case of error.
*/
public long usedSpaceSize() throws IgniteException;
-
- /**
- * Gets the implementation specific properties of file system.
- *
- * @return Map of properties.
- */
- public Map<String,String> properties();
-
-
- /**
- * Closes the secondary file system.
- * @throws IgniteException in case of an error.
- */
- public void close() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
new file mode 100644
index 0000000..9b79729
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Gets payload for Hadoop secondary file system.
+ */
+public interface HadoopPayloadAware {
+ /**
+ * @return Payload.
+ */
+ public Object getPayload();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 8ff7247..cf268e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -43,19 +43,6 @@ public interface IgfsEx extends IgniteFileSystem {
/** File property: prefer writes to local node. */
public static final String PROP_PREFER_LOCAL_WRITES = "locWrite";
- /** Property name for path to Hadoop configuration. */
- public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
- /** Property name for URI of file system. */
- public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-
- /** Property name for default user name of file system.
- * NOTE: for secondary file system this is just a default user name, which is used
- * when the 2ndary filesystem is used outside of any user context.
- * If another user name is set in the context, 2ndary file system will work on behalf
- * of that user, which is different from the default. */
- public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
-
/**
* Stops IGFS cleaning all used resources.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 680e660..38914ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -87,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
@@ -200,6 +202,9 @@ public final class IgfsImpl implements IgfsEx {
data = igfsCtx.data();
secondaryFs = cfg.getSecondaryFileSystem();
+ if (secondaryFs instanceof LifecycleAware)
+ ((LifecycleAware) secondaryFs).start();
+
/* Default IGFS mode. */
IgfsMode dfltMode;
@@ -256,8 +261,12 @@ public final class IgfsImpl implements IgfsEx {
modeRslvr = new IgfsModeResolver(dfltMode, modes);
- secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode,
- modeRslvr.modesOrdered());
+ Object secondaryFsPayload = null;
+
+ if (secondaryFs instanceof HadoopPayloadAware)
+ secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload();
+
+ secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered());
// Check whether IGFS LRU eviction policy is set on data cache.
String dataCacheName = igfsCtx.configuration().getDataCacheName();
@@ -305,7 +314,8 @@ public final class IgfsImpl implements IgfsEx {
batch.cancel();
try {
- secondaryFs.close();
+ if (secondaryFs instanceof LifecycleAware)
+ ((LifecycleAware)secondaryFs).stop();
}
catch (Exception e) {
log.error("Failed to close secondary file system.", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index fbf89ce..4a79259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -17,17 +17,21 @@
package org.apache.ignite.internal.processors.igfs;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -37,8 +41,8 @@ public class IgfsPaths implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
- /** Additional secondary file system properties. */
- private Map<String, String> props;
+ /** Payload marshalled to bytes with {@link JdkMarshaller}, or {@code null} if there is no payload. */
+ private byte[] payloadBytes;
/** Default IGFS mode. */
private IgfsMode dfltMode;
@@ -56,22 +60,25 @@ public class IgfsPaths implements Externalizable {
/**
* Constructor.
*
- * @param props Additional secondary file system properties.
+ * @param payload Payload.
* @param dfltMode Default IGFS mode.
* @param pathModes Path modes.
+ * @throws IgniteCheckedException If failed.
*/
- public IgfsPaths(Map<String, String> props, IgfsMode dfltMode, @Nullable List<T2<IgfsPath,
- IgfsMode>> pathModes) {
- this.props = props;
+ public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes)
+ throws IgniteCheckedException {
this.dfltMode = dfltMode;
this.pathModes = pathModes;
- }
- /**
- * @return Secondary file system properties.
- */
- public Map<String, String> properties() {
- return props;
+ if (payload == null)
+ payloadBytes = null;
+ else {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ new JdkMarshaller().marshal(payload, out);
+
+ payloadBytes = out.toByteArray();
+ }
}
/**
@@ -88,9 +95,25 @@ public class IgfsPaths implements Externalizable {
return pathModes;
}
+ /**
+ * @param clsLdr Class loader used to unmarshal the payload.
+ * @return Payload, or {@code null} if no payload was set.
+ * @throws IgniteCheckedException If failed to deserialize the payload.
+ */
+ @Nullable public Object getPayload(ClassLoader clsLdr) throws IgniteCheckedException {
+ if (payloadBytes == null)
+ return null;
+ else {
+ ByteArrayInputStream in = new ByteArrayInputStream(payloadBytes);
+
+ return new JdkMarshaller().unmarshal(in, clsLdr);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeStringMap(out, props);
+ U.writeByteArray(out, payloadBytes);
+
U.writeEnum(out, dfltMode);
if (pathModes != null) {
@@ -98,7 +121,10 @@ public class IgfsPaths implements Externalizable {
out.writeInt(pathModes.size());
for (T2<IgfsPath, IgfsMode> pathMode : pathModes) {
+ assert pathMode.getKey() != null;
+
pathMode.getKey().writeExternal(out);
+
U.writeEnum(out, pathMode.getValue());
}
}
@@ -108,7 +134,8 @@ public class IgfsPaths implements Externalizable {
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- props = U.readStringMap(in);
+ payloadBytes = U.readByteArray(in);
+
dfltMode = IgfsMode.fromOrdinal(in.readByte());
if (in.readBoolean()) {
@@ -118,11 +145,10 @@ public class IgfsPaths implements Externalizable {
for (int i = 0; i < size; i++) {
IgfsPath path = new IgfsPath();
- path.readExternal(in);
- T2<IgfsPath, IgfsMode> entry = new T2<>(path, IgfsMode.fromOrdinal(in.readByte()));
+ path.readExternal(in);
- pathModes.add(entry);
+ pathModes.add(new T2<>(path, IgfsMode.fromOrdinal(in.readByte())));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 23d6322..44e858f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
import java.io.OutputStream;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import org.apache.ignite.IgniteException;
import org.apache.ignite.igfs.IgfsFile;
@@ -116,14 +115,4 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public long usedSpaceSize() throws IgniteException {
return igfs.usedSpaceSize();
}
-
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return Collections.emptyMap();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- // No-op.
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
index e85484d..ea0e721 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
@@ -29,9 +29,6 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.compactClass;
/**
@@ -65,15 +62,6 @@ public class VisorIgfsConfiguration implements Serializable {
/** Number of batches that can be concurrently sent to remote node. */
private int perNodeParallelBatchCnt;
- /** URI of the secondary Hadoop file system. */
- private String secondaryHadoopFileSysUri;
-
- /** Path for the secondary hadoop file system config. */
- private String secondaryHadoopFileSysCfgPath;
-
- /** User name for the secondary hadoop file system config. */
- private String secondaryHadoopFileSysUserName;
-
/** IGFS instance mode. */
private IgfsMode dfltMode;
@@ -141,16 +129,6 @@ public class VisorIgfsConfiguration implements Serializable {
cfg.perNodeBatchSize = igfs.getPerNodeBatchSize();
cfg.perNodeParallelBatchCnt = igfs.getPerNodeParallelBatchCount();
- IgfsSecondaryFileSystem secFs = igfs.getSecondaryFileSystem();
-
- if (secFs != null) {
- Map<String, String> props = secFs.properties();
-
- cfg.secondaryHadoopFileSysUri = props.get(SECONDARY_FS_URI);
- cfg.secondaryHadoopFileSysCfgPath = props.get(SECONDARY_FS_CONFIG_PATH);
- cfg.secondaryHadoopFileSysUserName = props.get(SECONDARY_FS_USER_NAME);
- }
-
cfg.dfltMode = igfs.getDefaultMode();
cfg.pathModes = igfs.getPathModes();
cfg.dualModePutExecutorSrvc = compactClass(igfs.getDualModePutExecutorService());
@@ -251,27 +229,6 @@ public class VisorIgfsConfiguration implements Serializable {
}
/**
- * @return URI of the secondary Hadoop file system.
- */
- @Nullable public String secondaryHadoopFileSystemUri() {
- return secondaryHadoopFileSysUri;
- }
-
- /**
- * @return User name of the secondary Hadoop file system.
- */
- @Nullable public String secondaryHadoopFileSystemUserName() {
- return secondaryHadoopFileSysUserName;
- }
-
- /**
- * @return Path for the secondary hadoop file system config.
- */
- @Nullable public String secondaryHadoopFileSystemConfigPath() {
- return secondaryHadoopFileSysCfgPath;
- }
-
- /**
* @return IGFS instance mode.
*/
public IgfsMode defaultMode() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index b290303..015b992 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2744,7 +2744,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
finally {
U.closeQuiet(os);
- IgfsEx igfsEx = uni.getAdapter(IgfsEx.class);
+ IgfsEx igfsEx = uni.unwrap(IgfsEx.class);
if (igfsEx != null)
awaitFileClose(igfsEx.asSecondary(), file);
@@ -2868,7 +2868,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws IgniteCheckedException If failed.
*/
protected void checkExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws IgniteCheckedException {
- IgfsEx ex = uni.getAdapter(IgfsEx.class);
+ IgfsEx ex = uni.unwrap(IgfsEx.class);
for (IgfsPath path : paths) {
if (ex != null)
assert ex.context().meta().fileId(path) != null : "Path doesn't exist [igfs=" + ex.name() +
@@ -2921,7 +2921,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
protected void checkNotExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws Exception {
- IgfsEx ex = uni.getAdapter(IgfsEx.class);
+ IgfsEx ex = uni.unwrap(IgfsEx.class);
for (IgfsPath path : paths) {
if (ex != null)
assert ex.context().meta().fileId(path) == null : "Path exists [igfs=" + ex.name() + ", path=" +
@@ -3222,7 +3222,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
*/
@SuppressWarnings("unchecked")
public static void clear(UniversalFileSystemAdapter uni) throws Exception {
- IgfsEx igfsEx = uni.getAdapter(IgfsEx.class);
+ IgfsEx igfsEx = uni.unwrap(IgfsEx.class);
if (igfsEx != null)
clear(igfsEx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java
index 7583364..c6bef72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java
@@ -28,7 +28,6 @@ import org.apache.ignite.igfs.IgfsPath;
* Universal adapter over {@link IgfsEx} filesystem.
*/
public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdapter {
-
/** The wrapped igfs. */
private final IgfsEx igfsEx;
@@ -69,18 +68,14 @@ public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdap
@Override public boolean delete(String path, boolean recursive) throws IOException {
IgfsPath igfsPath = new IgfsPath(path);
- boolean del = igfsEx.delete(igfsPath, recursive);
-
- return del;
+ return igfsEx.delete(igfsPath, recursive);
}
/** {@inheritDoc} */
@Override public InputStream openInputStream(String path) throws IOException {
IgfsPath igfsPath = new IgfsPath(path);
- IgfsInputStreamAdapter adapter = igfsEx.open(igfsPath);
-
- return adapter;
+ return igfsEx.open(igfsPath);
}
/** {@inheritDoc} */
@@ -97,7 +92,7 @@ public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdap
}
/** {@inheritDoc} */
- @Override public <T> T getAdapter(Class<T> clazz) {
+ @Override public <T> T unwrap(Class<T> clazz) {
if (clazz == IgfsEx.class)
return (T)igfsEx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java
index ba8c164..eef0057 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java
@@ -31,8 +31,9 @@ public interface UniversalFileSystemAdapter {
/**
* Gets name of the FS.
* @return name of this file system.
+ * @throws IOException in case of failure.
*/
- String name();
+ String name() throws IOException;
/**
* Answers if a file denoted by path exists.
@@ -93,5 +94,5 @@ public interface UniversalFileSystemAdapter {
* @param <T> The type we need to adapt to.
* @return the adapter object of the given type.
*/
- <T> T getAdapter(Class<T> clazz);
+ <T> T unwrap(Class<T> clazz);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
new file mode 100644
index 0000000..1e2bbf2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call.
+ * <p>
+ * Unless {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop.
+ */
+public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** File system URI. */
+ protected String uri;
+
+ /** File system config paths. */
+ protected String[] cfgPaths;
+
+ /** Configuration of the secondary filesystem, never null. */
+ protected transient Configuration cfg;
+
+ /** Resulting URI. */
+ protected transient URI fullUri;
+
+ /**
+ * Constructor.
+ */
+ public BasicHadoopFileSystemFactory() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String usrName) throws IOException {
+ return create0(IgfsUtils.fixUserName(usrName));
+ }
+
+ /**
+ * Internal file system create routine.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ */
+ protected FileSystem create0(String usrName) throws IOException {
+ assert cfg != null;
+
+ try {
+ return FileSystem.get(fullUri, cfg, usrName);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
+ }
+
+ /**
+ * Gets file system URI.
+ * <p>
+ * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
+ * <p>
+ * If not set, default URI will be picked from file system configuration using
+ * {@link FileSystem#getDefaultUri(Configuration)} method.
+ *
+ * @return File system URI.
+ */
+ @Nullable public String getUri() {
+ return uri;
+ }
+
+ /**
+ * Sets file system URI. See {@link #getUri()} for more information.
+ *
+ * @param uri File system URI.
+ */
+ public void setUri(@Nullable String uri) {
+ this.uri = uri;
+ }
+
+ /**
+ * Gets paths to additional file system configuration files (e.g. core-site.xml).
+ * <p>
+ * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable.
+ * <p>
+ * All provided paths will be loaded in the order they are provided and then applied to {@link Configuration}. It means
+ * that path order might be important in some cases.
+ * <p>
+ * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
+ * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well.
+ *
+ * @return Paths to file system configuration files.
+ */
+ @Nullable public String[] getConfigPaths() {
+ return cfgPaths;
+ }
+
+ /**
+ * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for
+ * more information.
+ *
+ * @param cfgPaths Paths to file system configuration files.
+ */
+ public void setConfigPaths(String... cfgPaths) {
+ this.cfgPaths = cfgPaths;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ cfg = HadoopUtils.safeCreateConfiguration();
+
+ if (cfgPaths != null) {
+ for (String cfgPath : cfgPaths) {
+ if (cfgPath == null)
+ throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths));
+ else {
+ URL url = U.resolveIgniteUrl(cfgPath);
+
+ if (url == null) {
+ // Every configured path must be resolvable on the local node:
+ throw new IgniteException("Failed to resolve secondary file system configuration path " +
+ "(ensure that it exists locally and you have read access to it): " + cfgPath);
+ }
+
+ cfg.addResource(url);
+ }
+ }
+ }
+
+ // If secondary fs URI is not given explicitly, try to get it from the configuration:
+ if (uri == null)
+ fullUri = FileSystem.getDefaultUri(cfg);
+ else {
+ try {
+ fullUri = new URI(uri);
+ }
+ catch (URISyntaxException use) {
+ throw new IgniteException("Failed to resolve secondary file system URI: " + uri, use);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, uri);
+
+ if (cfgPaths != null) {
+ out.writeInt(cfgPaths.length);
+
+ for (String cfgPath : cfgPaths)
+ U.writeString(out, cfgPath);
+ }
+ else
+ out.writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ uri = U.readString(in);
+
+ int cfgPathsCnt = in.readInt();
+
+ if (cfgPathsCnt != -1) {
+ cfgPaths = new String[cfgPathsCnt];
+
+ for (int i = 0; i < cfgPathsCnt; i++)
+ cfgPaths[i] = U.readString(in);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
new file mode 100644
index 0000000..91f7777
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on
+ * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each
+ * user instead.
+ * <p>
+ * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user
+ * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to
+ * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
+ * calls this method only once per user, which may lead to token expiration. In such cases it makes sense to either
+ * use {@link BasicHadoopFileSystemFactory} or implement your own factory.
+ */
+public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Per-user file system cache. */
+ private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) throws IOException {
+ return create0(key);
+ }
+ }
+ );
+
+ /**
+ * Public no-arg constructor.
+ */
+ public CachingHadoopFileSystemFactory() {
+ // noop
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String usrName) throws IOException {
+ return cache.getOrCreate(IgfsUtils.fixUserName(usrName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ super.start();
+
+ // Disable caching.
+ cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ super.stop();
+
+ try {
+ cache.close();
+ }
+ catch (IgniteCheckedException ice) {
+ throw new IgniteException(ice);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
new file mode 100644
index 0000000..5ad08ab
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
+ * <p>
+ * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required.
+ * <p>
+ * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement own caching facility
+ * or not cache file systems at all.
+ * <p>
+ * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be
+ * performed by Ignite. You may want to implement some initialization or cleanup there.
+ * <p>
+ * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the
+ * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file
+ * system paths.
+ */
+public interface HadoopFileSystemFactory extends Serializable {
+ /**
+ * Gets file system for the given user name.
+ *
+ * @param usrName User name
+ * @return File system.
+ * @throws IOException In case of error.
+ */
+ public FileSystem get(String usrName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 1ca6938..9f544c1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -17,15 +17,7 @@
package org.apache.ignite.hadoop.fs;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -35,6 +27,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
@@ -45,71 +38,59 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.ValueFactory;
+import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsFileInfo;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME;
/**
- * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
- * In fact, this class deals with different FileSystems depending on the user context,
- * see {@link IgfsUserContext#currentUser()}.
+ * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
+ * <p>
+ * Target {@code FileSystem} instances are created on a per-user basis using the passed {@link HadoopFileSystemFactory}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
- /** Properties of file system, see {@link #properties()}
- *
- * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
- * See {@link IgfsEx#SECONDARY_FS_URI}
- * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
- * */
- private final Map<String, String> props = new HashMap<>();
-
- /** Secondary file system provider. */
- private final SecondaryFileSystemProvider secProvider;
-
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware,
+ HadoopPayloadAware {
/** The default user name. It is used if no user context is set. */
- private final String dfltUserName;
+ private String dfltUsrName;
- /** FileSystem instance created for the default user.
- * Stored outside the fileSysLazyMap due to performance reasons. */
- private final FileSystem dfltFs;
+ /** Factory. */
+ private HadoopFileSystemFactory fsFactory;
- /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
- private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
- new ValueFactory<String, FileSystem>() {
- @Override public FileSystem createValue(String key) {
- try {
- assert !F.isEmpty(key);
-
- return secProvider.createFileSystem(key);
- }
- catch (IOException ioe) {
- throw new IgniteException(ioe);
- }
- }
- }
- );
+ /**
+ * Default constructor for Spring.
+ */
+ public IgniteHadoopIgfsSecondaryFileSystem() {
+ // No-op.
+ }
/**
* Simple constructor that is to be used by default.
*
* @param uri URI of file system.
* @throws IgniteCheckedException In case of error.
+ * @deprecated Use {@link #setFileSystemFactory(HadoopFileSystemFactory)} instead.
*/
+ @Deprecated
public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException {
this(uri, null, null);
}
@@ -120,7 +101,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @param uri URI of file system.
* @param cfgPath Additional path to Hadoop configuration.
* @throws IgniteCheckedException In case of error.
+ * @deprecated Use {@link #setFileSystemFactory(HadoopFileSystemFactory)} instead.
*/
+ @Deprecated
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath)
throws IgniteCheckedException {
this(uri, cfgPath, null);
@@ -131,46 +114,73 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
*
* @param uri URI of file system.
* @param cfgPath Additional path to Hadoop configuration.
- * @param userName User name.
+ * @param usrName User name.
* @throws IgniteCheckedException In case of error.
+ * @deprecated Use {@link #setFileSystemFactory(HadoopFileSystemFactory)} instead.
*/
+ @Deprecated
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
- @Nullable String userName) throws IgniteCheckedException {
- // Treat empty uri and userName arguments as nulls to improve configuration usability:
- if (F.isEmpty(uri))
- uri = null;
-
- if (F.isEmpty(cfgPath))
- cfgPath = null;
-
- if (F.isEmpty(userName))
- userName = null;
+ @Nullable String usrName) throws IgniteCheckedException {
+ setDefaultUserName(usrName);
- this.dfltUserName = IgfsUtils.fixUserName(userName);
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
- try {
- this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
+ fac.setUri(uri);
- // File system creation for the default user name.
- // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
- this.dfltFs = secProvider.createFileSystem(dfltUserName);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
+ if (cfgPath != null)
+ fac.setConfigPaths(cfgPath);
- assert dfltFs != null;
+ setFileSystemFactory(fac);
+ }
- uri = secProvider.uri().toString();
+ /**
+ * Gets default user name.
+ * <p>
+ * Defines user name which will be used during file system invocation in case no user name is defined explicitly
+ * through {@link FileSystem#get(URI, Configuration, String)}.
+ * <p>
+ * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name
+ * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
+ * {@link IgfsUserContext#doAs(String, Callable)} methods.
+ * <p>
+ * If not set, the value of system property {@code "user.name"} will be used. If this property is not set either,
+ * {@code "anonymous"} will be used.
+ *
+ * @return Default user name.
+ */
+ @Nullable public String getDefaultUserName() {
+ return dfltUsrName;
+ }
- if (!uri.endsWith("/"))
- uri += "/";
+ /**
+ * Sets default user name. See {@link #getDefaultUserName()} for details.
+ *
+ * @param dfltUsrName Default user name.
+ */
+ public void setDefaultUserName(@Nullable String dfltUsrName) {
+ this.dfltUsrName = dfltUsrName;
+ }
- if (cfgPath != null)
- props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+ /**
+ * Gets secondary file system factory.
+ * <p>
+ * This factory will be used whenever a call to a target {@link FileSystem} is required.
+ * <p>
+ * If not set, {@link CachingHadoopFileSystemFactory} will be used.
+ *
+ * @return Secondary file system factory.
+ */
+ public HadoopFileSystemFactory getFileSystemFactory() {
+ return fsFactory;
+ }
- props.put(SECONDARY_FS_URI, uri);
- props.put(SECONDARY_FS_USER_NAME, dfltUserName);
+ /**
+ * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details.
+ *
+ * @param factory Secondary file system factory.
+ */
+ public void setFileSystemFactory(HadoopFileSystemFactory factory) {
+ this.fsFactory = factory;
}
/**
@@ -180,7 +190,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @return Hadoop path.
*/
private Path convert(IgfsPath path) {
- URI uri = fileSysForUser().getUri();
+ URI uri = fileSystemForUser().getUri();
return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
}
@@ -234,7 +244,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
try {
- return fileSysForUser().exists(convert(path));
+ return fileSystemForUser().exists(convert(path));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -245,7 +255,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
- final FileSystem fileSys = fileSysForUser();
+ final FileSystem fileSys = fileSystemForUser();
try {
if (props0.userName() != null || props0.groupName() != null)
@@ -266,7 +276,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public void rename(IgfsPath src, IgfsPath dest) {
// Delegate to the secondary file system.
try {
- if (!fileSysForUser().rename(convert(src), convert(dest)))
+ if (!fileSystemForUser().rename(convert(src), convert(dest)))
throw new IgfsException("Failed to rename (secondary file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
@@ -278,7 +288,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
try {
- return fileSysForUser().delete(convert(path), recursive);
+ return fileSystemForUser().delete(convert(path), recursive);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -288,7 +298,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
try {
- if (!fileSysForUser().mkdirs(convert(path)))
+ if (!fileSystemForUser().mkdirs(convert(path)))
throw new IgniteException("Failed to make directories [path=" + path + "]");
}
catch (IOException e) {
@@ -299,7 +309,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
try {
- if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+ if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
}
catch (IOException e) {
@@ -310,7 +320,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
try {
- FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
+ FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -333,7 +343,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
try {
- FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
+ FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -360,13 +370,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
+ return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
try {
- return fileSysForUser().create(convert(path), overwrite);
+ return fileSystemForUser().create(convert(path), overwrite);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -380,8 +390,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
try {
- return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
- (short)replication, blockSize, null);
+ return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+ (short) replication, blockSize, null);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -394,7 +404,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
- return fileSysForUser().append(convert(path), bufSize);
+ return fileSystemForUser().append(convert(path), bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -404,7 +414,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
try {
- final FileStatus status = fileSysForUser().getFileStatus(convert(path));
+ final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
if (status == null)
return null;
@@ -479,65 +489,61 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
- return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
+ return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
}
}
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return props;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- Exception e = null;
-
- try {
- dfltFs.close();
- }
- catch (Exception e0) {
- e = e0;
- }
-
- try {
- fileSysLazyMap.close();
- }
- catch (IgniteCheckedException ice) {
- if (e == null)
- e = ice;
- }
-
- if (e != null)
- throw new IgniteException(e);
- }
-
/**
* Gets the underlying {@link FileSystem}.
* This method is used solely for testing.
* @return the underlying Hadoop {@link FileSystem}.
*/
public FileSystem fileSystem() {
- return fileSysForUser();
+ return fileSystemForUser();
}
/**
* Gets the FileSystem for the current context user.
* @return the FileSystem instance, never null.
*/
- private FileSystem fileSysForUser() {
+ private FileSystem fileSystemForUser() {
String user = IgfsUserContext.currentUser();
if (F.isEmpty(user))
- user = dfltUserName; // default is never empty.
+ user = IgfsUtils.fixUserName(dfltUsrName);
assert !F.isEmpty(user);
- if (F.eq(user, dfltUserName))
- return dfltFs; // optimization
+ try {
+ return fsFactory.get(user);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);
+
+ if (fsFactory == null)
+ fsFactory = new CachingHadoopFileSystemFactory();
+
+ if (fsFactory instanceof LifecycleAware)
+ ((LifecycleAware) fsFactory).start();
+ }
- return fileSysLazyMap.getOrCreate(user);
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (fsFactory instanceof LifecycleAware)
+ ((LifecycleAware)fsFactory).stop();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopFileSystemFactory getPayload() {
+ return fsFactory;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 5dce67f..71f6435 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -17,19 +17,6 @@
package org.apache.ignite.hadoop.fs.v1;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -43,7 +30,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
@@ -51,7 +40,6 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
@@ -68,8 +56,23 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -85,8 +88,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
/**
* {@code IGFS} Hadoop 1.x file system driver over file system API. To use
@@ -165,8 +166,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** IGFS mode resolver. */
private IgfsModeResolver modeRslvr;
- /** Secondary file system instance. */
- private FileSystem secondaryFs;
+ /** The secondary file system factory. */
+ private HadoopFileSystemFactory factory;
/** Management connection flag. */
private boolean mgmt;
@@ -327,21 +328,28 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
if (initSecondary) {
- Map<String, String> props = paths.properties();
+ try {
+ factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to get secondary file system factory.", e);
+ }
+
+ assert factory != null;
- String secUri = props.get(SECONDARY_FS_URI);
- String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).start();
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+ FileSystem secFs = factory.get(user);
- secondaryFs = secProvider.createFileSystem(user);
+ secondaryUri = secFs.getUri();
- secondaryUri = secProvider.uri();
+ A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
}
catch (IOException e) {
if (!mgmt)
- throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+ throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
else
LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " +
"will have no effect): " + e.getMessage());
@@ -409,8 +417,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (clientLog.isLogEnabled())
clientLog.close();
- if (secondaryFs != null)
- U.closeQuiet(secondaryFs);
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).stop();
// Reset initialized resources.
uri = null;
@@ -425,6 +433,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
A.notNull(p, "p");
if (mode(p) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -453,6 +463,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
A.notNull(p, "p");
if (mode(p) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -482,6 +494,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
try {
if (mode(p) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -490,8 +504,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
secondaryFs.setOwner(toSecondary(p), username, grpName);
- }
- else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null)
+ } else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null)
throw new IOException("Failed to set file permission (file not found?)" +
" [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']');
}
@@ -511,6 +524,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsMode mode = mode(path);
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -583,6 +598,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -664,6 +681,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
", path=" + path + ", bufSize=" + bufSize + ']');
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -727,6 +746,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsMode mode = mode(srcPath);
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -787,6 +808,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsMode mode = mode(path);
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -832,6 +855,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsMode mode = mode(path);
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -896,26 +921,35 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** {@inheritDoc} */
@Override public void setWorkingDirectory(Path newPath) {
- if (newPath == null) {
- Path homeDir = getHomeDirectory();
+ try {
+ if (newPath == null) {
+ Path homeDir = getHomeDirectory();
- if (secondaryFs != null)
- secondaryFs.setWorkingDirectory(toSecondary(homeDir));
+ FileSystem secondaryFs = secondaryFileSystem();
- workingDir = homeDir;
- }
- else {
- Path fixedNewPath = fixRelativePart(newPath);
+ if (secondaryFs != null)
+ secondaryFs.setWorkingDirectory(toSecondary(homeDir));
+
+ workingDir = homeDir;
+ }
+ else {
+ Path fixedNewPath = fixRelativePart(newPath);
- String res = fixedNewPath.toUri().getPath();
+ String res = fixedNewPath.toUri().getPath();
- if (!DFSUtil.isValidName(res))
- throw new IllegalArgumentException("Invalid DFS directory name " + res);
+ if (!DFSUtil.isValidName(res))
+ throw new IllegalArgumentException("Invalid DFS directory name " + res);
- if (secondaryFs != null)
- secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
+ FileSystem secondaryFs = secondaryFileSystem();
- workingDir = fixedNewPath;
+ if (secondaryFs != null)
+ secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
+
+ workingDir = fixedNewPath;
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to obtain secondary file system instance.", e);
}
}
@@ -936,6 +970,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsMode mode = mode(path);
if (mode == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -977,6 +1013,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
try {
if (mode(f) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -1007,6 +1045,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
try {
if (mode(f) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -1038,6 +1078,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
IgfsPath path = convert(status.getPath());
if (mode(status.getPath()) == PROXY) {
+ final FileSystem secondaryFs = secondaryFileSystem();
+
if (secondaryFs == null) {
assert mgmt;
@@ -1103,7 +1145,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @return {@code true} If secondary file system is initialized.
*/
public boolean hasSecondaryFileSystem() {
- return secondaryFs != null;
+ return factory != null;
}
/**
@@ -1123,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @return Secondary file system path.
*/
private Path toSecondary(Path path) {
- assert secondaryFs != null;
+ assert factory != null;
assert secondaryUri != null;
return convertPath(path, secondaryUri);
@@ -1298,4 +1340,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
public String user() {
return user;
}
+
+ /**
+ * Gets the secondary {@link FileSystem} for the current user from the factory.
+ *
+ * @return The secondary file system, or {@code null} if no factory is set.
+ */
+ private @Nullable FileSystem secondaryFileSystem() throws IOException{
+ if (factory == null)
+ return null;
+
+ return factory.get(user);
+ }
}
\ No newline at end of file
[5/6] ignite git commit: IGNITE-2206: Hadoop file system creation is
now abstracted out using factory interface.
Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 99ca1ec..0d7de86 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -17,22 +17,6 @@
package org.apache.ignite.hadoop.fs.v2;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.InvalidPathException;
@@ -51,13 +36,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
@@ -74,8 +60,26 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
@@ -92,8 +96,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
/**
* {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -168,8 +170,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** Mode resolver. */
private IgfsModeResolver modeRslvr;
- /** Secondary file system instance. */
- private AbstractFileSystem secondaryFs;
+ /** The secondary file system factory. */
+ private HadoopFileSystemFactory factory;
/** Whether custom sequential reads before prefetch value is provided. */
private boolean seqReadsBeforePrefetchOverride;
@@ -335,20 +337,27 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
}
if (initSecondary) {
- Map<String, String> props = paths.properties();
+ try {
+ factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to get secondary file system factory.", e);
+ }
- String secUri = props.get(SECONDARY_FS_URI);
- String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
+ assert factory != null;
+
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).start();
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+ FileSystem secFs = factory.get(user);
- secondaryFs = secProvider.createAbstractFileSystem(user);
+ secondaryUri = secFs.getUri();
- secondaryUri = secProvider.uri();
+ A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
}
catch (IOException e) {
- throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+ // secondaryUri is assigned only after factory.get(user) returns, so it cannot identify the
+ // target of a failed connection attempt; report the factory and the user instead.
+ throw new IOException("Failed to connect to the secondary file system [factory=" + factory +
+ ", user=" + user + ']', e);
}
}
}
@@ -368,6 +377,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.close();
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).stop();
+
// Reset initialized resources.
rmtClient = null;
}
@@ -391,13 +403,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public boolean setReplication(Path f, short replication) throws IOException {
- return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
+ return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
}
/** {@inheritDoc} */
@Override public void setTimes(Path f, long mtime, long atime) throws IOException {
if (mode(f) == PROXY)
- secondaryFs.setTimes(f, mtime, atime);
+ secondaryFileSystem().setTimes(f, mtime, atime);
else {
if (mtime == -1 && atime == -1)
return;
@@ -421,7 +433,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
A.notNull(p, "p");
if (mode(p) == PROXY)
- secondaryFs.setPermission(toSecondary(p), perm);
+ secondaryFileSystem().setPermission(toSecondary(p), perm);
else {
if (rmtClient.update(convert(p), permission(perm)) == null)
throw new IOException("Failed to set file permission (file not found?)" +
@@ -443,7 +455,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (mode(p) == PROXY)
- secondaryFs.setOwner(toSecondary(p), usr, grp);
+ secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
throw new IOException("Failed to set file permission (file not found?)" +
" [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
@@ -464,11 +476,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
IgfsMode mode = modeRslvr.resolveMode(path);
if (mode == PROXY) {
- FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+ FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
if (clientLog.isLogEnabled()) {
// At this point we do not know file size, so we perform additional request to remote FS to get it.
- FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+ FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
long size = status != null ? status.getLen() : -1;
@@ -543,8 +555,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
if (mode == PROXY) {
- FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
- replication, blockSize, progress, checksumOpt, createParent);
+ FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+ replication, blockSize, progress);
if (clientLog.isLogEnabled()) {
long logId = IgfsLogger.nextId();
@@ -641,7 +653,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logRename(srcPath, PROXY, dstPath);
- secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
+ secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
}
else {
if (clientLog.isLogEnabled())
@@ -671,7 +683,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logDelete(path, PROXY, recursive);
- return secondaryFs.delete(toSecondary(f), recursive);
+ return secondaryFileSystem().delete(toSecondary(f), recursive);
}
boolean res = rmtClient.delete(path, recursive);
@@ -689,14 +701,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
// Checksum has effect for secondary FS only.
- if (secondaryFs != null)
- secondaryFs.setVerifyChecksum(verifyChecksum);
+ if (factory != null)
+ secondaryFileSystem().setVerifyChecksum(verifyChecksum);
}
/** {@inheritDoc} */
@Override public FileChecksum getFileChecksum(Path f) throws IOException {
if (mode(f) == PROXY)
- return secondaryFs.getFileChecksum(f);
+ return secondaryFileSystem().getFileChecksum(f);
return null;
}
@@ -712,7 +724,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
IgfsMode mode = modeRslvr.resolveMode(path);
if (mode == PROXY) {
- FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
+ FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
if (arr == null)
throw new FileNotFoundException("File " + f + " does not exist.");
@@ -775,7 +787,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logMakeDirectory(path, PROXY);
- secondaryFs.mkdir(toSecondary(f), perm, createParent);
+ secondaryFileSystem().mkdirs(toSecondary(f), perm);
}
else {
rmtClient.mkdirs(path, permission(perm));
@@ -797,7 +809,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (mode(f) == PROXY)
- return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+ return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
else {
IgfsFile info = rmtClient.info(convert(f));
@@ -822,7 +834,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (modeRslvr.resolveMode(igfsPath) == PROXY)
- return secondaryFs.getFileBlockLocations(path, start, len);
+ return secondaryFileSystem().getFileBlockLocations(path, start, len);
else {
long now = System.currentTimeMillis();
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
* @return Secondary file system path.
*/
private Path toSecondary(Path path) {
- assert secondaryFs != null;
+ assert factory != null;
assert secondaryUri != null;
return convertPath(path, secondaryUri);
@@ -1045,4 +1057,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
public String user() {
return user;
}
+
+    /**
+     * Obtains the secondary {@link FileSystem} for {@code user} from the configured factory.
+     * <p>
+     * Whether the returned instance is cached or newly created is decided by the factory.
+     *
+     * @return The secondary file system.
+     * @throws IOException If the factory fails to provide the file system.
+     */
+    private FileSystem secondaryFileSystem() throws IOException {
+        assert factory != null;
+
+        return factory.get(user);
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
deleted file mode 100644
index d5be074..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Encapsulates logic of secondary filesystem creation.
- */
-public class SecondaryFileSystemProvider {
- /** Configuration of the secondary filesystem, never null. */
- private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
-
- /** The secondary filesystem URI, never null. */
- private final URI uri;
-
- /**
- * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
- * specified either explicitly or in the configuration provided.
- *
- * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS"
- * property in the provided configuration.
- * @param secConfPath the secondary Fs path (file path on the local file system, optional).
- * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
- * @throws IOException
- */
- public SecondaryFileSystemProvider(final @Nullable String secUri,
- final @Nullable String secConfPath) throws IOException {
- if (secConfPath != null) {
- URL url = U.resolveIgniteUrl(secConfPath);
-
- if (url == null) {
- // If secConfPath is given, it should be resolvable:
- throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
- "(ensure that it exists locally and you have read access to it): " + secConfPath);
- }
-
- cfg.addResource(url);
- }
-
- // if secondary fs URI is not given explicitly, try to get it from the configuration:
- if (secUri == null)
- uri = FileSystem.getDefaultUri(cfg);
- else {
- try {
- uri = new URI(secUri);
- }
- catch (URISyntaxException use) {
- throw new IOException("Failed to resolve secondary file system URI: " + secUri);
- }
- }
-
- // Disable caching:
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
- cfg.setBoolean(prop, true);
- }
-
- /**
- * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
- * @throws IOException
- */
- public FileSystem createFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(userName);
-
- final FileSystem fileSys;
-
- try {
- fileSys = FileSystem.get(uri, cfg, userName);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
-
- return fileSys;
- }
-
- /**
- * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
- * @throws IOException in case of error.
- */
- public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(userName);
-
- String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
-
- try {
- return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
- @Override public AbstractFileSystem run() throws IOException {
- return AbstractFileSystem.get(uri, cfg);
- }
- });
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", ie);
- }
- }
-
- /**
- * @return the secondary fs URI, never null.
- */
- public URI uri() {
- return uri;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
index 48ade79..1ecbee5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
@@ -39,7 +39,7 @@ public class HadoopFileSystemCacheUtils {
public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
return new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
- @Override public FileSystem createValue(FsCacheKey key) {
+ @Override public FileSystem createValue(FsCacheKey key) throws IOException {
try {
assert key != null;
@@ -57,8 +57,10 @@ public class HadoopFileSystemCacheUtils {
return FileSystem.get(uri, cfg, key.user());
}
- catch (IOException | InterruptedException ioe) {
- throw new IgniteException(ioe);
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 89eaf73..681cddb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.fs;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
@@ -204,8 +205,8 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
*
* @param key the key to create value for
* @return the value.
- * @throws IgniteException on failure.
+ * @throws IOException On failure.
*/
- public V createValue(K key);
+ public V createValue(K key) throws IOException;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
index ea65464..10b1bcd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.igfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest;
@@ -74,12 +74,16 @@ public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest {
prepareConfiguration();
- IgniteHadoopIgfsSecondaryFileSystem second =
- new IgniteHadoopIgfsSecondaryFileSystem(secondaryUri, secondaryConfFullPath);
+ CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
- FileSystem fileSystem = second.fileSystem();
+ factory.setUri(secondaryUri);
+ factory.setConfigPaths(secondaryConfFullPath);
- igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(fileSystem);
+ IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem();
+
+ second.setFileSystemFactory(factory);
+
+ igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(factory);
return second;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
new file mode 100644
index 0000000..1d02f0f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import java.io.Externalizable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for Hadoop file system factory.
+ */
+public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
+ /** Number of {@code start()} invocations counted by {@link TestFactory}. */
+ private static final AtomicInteger START_CNT = new AtomicInteger();
+
+ /** Number of {@code stop()} invocations counted by {@link TestFactory}. */
+ private static final AtomicInteger STOP_CNT = new AtomicInteger();
+
+ /** Path to secondary file system configuration, relative to the Ignite home directory. */
+ private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml";
+
+ /** Hadoop path for DUAL mode. */
+ private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir");
+
+ /** Hadoop path for PROXY mode. */
+ private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir");
+
+ /** IGFS path for DUAL mode (same location as {@link #PATH_DUAL}). */
+ private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir");
+
+ /** IGFS path for PROXY mode (same location as {@link #PATH_PROXY}). */
+ private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir");
+
+ /** Secondary IGFS, started before each test. */
+ private IgfsEx secondary;
+
+ /** Primary IGFS, started before each test with the secondary IGFS behind it. */
+ private IgfsEx primary;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Reset the lifecycle counters before any node (and therefore any factory) is started.
+ START_CNT.set(0);
+ STOP_CNT.set(0);
+
+ // Secondary goes first: the primary's factory is configured with the secondary's endpoint URI.
+ secondary = startSecondary();
+ primary = startPrimary();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Drop references to the IGFS instances created in beforeTest().
+ secondary = null;
+ primary = null;
+
+ // Stop whatever nodes are still running (the test itself may already have stopped the primary).
+ stopAllGrids();
+ }
+
+    /**
+     * Test custom factory.
+     * <p>
+     * Checks that {@link TestFactory} lifecycle callbacks are counted once for the primary node and once
+     * for a Hadoop {@link FileSystem} client, and that file system operations work through that client.
+     * <p>
+     * Checks use JUnit assertions rather than the {@code assert} keyword so that they are evaluated
+     * regardless of the JVM {@code -ea} flag and report expected/actual values on failure.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void testCustomFactory() throws Exception {
+        // beforeTest() started the primary node, which is expected to have started its factory once.
+        assertEquals(1, START_CNT.get());
+        assertEquals(0, STOP_CNT.get());
+
+        // Use IGFS directly.
+        primary.mkdirs(IGFS_PATH_DUAL);
+
+        assertTrue(primary.exists(IGFS_PATH_DUAL));
+        assertTrue(secondary.exists(IGFS_PATH_DUAL));
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                primary.mkdirs(IGFS_PATH_PROXY);
+
+                return null;
+            }
+        }, IgfsInvalidPathException.class, null);
+
+        // Create remote instance. try-with-resources closes it even if one of the checks below fails.
+        try (FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"),
+            baseConfiguration())) {
+            // Ensure lifecycle callback was invoked.
+            assertEquals(2, START_CNT.get());
+            assertEquals(0, STOP_CNT.get());
+
+            // Check file system operations.
+            assertTrue(fs.exists(PATH_DUAL));
+
+            assertTrue(fs.delete(PATH_DUAL, true));
+            assertFalse(primary.exists(IGFS_PATH_DUAL));
+            assertFalse(secondary.exists(IGFS_PATH_DUAL));
+            assertFalse(fs.exists(PATH_DUAL));
+
+            assertTrue(fs.mkdirs(PATH_DUAL));
+            assertTrue(primary.exists(IGFS_PATH_DUAL));
+            assertTrue(secondary.exists(IGFS_PATH_DUAL));
+            assertTrue(fs.exists(PATH_DUAL));
+
+            assertTrue(fs.mkdirs(PATH_PROXY));
+            assertTrue(secondary.exists(IGFS_PATH_PROXY));
+            assertTrue(fs.exists(PATH_PROXY));
+        }
+
+        // File system is closed now; ensure that associated factory was notified.
+        assertEquals(2, START_CNT.get());
+        assertEquals(1, STOP_CNT.get());
+
+        // Stop primary node and ensure that base factory was notified.
+        G.stop(primary.context().kernalContext().grid().name(), true);
+
+        assertEquals(2, START_CNT.get());
+        assertEquals(2, STOP_CNT.get());
+    }
+
+ /**
+ * Start secondary IGFS: a node named "secondary" with its endpoint on port 11500, PRIMARY default
+ * mode and no secondary file system of its own.
+ *
+ * @return IGFS.
+ * @throws Exception If failed.
+ */
+ private static IgfsEx startSecondary() throws Exception {
+ return start("secondary", 11500, IgfsMode.PRIMARY, null);
+ }
+
+    /**
+     * Start primary IGFS.
+     * <p>
+     * The node is named "primary", listens on port 10500 and uses a {@link TestFactory}-backed
+     * secondary file system that points at the secondary IGFS endpoint.
+     *
+     * @return IGFS.
+     * @throws Exception If failed.
+     */
+    private static IgfsEx startPrimary() throws Exception {
+        final String secondaryUri = "igfs://secondary:secondary@127.0.0.1:11500/";
+
+        // Write the Hadoop configuration that the factory will load by path.
+        Configuration hadoopCfg = baseConfiguration();
+
+        hadoopCfg.set("fs.defaultFS", secondaryUri);
+
+        writeConfigurationToFile(hadoopCfg);
+
+        // Factory under test.
+        TestFactory fsFactory = new TestFactory();
+
+        fsFactory.setUri(secondaryUri);
+        fsFactory.setConfigPaths(SECONDARY_CFG_PATH);
+
+        // Secondary file system built on top of the factory.
+        IgniteHadoopIgfsSecondaryFileSystem secFs = new IgniteHadoopIgfsSecondaryFileSystem();
+
+        secFs.setFileSystemFactory(fsFactory);
+
+        return start("primary", 10500, IgfsMode.PRIMARY, secFs);
+    }
+
+    /**
+     * Start Ignite node with IGFS instance.
+     * <p>
+     * The node is configured with a partitioned data cache and a replicated meta cache (both
+     * transactional with full-sync writes), a TCP IGFS endpoint on 127.0.0.1 and TCP discovery
+     * using a VM IP finder.
+     *
+     * @param name Node and IGFS name.
+     * @param endpointPort Endpoint port.
+     * @param dfltMode Default path mode.
+     * @param secondaryFs Secondary file system.
+     * @return Igfs instance.
+     */
+    private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs) {
+        // Cache holding file data blocks.
+        CacheConfiguration dataCache = defaultCacheConfiguration();
+
+        dataCache.setName("dataCache");
+        dataCache.setCacheMode(PARTITIONED);
+        dataCache.setAtomicityMode(TRANSACTIONAL);
+        dataCache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCache.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCache.setBackups(0);
+        dataCache.setOffHeapMaxMemory(0);
+
+        // Cache holding file system metadata.
+        CacheConfiguration metaCache = defaultCacheConfiguration();
+
+        metaCache.setName("metaCache");
+        metaCache.setCacheMode(REPLICATED);
+        metaCache.setAtomicityMode(TRANSACTIONAL);
+        metaCache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        // IPC endpoint through which Hadoop clients connect.
+        IgfsIpcEndpointConfiguration ipcCfg = new IgfsIpcEndpointConfiguration();
+
+        ipcCfg.setType(IgfsIpcEndpointType.TCP);
+        ipcCfg.setHost("127.0.0.1");
+        ipcCfg.setPort(endpointPort);
+
+        // IGFS itself.
+        FileSystemConfiguration fsCfg = new FileSystemConfiguration();
+
+        fsCfg.setName(name);
+        fsCfg.setDataCacheName("dataCache");
+        fsCfg.setMetaCacheName("metaCache");
+        fsCfg.setDefaultMode(dfltMode);
+        fsCfg.setIpcEndpointConfiguration(ipcCfg);
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        // Discovery.
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        // Node.
+        IgniteConfiguration nodeCfg = new IgniteConfiguration();
+
+        nodeCfg.setGridName(name);
+        nodeCfg.setLocalHost("127.0.0.1");
+        nodeCfg.setConnectorConfiguration(null);
+        nodeCfg.setDiscoverySpi(disco);
+        nodeCfg.setCacheConfiguration(dataCache, metaCache);
+        nodeCfg.setFileSystemConfiguration(fsCfg);
+
+        return (IgfsEx)G.start(nodeCfg).fileSystem(name);
+    }
+
+ /**
+ * Create base FileSystem configuration: a fresh Hadoop {@link Configuration} whose
+ * {@code fs.igfs.impl} property is set to the {@link IgniteHadoopFileSystem} class name.
+ *
+ * @return Configuration.
+ */
+ private static Configuration baseConfiguration() {
+ Configuration conf = new Configuration();
+
+ conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+ return conf;
+ }
+
+    /**
+     * Write configuration to file.
+     * <p>
+     * The target is {@link #SECONDARY_CFG_PATH} under the Ignite home directory. Any existing file is
+     * deleted first and the configuration is then written as XML.
+     *
+     * @param conf Configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static void writeConfigurationToFile(Configuration conf) throws Exception {
+        File cfgFile = new File(U.getIgniteHome() + SECONDARY_CFG_PATH);
+
+        // The result of delete() is ignored on purpose: the check below catches a failed deletion.
+        cfgFile.delete();
+
+        assertFalse(cfgFile.exists());
+
+        try (FileOutputStream out = new FileOutputStream(cfgFile)) {
+            conf.writeXml(out);
+        }
+
+        assertTrue(cfgFile.exists());
+    }
+
+ /**
+ * Test factory: a {@link CachingHadoopFileSystemFactory} that counts its {@code start()} and
+ * {@code stop()} invocations in {@code START_CNT} and {@code STOP_CNT} before delegating to the parent.
+ */
+ private static class TestFactory extends CachingHadoopFileSystemFactory {
+ /**
+ * No-arg constructor for {@link Externalizable} support.
+ */
+ public TestFactory() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ // Count first, so the invocation is recorded even if the parent's start() throws.
+ START_CNT.incrementAndGet();
+
+ super.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // Count first, so the invocation is recorded even if the parent's stop() throws.
+ STOP_CNT.incrementAndGet();
+
+ super.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
index 608bd25..5b6fd81 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
@@ -34,55 +36,55 @@ import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
* Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance.
*/
public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFileSystemAdapter {
- /** The wrapped filesystem. */
- private final FileSystem fileSys;
+ /** File system factory. */
+ private final HadoopFileSystemFactory factory;
/**
* Constructor.
- * @param fs the filesystem to be wrapped.
+ * @param factory File system factory.
*/
- public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) {
- this.fileSys = fs;
+ public HadoopFileSystemUniversalFileSystemAdapter(HadoopFileSystemFactory factory) {
+ assert factory != null;
+
+ this.factory = factory;
}
/** {@inheritDoc} */
- @Override public String name() {
- return fileSys.getUri().toString();
+ @Override public String name() throws IOException {
+ return get().getUri().toString();
}
/** {@inheritDoc} */
@Override public boolean exists(String path) throws IOException {
- return fileSys.exists(new Path(path));
+ return get().exists(new Path(path));
}
/** {@inheritDoc} */
@Override public boolean delete(String path, boolean recursive) throws IOException {
- boolean ok = fileSys.delete(new Path(path), recursive);
- return ok;
+ return get().delete(new Path(path), recursive);
}
/** {@inheritDoc} */
@Override public void mkdirs(String path) throws IOException {
- boolean ok = fileSys.mkdirs(new Path(path));
+ boolean ok = get().mkdirs(new Path(path));
if (!ok)
throw new IOException("Failed to mkdirs: " + path);
}
/** {@inheritDoc} */
@Override public void format() throws IOException {
- HadoopIgfsUtils.clear(fileSys);
+ HadoopIgfsUtils.clear(get());
}
/** {@inheritDoc} */
@Override public Map<String, String> properties(String path) throws IOException {
Path p = new Path(path);
- FileStatus status = fileSys.getFileStatus(p);
+ FileStatus status = get().getFileStatus(p);
Map<String,String> m = new HashMap<>(3); // max size == 4
m.put(IgfsEx.PROP_USER_NAME, status.getOwner());
-
m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup());
FsPermission perm = status.getPermission();
@@ -95,7 +97,7 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
/** {@inheritDoc} */
@Override public InputStream openInputStream(String path) throws IOException {
- return fileSys.open(new Path(path));
+ return get().open(new Path(path));
}
/** {@inheritDoc} */
@@ -103,16 +105,27 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
Path p = new Path(path);
if (append)
- return fileSys.append(p);
+ return get().append(p);
else
- return fileSys.create(p, true/*overwrite*/);
+ return get().create(p, true/*overwrite*/);
}
/** {@inheritDoc} */
- @Override public <T> T getAdapter(Class<T> clazz) {
- if (clazz == FileSystem.class)
- return (T)fileSys;
+ @SuppressWarnings("unchecked")
+ @Override public <T> T unwrap(Class<T> cls) {
+ if (HadoopFileSystemFactory.class.isAssignableFrom(cls))
+ return (T)factory;
return null;
}
+
+ /**
+ * Gets file system for the default user name from the factory.
+ *
+ * @return File system.
+ * @throws IOException If failed.
+ */
+ private FileSystem get() throws IOException {
+ return factory.get(FileSystemConfiguration.DFLT_USER_NAME);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 4ddfb0d..d9b5d66 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -17,12 +17,6 @@
package org.apache.ignite.igfs;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -34,9 +28,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
import org.apache.ignite.internal.util.typedef.G;
@@ -48,6 +42,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
else
primaryConfFullPath = null;
- SecondaryFileSystemProvider provider =
- new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+ fac.setConfigPaths(primaryConfFullPath);
+ fac.setUri(primaryFsUriStr);
+
+ fac.start();
- primaryFs = provider.createFileSystem(null);
+ primaryFs = fac.get(null); //provider.createFileSystem(null);
- primaryFsUri = provider.uri();
+ primaryFsUri = primaryFs.getUri();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d368955..6617127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -17,29 +17,6 @@
package org.apache.ignite.igfs;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -59,6 +36,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx;
@@ -70,6 +48,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -79,6 +58,30 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ThreadLocalRandom8;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -380,9 +383,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
cfg.setPrefetchBlocks(1);
cfg.setDefaultMode(mode);
- if (mode != PRIMARY)
- cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
- SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
+ if (mode != PRIMARY) {
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+ fac.setUri(SECONDARY_URI);
+ fac.setConfigPaths(SECONDARY_CFG_PATH);
+
+ IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
+
+ sec.setFileSystemFactory(fac);
+ sec.setDefaultUserName(SECONDARY_FS_USER);
+
+ // NB: start() will be invoked upon IgfsImpl init.
+ cfg.setSecondaryFileSystem(sec);
+ }
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
@@ -398,7 +412,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
@Override public Object call() throws Exception {
return new IgniteHadoopFileSystem().getUri();
}
- }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?).");
+ }, IllegalStateException.class,
+ "URI is null (was IgniteHadoopFileSystem properly initialized?)");
}
/** @throws Exception If failed. */
@@ -506,7 +521,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
// Ensure that IO is stopped when nobody else needs it.
fs.close();
- assertEquals(initSize - 1, cache.size());
+ assert initSize >= cache.size();
assert (Boolean)stopField.get(io);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7fb570/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6c542b5..9092f32 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
+import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest;
import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest;
import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest;
@@ -113,6 +114,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));
[3/6] ignite git commit: IGNITE-2308: Fixed HadoopClassLoader
dependency resolution. This closes #391.
Posted by vo...@apache.org.
IGNITE-2308: Fixed HadoopClassLoader dependency resolution. This closes #391.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d58fcbf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d58fcbf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d58fcbf
Branch: refs/heads/ignite-1.5.2
Commit: 5d58fcbf40fdb9114e4cbb32b72dd9bce7fa38ca
Parents: 4cd3b3d
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Jan 4 10:47:28 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 6 09:26:22 2016 +0400
----------------------------------------------------------------------
.../processors/hadoop/HadoopClassLoader.java | 636 ++++++++++++++-----
.../hadoop/HadoopClassLoaderTest.java | 101 ++-
.../hadoop/deps/CircularWIthHadoop.java | 32 +
.../hadoop/deps/CircularWithoutHadoop.java | 27 +
.../processors/hadoop/deps/WithCast.java | 41 ++
.../hadoop/deps/WithClassAnnotation.java | 28 +
.../hadoop/deps/WithConstructorInvocation.java | 31 +
.../processors/hadoop/deps/WithExtends.java | 27 +
.../processors/hadoop/deps/WithField.java | 29 +
.../processors/hadoop/deps/WithImplements.java | 36 ++
.../hadoop/deps/WithIndirectField.java | 27 +
.../processors/hadoop/deps/WithInitializer.java | 33 +
.../processors/hadoop/deps/WithInnerClass.java | 31 +
.../hadoop/deps/WithLocalVariable.java | 38 ++
.../hadoop/deps/WithMethodAnnotation.java | 32 +
.../hadoop/deps/WithMethodArgument.java | 31 +
.../hadoop/deps/WithMethodCheckedException.java | 31 +
.../hadoop/deps/WithMethodInvocation.java | 31 +
.../hadoop/deps/WithMethodReturnType.java | 31 +
.../hadoop/deps/WithMethodRuntimeException.java | 31 +
.../processors/hadoop/deps/WithOuterClass.java | 38 ++
.../hadoop/deps/WithParameterAnnotation.java | 31 +
.../processors/hadoop/deps/WithStaticField.java | 29 +
.../hadoop/deps/WithStaticInitializer.java | 34 +
.../processors/hadoop/deps/Without.java | 25 +
.../testsuites/IgniteHadoopTestSuite.java | 3 +
26 files changed, 1279 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index f12af46..735133f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -30,7 +30,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopNativeCodeLoader;
@@ -40,13 +40,16 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.objectweb.asm.AnnotationVisitor;
+import org.objectweb.asm.Attribute;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.ClassWriter;
import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.Handle;
import org.objectweb.asm.Label;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
import org.objectweb.asm.commons.Remapper;
import org.objectweb.asm.commons.RemappingClassAdapter;
@@ -125,10 +128,14 @@ public class HadoopClassLoader extends URLClassLoader {
* @return {@code true} if we need to check this class.
*/
private static boolean isHadoopIgfs(String cls) {
- String ignitePackagePrefix = "org.apache.ignite";
- int len = ignitePackagePrefix.length();
+ String ignitePkgPrefix = "org.apache.ignite";
+
+ int len = ignitePkgPrefix.length();
- return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1);
+ return cls.startsWith(ignitePkgPrefix) && (
+ cls.indexOf("igfs.", len) != -1 ||
+ cls.indexOf(".fs.", len) != -1 ||
+ cls.indexOf("hadoop.", len) != -1);
}
/**
@@ -159,7 +166,7 @@ public class HadoopClassLoader extends URLClassLoader {
Boolean hasDeps = cache.get(name);
if (hasDeps == null) {
- hasDeps = hasExternalDependencies(name, new HashSet<String>());
+ hasDeps = hasExternalDependencies(name);
cache.put(name, hasDeps);
}
@@ -266,10 +273,30 @@ public class HadoopClassLoader extends URLClassLoader {
}
/**
+ * Check whether class has external dependencies on Hadoop.
+ *
* @param clsName Class name.
+ * @return {@code True} if class has external dependencies.
+ */
+ boolean hasExternalDependencies(String clsName) {
+ CollectingContext ctx = new CollectingContext();
+
+ ctx.annVisitor = new CollectingAnnotationVisitor(ctx);
+ ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
+ ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
+ ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
+
+ return hasExternalDependencies(clsName, ctx);
+ }
+
+ /**
+ * Check whether class has external dependencies on Hadoop.
+ *
+ * @param clsName Class name.
+ * @param ctx Context.
* @return {@code true} If the class has external dependencies.
*/
- boolean hasExternalDependencies(final String clsName, final Set<String> visited) {
+ boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external.
return true;
@@ -291,157 +318,14 @@ public class HadoopClassLoader extends URLClassLoader {
throw new RuntimeException("Failed to read class: " + clsName, e);
}
- visited.add(clsName);
-
- final AtomicBoolean hasDeps = new AtomicBoolean();
-
- rdr.accept(new ClassVisitor(Opcodes.ASM4) {
- AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) {
- // TODO
- };
-
- FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) {
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
- onType(desc);
-
- return av;
- }
- };
-
- MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) {
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
- onType(desc);
-
- return av;
- }
-
- @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
- onType(desc);
-
- return av;
- }
-
- @Override public AnnotationVisitor visitAnnotationDefault() {
- return av;
- }
-
- @Override public void visitFieldInsn(int i, String owner, String name, String desc) {
- onType(owner);
- onType(desc);
- }
-
- @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) {
- for (Object o : locTypes) {
- if (o instanceof String)
- onType((String)o);
- }
-
- for (Object o : stackTypes) {
- if (o instanceof String)
- onType((String)o);
- }
- }
-
- @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
- Label lb2, int i) {
- onType(desc);
- }
-
- @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
- onType(owner);
- }
-
- @Override public void visitMultiANewArrayInsn(String desc, int dim) {
- onType(desc);
- }
-
- @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) {
- onType(e);
- }
- };
-
- void onClass(String depCls) {
- assert validateClassName(depCls) : depCls;
-
- if (depCls.startsWith("java.")) // Filter out platform classes.
- return;
-
- if (visited.contains(depCls))
- return;
-
- Boolean res = cache.get(depCls);
-
- if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited)))
- hasDeps.set(true);
- }
-
- void onType(String type) {
- if (type == null)
- return;
-
- int off = 0;
-
- while (type.charAt(off) == '[')
- off++; // Handle arrays.
-
- if (off != 0)
- type = type.substring(off);
-
- if (type.length() == 1)
- return; // Get rid of primitives.
-
- if (type.charAt(type.length() - 1) == ';') {
- assert type.charAt(0) == 'L' : type;
-
- type = type.substring(1, type.length() - 1);
- }
-
- type = type.replace('/', '.');
-
- onClass(type);
- }
-
- @Override public void visit(int i, int i2, String name, String signature, String superName,
- String[] ifaces) {
- onType(superName);
-
- if (ifaces != null) {
- for (String iface : ifaces)
- onType(iface);
- }
- }
-
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- onType(desc);
-
- return av;
- }
-
- @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
- onType(name);
- }
-
- @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
- onType(desc);
+ ctx.visited.add(clsName);
- return fv;
- }
+ rdr.accept(ctx.clsVisitor, 0);
- @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
- String[] exceptions) {
- if (exceptions != null) {
- for (String e : exceptions)
- onType(e);
- }
-
- return mv;
- }
- }, 0);
-
- if (hasDeps.get()) // We already know that we have dependencies, no need to check parent.
+ if (ctx.found) // We already know that we have dependencies, no need to check parent.
return true;
- // Here we are known to not have any dependencies but possibly we have a parent which have them.
+ // At this point the class itself is known to have no dependencies, but it may have a parent (outer) class which has them.
int idx = clsName.lastIndexOf('$');
if (idx == -1) // No parent class.
@@ -449,13 +333,13 @@ public class HadoopClassLoader extends URLClassLoader {
String parentCls = clsName.substring(0, idx);
- if (visited.contains(parentCls))
+ if (ctx.visited.contains(parentCls))
return false;
Boolean res = cache.get(parentCls);
if (res == null)
- res = hasExternalDependencies(parentCls, visited);
+ res = hasExternalDependencies(parentCls, ctx);
return res;
}
@@ -616,4 +500,446 @@ public class HadoopClassLoader extends URLClassLoader {
public String name() {
return name;
}
+
+ /**
+ * Context for dependencies collection.
+ */
+ private class CollectingContext {
+ /** Visited classes. */
+ private final Set<String> visited = new HashSet<>();
+
+ /** Whether dependency found. */
+ private boolean found;
+
+ /** Annotation visitor. */
+ private AnnotationVisitor annVisitor;
+
+ /** Method visitor. */
+ private MethodVisitor mthdVisitor;
+
+ /** Field visitor. */
+ private FieldVisitor fldVisitor;
+
+ /** Class visitor. */
+ private ClassVisitor clsVisitor;
+
+ /**
+ * Processes a method descriptor: its return type and its argument types.
+ * @param methDesc The method descriptor string.
+ */
+ void onMethodsDesc(final String methDesc) {
+ // Process method return type:
+ onType(Type.getReturnType(methDesc));
+
+ if (found)
+ return;
+
+ // Process method argument types:
+ for (Type t: Type.getArgumentTypes(methDesc)) {
+ onType(t);
+
+ if (found)
+ return;
+ }
+ }
+
+ /**
+ * Processes dependencies of a class.
+ *
+ * @param depCls The class name as dot-notated FQN.
+ */
+ void onClass(final String depCls) {
+ assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation.
+ assert depCls.charAt(0) != 'L' : depCls;
+ assert validateClassName(depCls) : depCls;
+
+ if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes.
+ return;
+
+ if (visited.contains(depCls))
+ return;
+
+ Boolean res = cache.get(depCls);
+
+ if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, this)))
+ found = true;
+ }
+
+ /**
+ * Analyses dependencies of given type.
+ *
+ * @param t The type to process.
+ */
+ void onType(Type t) {
+ if (t == null)
+ return;
+
+ int sort = t.getSort();
+
+ switch (sort) {
+ case Type.ARRAY:
+ onType(t.getElementType());
+
+ break;
+
+ case Type.OBJECT:
+ onClass(t.getClassName());
+
+ break;
+ }
+ }
+
+ /**
+ * Analyses dependencies of given object type.
+ *
+ * @param objType The object type to process.
+ */
+ void onInternalTypeName(String objType) {
+ if (objType == null)
+ return;
+
+ assert objType.length() > 1 : objType;
+
+ if (objType.charAt(0) == '[')
+ // Handle array: in this case the name is in type descriptor notation, like "[Ljava/lang/Object;".
+ onType(objType);
+ else {
+ assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN.
+
+ String clsName = objType.replace('/', '.'); // Convert it to dot notation.
+
+ onClass(clsName); // Process.
+ }
+ }
+
+ /**
+ * Type description analyser.
+ *
+ * @param desc The description.
+ */
+ void onType(String desc) {
+ if (!F.isEmpty(desc)) {
+ if (desc.length() <= 1)
+ return; // Optimization: filter out primitive types in early stage.
+
+ Type t = Type.getType(desc);
+
+ onType(t);
+ }
+ }
+ }
+
+ /**
+ * Annotation visitor.
+ */
+ private static class CollectingAnnotationVisitor extends AnnotationVisitor {
+ /** */
+ final CollectingContext ctx;
+
+ /**
+ * Annotation visitor.
+ *
+ * @param ctx The collector.
+ */
+ CollectingAnnotationVisitor(CollectingContext ctx) {
+ super(Opcodes.ASM4);
+
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitAnnotation(String name, String desc) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitEnum(String name, String desc, String val) {
+ if (ctx.found)
+ return;
+
+ ctx.onType(desc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitArray(String name) {
+ return ctx.found ? null : this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visit(String name, Object val) {
+ if (ctx.found)
+ return;
+
+ if (val instanceof Type)
+ ctx.onType((Type)val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitEnd() {
+ // No-op.
+ }
+ }
+
+ /**
+ * Field visitor.
+ */
+ private static class CollectingFieldVisitor extends FieldVisitor {
+ /** Collector. */
+ private final CollectingContext ctx;
+
+ /** Annotation visitor. */
+ private final AnnotationVisitor av;
+
+ /**
+ * Constructor.
+ */
+ CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) {
+ super(Opcodes.ASM4);
+
+ this.ctx = ctx;
+ this.av = av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return ctx.found ? null : av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitAttribute(Attribute attr) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitEnd() {
+ // No-op.
+ }
+ }
+
+ /**
+ * Class visitor.
+ */
+ private static class CollectingClassVisitor extends ClassVisitor {
+ /** Collector. */
+ private final CollectingContext ctx;
+
+ /** Annotation visitor. */
+ private final AnnotationVisitor av;
+
+ /** Method visitor. */
+ private final MethodVisitor mv;
+
+ /** Field visitor. */
+ private final FieldVisitor fv;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Collector.
+ * @param av Annotation visitor.
+ * @param mv Method visitor.
+ * @param fv Field visitor.
+ */
+ CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) {
+ super(Opcodes.ASM4);
+
+ this.ctx = ctx;
+ this.av = av;
+ this.mv = mv;
+ this.fv = fv;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(superName);
+
+ if (ctx.found)
+ return;
+
+ if (ifaces != null) {
+ for (String iface : ifaces) {
+ ctx.onInternalTypeName(iface);
+
+ if (ctx.found)
+ return;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return ctx.found ? null : av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return ctx.found ? null : fv;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
+ String[] exceptions) {
+ if (ctx.found)
+ return null;
+
+ ctx.onMethodsDesc(desc);
+
+ // Process declared method exceptions:
+ if (exceptions != null) {
+ for (String e : exceptions)
+ ctx.onInternalTypeName(e);
+ }
+
+ return ctx.found ? null : mv;
+ }
+ }
+
+ /**
+ * Method visitor.
+ */
+ private static class CollectingMethodVisitor extends MethodVisitor {
+ /** Collector. */
+ private final CollectingContext ctx;
+
+ /** Annotation visitor. */
+ private final AnnotationVisitor av;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Collector.
+ * @param av Annotation visitor.
+ */
+ private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) {
+ super(Opcodes.ASM4);
+
+ this.ctx = ctx;
+ this.av = av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return ctx.found ? null : av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
+ if (ctx.found)
+ return null;
+
+ ctx.onType(desc);
+
+ return ctx.found ? null : av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AnnotationVisitor visitAnnotationDefault() {
+ return ctx.found ? null : av;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(owner);
+
+ if (ctx.found)
+ return;
+
+ ctx.onType(desc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
+ Label lb2, int i) {
+ if (ctx.found)
+ return;
+
+ ctx.onType(desc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(owner);
+
+ if (ctx.found)
+ return;
+
+ ctx.onMethodsDesc(desc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitMultiANewArrayInsn(String desc, int dim) {
+ if (ctx.found)
+ return;
+
+ ctx.onType(desc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(typeStr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitTypeInsn(int opcode, String type) {
+ if (ctx.found)
+ return;
+
+ ctx.onInternalTypeName(type);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
index 085dd45..55fac2c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
@@ -17,53 +17,94 @@
package org.apache.ignite.internal.processors.hadoop;
+import javax.security.auth.AuthPermission;
import junit.framework.TestCase;
-import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.internal.processors.hadoop.deps.CircularWIthHadoop;
+import org.apache.ignite.internal.processors.hadoop.deps.CircularWithoutHadoop;
+import org.apache.ignite.internal.processors.hadoop.deps.WithIndirectField;
+import org.apache.ignite.internal.processors.hadoop.deps.WithCast;
+import org.apache.ignite.internal.processors.hadoop.deps.WithClassAnnotation;
+import org.apache.ignite.internal.processors.hadoop.deps.WithConstructorInvocation;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodCheckedException;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodRuntimeException;
+import org.apache.ignite.internal.processors.hadoop.deps.WithExtends;
+import org.apache.ignite.internal.processors.hadoop.deps.WithField;
+import org.apache.ignite.internal.processors.hadoop.deps.WithImplements;
+import org.apache.ignite.internal.processors.hadoop.deps.WithInitializer;
+import org.apache.ignite.internal.processors.hadoop.deps.WithInnerClass;
+import org.apache.ignite.internal.processors.hadoop.deps.WithLocalVariable;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodAnnotation;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodInvocation;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodArgument;
+import org.apache.ignite.internal.processors.hadoop.deps.WithMethodReturnType;
+import org.apache.ignite.internal.processors.hadoop.deps.WithOuterClass;
+import org.apache.ignite.internal.processors.hadoop.deps.WithParameterAnnotation;
+import org.apache.ignite.internal.processors.hadoop.deps.WithStaticField;
+import org.apache.ignite.internal.processors.hadoop.deps.WithStaticInitializer;
+import org.apache.ignite.internal.processors.hadoop.deps.Without;
/**
- *
+ * Tests for Hadoop classloader.
*/
public class HadoopClassLoaderTest extends TestCase {
/** */
- HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
+ final HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
/**
* @throws Exception If failed.
*/
public void testClassLoading() throws Exception {
- assertNotSame(Test1.class, ldr.loadClass(Test1.class.getName()));
- assertNotSame(Test2.class, ldr.loadClass(Test2.class.getName()));
- assertSame(Test3.class, ldr.loadClass(Test3.class.getName()));
- }
+ assertNotSame(CircularWIthHadoop.class, ldr.loadClass(CircularWIthHadoop.class.getName()));
+ assertNotSame(CircularWithoutHadoop.class, ldr.loadClass(CircularWithoutHadoop.class.getName()));
-// public void testDependencySearch() {
-// assertTrue(ldr.hasExternalDependencies(Test1.class.getName(), new HashSet<String>()));
-// assertTrue(ldr.hasExternalDependencies(Test2.class.getName(), new HashSet<String>()));
-// }
+ assertSame(Without.class, ldr.loadClass(Without.class.getName()));
+ }
/**
- *
+ * Test dependency search.
*/
- private static class Test1 {
- /** */
- Test2 t2;
+ public void testDependencySearch() {
+ // Positive cases:
+ final Class[] positiveClasses = {
+ Configuration.class,
+ HadoopUtils.class,
+ WithStaticField.class,
+ WithCast.class,
+ WithClassAnnotation.class,
+ WithConstructorInvocation.class,
+ WithMethodCheckedException.class,
+ WithMethodRuntimeException.class,
+ WithExtends.class,
+ WithField.class,
+ WithImplements.class,
+ WithInitializer.class,
+ WithInnerClass.class,
+ WithOuterClass.InnerNoHadoop.class,
+ WithLocalVariable.class,
+ WithMethodAnnotation.class,
+ WithMethodInvocation.class,
+ WithMethodArgument.class,
+ WithMethodReturnType.class,
+ WithParameterAnnotation.class,
+ WithStaticField.class,
+ WithStaticInitializer.class,
+ WithIndirectField.class,
+ CircularWIthHadoop.class,
+ CircularWithoutHadoop.class,
+ };
- /** */
- Job[][] jobs = new Job[4][4];
- }
+ for (Class c: positiveClasses)
+ assertTrue(c.getName(), ldr.hasExternalDependencies(c.getName()));
- /**
- *
- */
- private static abstract class Test2 {
- /** */
- abstract Test1 t1();
- }
+ // Negative cases:
+ final Class[] negativeClasses = {
+ Object.class,
+ AuthPermission.class,
+ Without.class,
+ };
- /**
- *
- */
- private static class Test3 {
- // No-op.
+ for (Class c: negativeClasses)
+ assertFalse(c.getName(), ldr.hasExternalDependencies(c.getName()));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWIthHadoop.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWIthHadoop.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWIthHadoop.java
new file mode 100644
index 0000000..c3aa7d9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWIthHadoop.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Class has a direct Hadoop dependency and a circular dependency on another class.
+ */
+@SuppressWarnings("unused")
+public class CircularWIthHadoop {
+ /** */
+ private Job[][] jobs = new Job[4][4];
+
+ /** */
+ private CircularWithoutHadoop y;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWithoutHadoop.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWithoutHadoop.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWithoutHadoop.java
new file mode 100644
index 0000000..93d659c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/CircularWithoutHadoop.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+/**
+ * Does not have a direct Hadoop dependency, but has a circular dependency on a class that does.
+ */
+@SuppressWarnings("unused")
+public class CircularWithoutHadoop {
+ /** */
+ private CircularWIthHadoop x;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithCast.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithCast.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithCast.java
new file mode 100644
index 0000000..5b1e8e0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithCast.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Class contains a cast to a Hadoop type.
+ */
+@SuppressWarnings("unused")
+public abstract class WithCast<T> {
+ /** */
+ public abstract T create();
+
+ /** */
+ public void consume(T t) {
+ // noop
+ }
+
+ /** */
+ void test(WithCast<FileSystem> c) {
+ FileSystem fs = c.create();
+
+ c.consume(fs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithClassAnnotation.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithClassAnnotation.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithClassAnnotation.java
new file mode 100644
index 0000000..a9ecae0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithClassAnnotation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Class has a Hadoop annotation.
+ */
+@SuppressWarnings("unused")
+@InterfaceAudience.Public
+public class WithClassAnnotation {
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithConstructorInvocation.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithConstructorInvocation.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithConstructorInvocation.java
new file mode 100644
index 0000000..98c8991
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithConstructorInvocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Invokes a Hadoop type constructor.
+ */
+@SuppressWarnings("unused")
+public class WithConstructorInvocation {
+ /** */
+ private void foo() {
+ Object x = new Configuration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithExtends.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithExtends.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithExtends.java
new file mode 100644
index 0000000..80c99e1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithExtends.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.LocalFileSystem;
+
+/**
+ * Class extends a Hadoop class.
+ */
+public class WithExtends extends LocalFileSystem {
+ // noop
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithField.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithField.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithField.java
new file mode 100644
index 0000000..dd979db
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Has a Hadoop field.
+ */
+@SuppressWarnings("unused")
+public class WithField {
+ /** */
+ private Configuration conf;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithImplements.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithImplements.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithImplements.java
new file mode 100644
index 0000000..c2d8e5b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithImplements.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements a Hadoop interface.
+ */
+public class WithImplements implements Configurable {
+ /** {@inheritDoc} */
+ @Override public void setConf(Configuration conf) {
+ // noop
+ }
+
+ /** {@inheritDoc} */
+ @Override public Configuration getConf() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithIndirectField.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithIndirectField.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithIndirectField.java
new file mode 100644
index 0000000..ce078f1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithIndirectField.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+/**
+ * Has a unidirectional dependency on a Hadoop-dependent class.
+ */
+@SuppressWarnings("unused")
+public class WithIndirectField {
+ /** */
+ WithField x;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInitializer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInitializer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInitializer.java
new file mode 100644
index 0000000..360986c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInitializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+/**
+ * Has a field initialized with an expression invoking a Hadoop method.
+ */
+
+@SuppressWarnings({"ConstantConditions", "unused"})
+public class WithInitializer {
+ /** */
+ private final Object x = org.apache.hadoop.fs.FileSystem.getDefaultUri(null);
+
+ /** */
+ WithInitializer() throws Exception {
+ // noop
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInnerClass.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInnerClass.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInnerClass.java
new file mode 100644
index 0000000..4a5a49c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithInnerClass.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * Has a *static* inner class depending on Hadoop.
+ */
+@SuppressWarnings("unused")
+public class WithInnerClass {
+ /** */
+ private static abstract class Foo implements Configurable {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithLocalVariable.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithLocalVariable.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithLocalVariable.java
new file mode 100644
index 0000000..ea4a5de
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithLocalVariable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Has a local variable of Hadoop type.
+ */
+@SuppressWarnings({"unused", "ConstantConditions"})
+public class WithLocalVariable {
+ /** */
+ void foo() {
+ Configuration c = null;
+
+ moo(c);
+ }
+
+ /** */
+ void moo(Object x) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodAnnotation.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodAnnotation.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodAnnotation.java
new file mode 100644
index 0000000..ff9fbe0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodAnnotation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Method has a Hadoop annotation.
+ */
+@SuppressWarnings("unused")
+public class WithMethodAnnotation {
+ /** */
+ @InterfaceStability.Unstable
+ void foo() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodArgument.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodArgument.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodArgument.java
new file mode 100644
index 0000000..7f639e4
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodArgument.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Contains a formal parameter of Hadoop type.
+ */
+@SuppressWarnings("unused")
+public class WithMethodArgument {
+ /** */
+ protected void paramaterMethod(Configuration c) {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodCheckedException.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodCheckedException.java
new file mode 100644
index 0000000..8fd12ae
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodCheckedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.ChecksumException;
+
+/**
+ * Method declares a checked Hadoop exception.
+ */
+@SuppressWarnings("unused")
+public class WithMethodCheckedException {
+ /** */
+ void foo() throws ChecksumException {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodInvocation.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodInvocation.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodInvocation.java
new file mode 100644
index 0000000..de8b306
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodInvocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Method contains a Hadoop type method invocation.
+ */
+@SuppressWarnings("unused")
+public class WithMethodInvocation {
+ /** Calls {@link FileSystem#getChildFileSystems()} on the given Hadoop file system and discards the result. */
+ void foo(FileSystem fs) {
+ fs.getChildFileSystems();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodReturnType.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodReturnType.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodReturnType.java
new file mode 100644
index 0000000..0e0ea72
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodReturnType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Contains a method return value of Hadoop type.
+ */
+@SuppressWarnings("unused")
+public class WithMethodReturnType {
+ /** Declares Hadoop's {@link FileSystem} as its return type; always returns {@code null}. */
+ FileSystem fsMethod() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodRuntimeException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodRuntimeException.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodRuntimeException.java
new file mode 100644
index 0000000..dcd471c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithMethodRuntimeException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+
+/**
+ * Method declares a runtime Hadoop Exception.
+ */
+@SuppressWarnings("unused")
+public class WithMethodRuntimeException {
+ /** Declares Hadoop's {@link HadoopIllegalArgumentException} in its {@code throws} clause; the body is empty. */
+ void foo() throws HadoopIllegalArgumentException {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithOuterClass.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithOuterClass.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithOuterClass.java
new file mode 100644
index 0000000..cae1da7
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithOuterClass.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Outer class depends on Hadoop, but Inner *static* one does not.
+ */
+@SuppressWarnings("unused")
+public class WithOuterClass {
+ /** Field of Hadoop type {@link Configuration}; this is what makes the outer class reference Hadoop. */
+ Configuration c;
+
+ /** Static nested class whose members reference no Hadoop type. */
+ public static class InnerNoHadoop {
+ /** Plain {@code int} field. */
+ int x;
+
+ /** Empty method taking no parameters and returning nothing. */
+ void foo() {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithParameterAnnotation.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithParameterAnnotation.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithParameterAnnotation.java
new file mode 100644
index 0000000..9d3414e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithParameterAnnotation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Has a parameter annotated with a Hadoop annotation.
+ */
+@SuppressWarnings("unused")
+public class WithParameterAnnotation {
+ /** The sole parameter carries Hadoop's {@link InterfaceStability.Stable} annotation; the body is empty. */
+ void foo(@InterfaceStability.Stable Object annotatedParam) {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticField.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticField.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticField.java
new file mode 100644
index 0000000..301b912
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Has a static field of Hadoop type.
+ */
+@SuppressWarnings("unused")
+public class WithStaticField {
+ /** Static field of Hadoop type {@link FileSystem}; never assigned within this class. */
+ static FileSystem fs;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticInitializer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticInitializer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticInitializer.java
new file mode 100644
index 0000000..e0fc2f3
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/WithStaticInitializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Uses Hadoop type in a static initializer.
+ */
+@SuppressWarnings("unused")
+public class WithStaticInitializer {
+ /** Raw {@link List} whose declared type is not a Hadoop type; assigned in the static initializer. */
+ static final List x;
+
+ static {
+ x = FileSystem.getAllStatistics(); // Static call on Hadoop's FileSystem: the Hadoop reference in this class.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/Without.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/Without.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/Without.java
new file mode 100644
index 0000000..ab84740
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/deps/Without.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.deps;
+
+/**
+ * Class that does not depend on Hadoop in any way.
+ */
+public class Without {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d58fcbf/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6641bc8..1831085 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -54,6 +54,7 @@ import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfT
import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest;
import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest;
import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryModeSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoaderTest;
import org.apache.ignite.internal.processors.hadoop.HadoopCommandLineTest;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultMapReducePlannerSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopFileSystemsTest;
@@ -95,6 +96,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualSyncSelfTest.class.getName())));