You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/01 19:24:40 UTC
[49/50] incubator-ignite git commit: #[IGNITE-218]: tests for MapRed part.
#[IGNITE-218]: tests for MapRed part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc87c4df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc87c4df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc87c4df
Branch: refs/heads/ignite-218
Commit: bc87c4df4ddbc85197c7b812341437987bfe7010
Parents: 04653a0
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Jun 1 20:22:25 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Mon Jun 1 20:22:25 2015 +0300
----------------------------------------------------------------------
config/hadoop/default-config.xml | 2 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 45 ++--
.../hadoop/HadoopAbstractSelfTest.java | 18 +-
.../processors/hadoop/HadoopMapReduceTest.java | 231 ++++++++++++-------
.../hadoop/HadoopTaskExecutionSelfTest.java | 2 +-
5 files changed, 197 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index c1e4855..54073fc 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -90,7 +90,7 @@
Configuration of Ignite node.
-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
- <!-- Temporary workaround for tests: -->
+ <!-- TODO: Temporary workaround for tests: -->
<property name="localHost" value="127.0.0.1"/>
<!--
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 328120b..ec4a26f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -353,10 +353,21 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** {@inheritDoc} */
@Override public void close() throws IOException {
- if (cacheEnabled && get(getUri(), getConf()) == this)
- return;
+ if (closeGuard.compareAndSet(false, true)) {
+ if (cacheEnabled) {
+ FileSystem cached = get(getUri(), getConf());
- close0();
+ if (cached == this)
+ return;
+ else {
+ X.println("### Cache enabled, but this file system is not found in the cache: " +
+ " this = " + this + ", user =" + this.user +
+ " cached = " + cached + ", user =" + ((IgniteHadoopFileSystem)cached).user);
+ }
+ }
+
+ close0();
+ }
}
/**
@@ -365,27 +376,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @throws IOException If failed.
*/
private void close0() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- if (LOG.isDebugEnabled())
- LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
- if (rmtClient == null)
- return;
+ if (rmtClient == null)
+ return;
- super.close();
+ super.close();
- rmtClient.close(false);
+ rmtClient.close(false);
- if (clientLog.isLogEnabled())
- clientLog.close();
+ if (clientLog.isLogEnabled())
+ clientLog.close();
- if (secondaryFs != null)
- U.closeQuiet(secondaryFs);
+ if (secondaryFs != null)
+ U.closeQuiet(secondaryFs);
- // Reset initialized resources.
- uri = null;
- rmtClient = null;
- }
+ // Reset initialized resources.
+ uri = null;
+ rmtClient = null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index f41eb17..7aa2bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -61,6 +61,22 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/** Initial REST port. */
private int restPort = REST_PORT;
+ /** Secondary file system REST endpoint configuration map. */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+ static {
+// PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+//
+// PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+// PRIMARY_REST_CFG.setPort(10500);
+
+ SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+ SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+ SECONDARY_REST_CFG.setPort(11500);
+ }
+
+
/** Initial classpath. */
private static String initCp;
@@ -132,7 +148,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/**
* @return IGFS configuration.
*/
- public FileSystemConfiguration igfsConfiguration() {
+ public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = new FileSystemConfiguration();
cfg.setName(igfsName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 7d09433..b0a098d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,32 +24,102 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Test of whole cycle of map-reduce processing via Job tracker.
*/
public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+ /** IGFS block size. */
+ protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+ /** Amount of blocks to prefetch. */
+ protected static final int PREFETCH_BLOCKS = 1;
+
+ /** Amount of sequential block reads before prefetch is triggered. */
+ protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+ /** Secondary file system URI. */
+ protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+ /** Secondary file system configuration path. */
+ protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+ protected static final String USER = "vasya";
+
+ protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+ /** */
+ protected Ignite igniteSecondary;
+
+ /** */
+ protected IgfsSecondaryFileSystem secondaryFs;
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/**
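+ * Reads the owner of path {@code p} from IGFS instance {@code i}.
+ * @param p Path to query.
+ * @return Value of the {@code IgfsEx.PROP_USER_NAME} property of the path info.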
+ */
+ private static String getOwner(IgfsEx i, IgfsPath p) {
+ return i.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+
+ /**
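+ * Reads the owner of a path from the secondary file system inside an {@code IgfsUserContext.doAs(USER, ...)} call.
+ * @param secFs Secondary file system to query.
+ * @param p Path to query.
+ * @return Value of the {@code IgfsEx.PROP_USER_NAME} property of the path info.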
+ */
+ private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+ return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+ @Override public String apply() {
+ return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+ });
+ }
+
+ /**
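+ * Asserts that the owner reported by both {@code igfs} and {@code secondaryFs} for the path equals {@code USER}.
+ * @param p Path to check.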
+ */
+ private void checkOwner(IgfsPath p) {
+ String ownerPrim = getOwner(igfs, p);
+ assertEquals(USER, ownerPrim);
+
+ String ownerSec = getOwnerSecondary(secondaryFs, p);
+ assertEquals(USER, ownerSec);
+ }
+
+ /**
* Tests whole job execution with all phases in all combination of new and old versions of API.
* @throws Exception If fails.
*/
@@ -71,8 +141,11 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
JobConf jobConf = new JobConf();
+ //jobConf.setBoolean("fs.igfs.impl.disable.cache", true); // avoid hangup in shutdown hook
+ // when fs.close() causes the fs to be created again. // This does not work, why?????
+
jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
- jobConf.setUser("yyy");
+ jobConf.setUser(USER);
jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
//To split into about 40 items for v2
@@ -108,6 +181,10 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+ checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+ checkOwner(new IgfsPath(outFile));
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
useNewReducer,
"blue\t200000\n" +
@@ -185,7 +262,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
}
}
- final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+ final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -216,95 +293,89 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
}
}
-// /**
-// * Startup secondary file system.
-// *
-// * @throws Exception If failed.
-// */
-// private void startUpSecondary() throws Exception {
-// FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-//
-// igfsCfg.setDataCacheName("partitioned");
-// igfsCfg.setMetaCacheName("replicated");
-// igfsCfg.setName("igfs-secondary");
-// igfsCfg.setBlockSize(512 * 1024);
-// igfsCfg.setDefaultMode(PRIMARY);
-//
-// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-//
-// endpointCfg.setType(IgfsIpcEndpointType.TCP);
-// endpointCfg.setPort(11500);
-//
-// igfsCfg.setIpcEndpointConfiguration(endpointCfg);
-//
-// CacheConfiguration cacheCfg = defaultCacheConfiguration();
-//
-// cacheCfg.setName("partitioned");
-// cacheCfg.setCacheMode(PARTITIONED);
-// cacheCfg.setNearConfiguration(null);
-// cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-// cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
-// cacheCfg.setBackups(0);
-// cacheCfg.setAtomicityMode(TRANSACTIONAL);
-//
-// CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-//
-// metaCacheCfg.setName("replicated1");
-// metaCacheCfg.setCacheMode(REPLICATED);
-// metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-// metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-//
-// IgniteConfiguration cfg = new IgniteConfiguration();
-//
-// cfg.setGridName("igfs-grid-secondary");
-//
-// TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-//
-// discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-//
-// cfg.setDiscoverySpi(discoSpi);
-// cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
-// cfg.setFileSystemConfiguration(igfsCfg);
-//
-// cfg.setLocalHost("127.0.0.1");
-//
-// G.start(cfg);
-// }
-
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
-// startUpSecondary();
+ igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
super.beforeTest();
}
/**
- * @return IGFS configuration.
+ * Start grid with IGFS.
+ *
+ * @param gridName Grid name.
+ * @param igfsName IGFS name
+ * @param mode IGFS mode.
+ * @param secondaryFs Secondary file system (optional).
+ * @param restCfg IPC endpoint configuration (optional).
+ * @return Started grid instance.
+ * @throws Exception If failed.
*/
- @Override public FileSystemConfiguration igfsConfiguration() {
+ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+ @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(igfsName);
+ igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+ igfsCfg.setDefaultMode(mode);
+ igfsCfg.setIpcEndpointConfiguration(restCfg);
+ igfsCfg.setSecondaryFileSystem(secondaryFs);
+ igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+ igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setNearConfiguration(null);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+ dataCacheCfg.setBackups(0);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+ dataCacheCfg.setOffHeapMaxMemory(0);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName(gridName);
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ return G.start(cfg);
+ }
+
+ /**
+ * @return IGFS configuration.
+ */
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration fsCfg = super.igfsConfiguration();
-//
-// fsCfg.setName("igfs-secondary");
-// fsCfg.setDefaultMode(PRIMARY);
-//
-// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-//
-// endpointCfg.setType(IgfsIpcEndpointType.TCP);
-// endpointCfg.setPort(11500);
-//
-// fsCfg.setIpcEndpointConfiguration(endpointCfg);
-//
-// try {
-//
-// fsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
-// "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/",
-// "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
-// }
-// catch (Exception e) {
-// throw new IgniteException(e);
-// }
+
+ secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+ fsCfg.setSecondaryFileSystem(secondaryFs);
return fsCfg;
}
+
+// /** {@inheritDoc} */
+// @Override protected long getTestTimeout() {
+// return 30 * 60 * 1000; // TODO: for testing
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
/** {@inheritDoc} */
- @Override public FileSystemConfiguration igfsConfiguration() {
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = super.igfsConfiguration();
cfg.setFragmentizerEnabled(false);