You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/01 19:24:40 UTC

[49/50] incubator-ignite git commit: #[IGNITE-218]: tests for MapRed part.

#[IGNITE-218]: tests for MapRed part.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc87c4df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc87c4df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc87c4df

Branch: refs/heads/ignite-218
Commit: bc87c4df4ddbc85197c7b812341437987bfe7010
Parents: 04653a0
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Jun 1 20:22:25 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Mon Jun 1 20:22:25 2015 +0300

----------------------------------------------------------------------
 config/hadoop/default-config.xml                |   2 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  45 ++--
 .../hadoop/HadoopAbstractSelfTest.java          |  18 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 231 ++++++++++++-------
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 5 files changed, 197 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index c1e4855..54073fc 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -90,7 +90,7 @@
         Configuration of Ignite node.
     -->
     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <!-- Temporary workaround for tests: -->
+        <!-- TODO: Temporary workaround for tests: -->
         <property name="localHost" value="127.0.0.1"/>
 
         <!--

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 328120b..ec4a26f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -353,10 +353,21 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        if (cacheEnabled && get(getUri(), getConf()) == this)
-            return;
+        if (closeGuard.compareAndSet(false, true)) {
+            if (cacheEnabled) {
+                FileSystem cached = get(getUri(), getConf());
 
-        close0();
+                if (cached == this)
+                    return;
+                else {
+                    X.println("### Cache enabled, but this file system is not found in the cache: " +
+                        " this = " + this + ", user =" + this.user +
+                        " cached = " + cached + ", user =" + ((IgniteHadoopFileSystem)cached).user);
+                }
+            }
+
+            close0();
+        }
     }
 
     /**
@@ -365,27 +376,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @throws IOException If failed.
      */
     private void close0() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (LOG.isDebugEnabled())
-                LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+        if (LOG.isDebugEnabled())
+            LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
 
-            if (rmtClient == null)
-                return;
+        if (rmtClient == null)
+            return;
 
-            super.close();
+        super.close();
 
-            rmtClient.close(false);
+        rmtClient.close(false);
 
-            if (clientLog.isLogEnabled())
-                clientLog.close();
+        if (clientLog.isLogEnabled())
+            clientLog.close();
 
-            if (secondaryFs != null)
-                U.closeQuiet(secondaryFs);
+        if (secondaryFs != null)
+            U.closeQuiet(secondaryFs);
 
-            // Reset initialized resources.
-            uri = null;
-            rmtClient = null;
-        }
+        // Reset initialized resources.
+        uri = null;
+        rmtClient = null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index f41eb17..7aa2bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -61,6 +61,22 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /** Initial REST port. */
     private int restPort = REST_PORT;
 
+    /** Secondary file system REST endpoint configuration map. */
+    protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+    static {
+//        PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+//
+//        PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+//        PRIMARY_REST_CFG.setPort(10500);
+
+        SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+        SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+        SECONDARY_REST_CFG.setPort(11500);
+    }
+
+
     /** Initial classpath. */
     private static String initCp;
 
@@ -132,7 +148,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /**
      * @return IGFS configuration.
      */
-    public FileSystemConfiguration igfsConfiguration() {
+    public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = new FileSystemConfiguration();
 
         cfg.setName(igfsName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 7d09433..b0a098d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,32 +24,102 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
 public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    protected static final String USER = "vasya";
+
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** */
+    protected Ignite igniteSecondary;
+
+    /** */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
     }
 
     /**
+     *
+     * @param p
+     * @return
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsEx.PROP_USER_NAME);
+    }
+
+    /**
+     *
+     * @param secFs
+     * @param p
+     * @return
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     *
+     * @param p
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
+    /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
      */
@@ -71,8 +141,11 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             JobConf jobConf = new JobConf();
 
+            //jobConf.setBoolean("fs.igfs.impl.disable.cache", true); // avoid hangup in shutdown hook
+            // when fs.close() causes the fs to be created again. // This does not work, why?????
+
             jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser("yyy");
+            jobConf.setUser(USER);
             jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
 
             //To split into about 40 items for v2
@@ -108,6 +181,10 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
 
+            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+            checkOwner(new IgfsPath(outFile));
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
                     useNewReducer,
                 "blue\t200000\n" +
@@ -185,7 +262,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             }
         }
 
-        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
 
         assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -216,95 +293,89 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
         }
     }
 
-//    /**
-//     * Startup secondary file system.
-//     *
-//     * @throws Exception If failed.
-//     */
-//    private void startUpSecondary() throws Exception {
-//        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-//
-//        igfsCfg.setDataCacheName("partitioned");
-//        igfsCfg.setMetaCacheName("replicated");
-//        igfsCfg.setName("igfs-secondary");
-//        igfsCfg.setBlockSize(512 * 1024);
-//        igfsCfg.setDefaultMode(PRIMARY);
-//
-//        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-//
-//        endpointCfg.setType(IgfsIpcEndpointType.TCP);
-//        endpointCfg.setPort(11500);
-//
-//        igfsCfg.setIpcEndpointConfiguration(endpointCfg);
-//
-//        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-//
-//        cacheCfg.setName("partitioned");
-//        cacheCfg.setCacheMode(PARTITIONED);
-//        cacheCfg.setNearConfiguration(null);
-//        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-//        cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
-//        cacheCfg.setBackups(0);
-//        cacheCfg.setAtomicityMode(TRANSACTIONAL);
-//
-//        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-//
-//        metaCacheCfg.setName("replicated1");
-//        metaCacheCfg.setCacheMode(REPLICATED);
-//        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-//        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-//
-//        IgniteConfiguration cfg = new IgniteConfiguration();
-//
-//        cfg.setGridName("igfs-grid-secondary");
-//
-//        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-//
-//        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-//
-//        cfg.setDiscoverySpi(discoSpi);
-//        cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
-//        cfg.setFileSystemConfiguration(igfsCfg);
-//
-//        cfg.setLocalHost("127.0.0.1");
-//
-//        G.start(cfg);
-//    }
-
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-//        startUpSecondary();
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
 
         super.beforeTest();
     }
 
     /**
-     * @return IGFS configuration.
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
      */
-    @Override public FileSystemConfiguration igfsConfiguration() {
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
 
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return G.start(cfg);
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration fsCfg = super.igfsConfiguration();
-//
-//        fsCfg.setName("igfs-secondary");
-//        fsCfg.setDefaultMode(PRIMARY);
-//
-//        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-//
-//        endpointCfg.setType(IgfsIpcEndpointType.TCP);
-//        endpointCfg.setPort(11500);
-//
-//        fsCfg.setIpcEndpointConfiguration(endpointCfg);
-//
-//        try {
-//
-//            fsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
-//                "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/",
-//                "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
-//        }
-//        catch (Exception e) {
-//            throw new IgniteException(e);
-//        }
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
 
         return fsCfg;
     }
+
+//    /** {@inheritDoc} */
+//    @Override protected long getTestTimeout() {
+//        return 30 * 60 * 1000; // TODO: for testing
+//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
 
     /** {@inheritDoc} */
-    @Override public FileSystemConfiguration igfsConfiguration() {
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = super.igfsConfiguration();
 
         cfg.setFragmentizerEnabled(false);