Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:18 UTC
[07/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java
deleted file mode 100644
index 8319255..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Abstract class for Hadoop tests.
- */
-public abstract class GridHadoopAbstractSelfTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** REST port. */
- protected static final int REST_PORT = 11212;
-
- /** IGFS name. */
- protected static final String igfsName = null;
-
- /** IGFS meta cache name. */
- protected static final String igfsMetaCacheName = "meta";
-
- /** IGFS data cache name. */
- protected static final String igfsDataCacheName = "data";
-
- /** IGFS block size. */
- protected static final int igfsBlockSize = 1024;
-
- /** IGFS block group size. */
- protected static final int igfsBlockGroupSize = 8;
-
- /** Initial REST port. */
- private int restPort = REST_PORT;
-
- /** Initial classpath. */
- private static String initCp;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // Add surefire classpath to regular classpath.
- initCp = System.getProperty("java.class.path");
-
- String surefireCp = System.getProperty("surefire.test.class.path");
-
- if (surefireCp != null)
- System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- // Restore classpath.
- System.setProperty("java.class.path", initCp);
-
- initCp = null;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
-
- TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
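- // Disable the shared memory endpoint so nodes communicate over TCP only.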
- commSpi.setSharedMemoryPort(-1);
-
- cfg.setCommunicationSpi(commSpi);
-
- TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- if (igfsEnabled()) {
- cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration());
-
- cfg.setIgfsConfiguration(igfsConfiguration());
- }
-
- if (restEnabled()) {
- ConnectorConfiguration clnCfg = new ConnectorConfiguration();
-
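- // Each started node gets the next REST port (11212, 11213, ...).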
- clnCfg.setPort(restPort++);
-
- cfg.setConnectorConfiguration(clnCfg);
- }
-
- cfg.setLocalHost("127.0.0.1");
- cfg.setPeerClassLoadingEnabled(false);
-
- return cfg;
- }
-
- /**
- * @param gridName Grid name.
- * @return Hadoop configuration.
- */
- public GridHadoopConfiguration hadoopConfiguration(String gridName) {
- GridHadoopConfiguration cfg = new GridHadoopConfiguration();
-
- cfg.setMaxParallelTasks(3);
-
- return cfg;
- }
-
- /**
- * @return IGFS configuration.
- */
- public IgfsConfiguration igfsConfiguration() {
- IgfsConfiguration cfg = new IgfsConfiguration();
-
- cfg.setName(igfsName);
- cfg.setBlockSize(igfsBlockSize);
- cfg.setDataCacheName(igfsDataCacheName);
- cfg.setMetaCacheName(igfsMetaCacheName);
- cfg.setFragmentizerEnabled(false);
-
- return cfg;
- }
-
- /**
- * @return IGFS meta cache configuration.
- */
- public CacheConfiguration metaCacheConfiguration() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setName(igfsMetaCacheName);
- cfg.setCacheMode(REPLICATED);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setWriteSynchronizationMode(FULL_SYNC);
-
- return cfg;
- }
-
- /**
- * @return IGFS data cache configuration.
- */
- private CacheConfiguration dataCacheConfiguration() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setName(igfsDataCacheName);
- cfg.setCacheMode(PARTITIONED);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize));
- cfg.setWriteSynchronizationMode(FULL_SYNC);
-
- return cfg;
- }
-
- /**
- * @return {@code True} if IGFS is enabled on Hadoop nodes.
- */
- protected boolean igfsEnabled() {
- return false;
- }
-
- /**
- * @return {@code True} if REST is enabled on Hadoop nodes.
- */
- protected boolean restEnabled() {
- return false;
- }
-
- /**
- * @return Number of nodes to start.
- */
- protected int gridCount() {
- return 3;
- }
-
- /**
- * @param cfg Config.
- */
- protected void setupFileSystems(Configuration cfg) {
- cfg.set("fs.defaultFS", igfsScheme());
- cfg.set("fs.igfs.impl", org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem.class.getName());
- cfg.set("fs.AbstractFileSystem.igfs.impl", IgfsHadoopFileSystem.
- class.getName());
-
- GridHadoopFileSystemsUtils.setupFileSystems(cfg);
- }
-
- /**
- * @return IGFS scheme for test.
- */
- protected String igfsScheme() {
- return "igfs://:" + getTestGridName(0) + "@/";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java
deleted file mode 100644
index ebbc0a6..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.base.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Abstract class for tests based on WordCount test job.
- */
-public abstract class GridHadoopAbstractWordCountTest extends GridHadoopAbstractSelfTest {
- /** Input path. */
- protected static final String PATH_INPUT = "/input";
-
- /** Output path. */
- protected static final String PATH_OUTPUT = "/output";
-
- /** IGFS instance. */
- protected IgfsEx igfs;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- Configuration cfg = new Configuration();
-
- setupFileSystems(cfg);
-
- // Initialize the file system cache with the correct LocalFileSystem implementation.
- FileSystem.getLocal(cfg);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 1;
- }
-
- /**
- * Generates a test file with the given word counts.
- *
- * @param path File name.
- * @param wordCounts Words and counts.
- * @throws Exception If failed.
- */
- protected void generateTestFile(String path, Object... wordCounts) throws Exception {
- List<String> wordsArr = new ArrayList<>();
-
- // Generate the requested number of occurrences of each word.
- for (int i = 0; i < wordCounts.length; i += 2) {
- String word = (String) wordCounts[i];
- int cnt = (Integer) wordCounts[i + 1];
-
- while (cnt-- > 0)
- wordsArr.add(word);
- }
-
- // Shuffle the generated words.
- for (int i = 0; i < wordsArr.size(); i++) {
- int j = (int)(Math.random() * wordsArr.size());
-
- Collections.swap(wordsArr, i, j);
- }
-
- // Prepare the input file.
- PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true));
-
- int j = 0;
-
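- // Write the shuffled words in lines of 5 to 9 words each.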
- while (j < wordsArr.size()) {
- int i = 5 + (int)(Math.random() * 5);
-
- List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
- j += i;
-
- testInputFileWriter.println(Joiner.on(' ').join(subList));
- }
-
- testInputFileWriter.close();
- }
-
- /**
- * Reads the whole text file, sorts its lines, and returns them as a single String.
- *
- * @param fileName Name of the file to read.
- * @return Sorted content of the file as a String.
- * @throws Exception If could not read the file.
- */
- protected String readAndSortFile(String fileName) throws Exception {
- BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
-
- List<String> list = new ArrayList<>();
-
- String line;
-
- while ((line = reader.readLine()) != null)
- list.add(line);
-
- Collections.sort(list);
-
- return Joiner.on('\n').join(list) + "\n";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java
deleted file mode 100644
index 767be7c..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import junit.framework.*;
-import org.apache.hadoop.mapreduce.*;
-
-/**
- * Tests for the Hadoop class loader.
- */
-public class GridHadoopClassLoaderTest extends TestCase {
- /** */
- GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
- /**
- * @throws Exception If failed.
- */
- public void testClassLoading() throws Exception {
- assertNotSame(Test1.class, ldr.loadClass(Test1.class.getName()));
- assertNotSame(Test2.class, ldr.loadClass(Test2.class.getName()));
- assertSame(Test3.class, ldr.loadClass(Test3.class.getName()));
- }
-
-// public void testDependencySearch() {
-// assertTrue(ldr.hasExternalDependencies(Test1.class.getName(), new HashSet<String>()));
-// assertTrue(ldr.hasExternalDependencies(Test2.class.getName(), new HashSet<String>()));
-// }
-
- /**
- *
- */
- private static class Test1 {
- /** */
- Test2 t2;
-
- /** */
- Job[][] jobs = new Job[4][4];
- }
-
- /**
- *
- */
- private static abstract class Test2 {
- /** */
- abstract Test1 t1();
- }
-
- /**
- *
- */
- private static class Test3 {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java
deleted file mode 100644
index 80cd226..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.base.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.io.*;
-import java.nio.file.*;
-import java.util.*;
-
-/**
- * Tests integration with the Hadoop client via the command-line interface.
- */
-public class GridHadoopCommandLineTest extends GridCommonAbstractTest {
- /** IGFS instance. */
- private IgfsEx igfs;
-
- /** */
- private static final String igfsName = "igfs";
-
- /** */
- private static File testWorkDir;
-
- /** */
- private static String hadoopHome;
-
- /** */
- private static String hiveHome;
-
- /** */
- private static File examplesJar;
-
- /**
- * Generates a test file with the given word counts.
- *
- * @param path Target file.
- * @param wordCounts Words and counts.
- * @throws Exception If failed.
- */
- private void generateTestFile(File path, Object... wordCounts) throws Exception {
- List<String> wordsArr = new ArrayList<>();
-
- // Generate the requested number of occurrences of each word.
- for (int i = 0; i < wordCounts.length; i += 2) {
- String word = (String) wordCounts[i];
- int cnt = (Integer) wordCounts[i + 1];
-
- while (cnt-- > 0)
- wordsArr.add(word);
- }
-
- // Shuffle the generated words.
- for (int i = 0; i < wordsArr.size(); i++) {
- int j = (int)(Math.random() * wordsArr.size());
-
- Collections.swap(wordsArr, i, j);
- }
-
- // Write the shuffled words to the file.
- try (PrintWriter writer = new PrintWriter(path)) {
- int j = 0;
-
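- // Write the shuffled words in lines of 5 to 9 words each.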
- while (j < wordsArr.size()) {
- int i = 5 + (int)(Math.random() * 5);
-
- List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
- j += i;
-
- writer.println(Joiner.on(' ').join(subList));
- }
-
- writer.flush();
- }
- }
-
- /**
- * Generates two data files to be joined with Hive.
- *
- * @throws FileNotFoundException If failed.
- */
- private void generateHiveTestFiles() throws FileNotFoundException {
- try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
- PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
- char sep = '\t';
-
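- // Table A row i is (i, 2 * i); table B gets two rows per iteration, so each A row joins exactly one B row.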
- int idB = 0;
- int idA = 0;
- int v = 1000;
-
- for (int i = 0; i < 1000; i++) {
- writerA.print(idA++);
- writerA.print(sep);
- writerA.println(idB);
-
- writerB.print(idB++);
- writerB.print(sep);
- writerB.println(v += 2);
-
- writerB.print(idB++);
- writerB.print(sep);
- writerB.println(v += 2);
- }
-
- writerA.flush();
- writerB.flush();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
-
- assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
-
- hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
-
- assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
-
- String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
-
- File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
- @Override public boolean accept(File pathname) {
- return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
- pathname.getName().endsWith(".jar");
- }
- });
-
- assertEquals("Invalid hadoop distribution.", 1, fileList.length);
-
- examplesJar = fileList[0];
-
- testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
-
- U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
-
- File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml");
- File dstFile = new File(testWorkDir, "mapred-site.xml");
-
- try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
- PrintWriter out = new PrintWriter(dstFile)) {
- String line;
-
- while ((line = in.readLine()) != null) {
- if (line.startsWith("</configuration>"))
- out.println(
- " <property>\n" +
- " <name>" + GridHadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
- " <value>" + GridHadoopFSCounterWriter.class.getName() + "</value>\n" +
- " </property>\n");
-
- out.println(line);
- }
-
- out.flush();
- }
-
- generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
-
- generateHiveTestFiles();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- U.delete(testWorkDir);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /**
- * Creates a process builder with the appropriate environment to run the Hadoop CLI.
- *
- * @return Process builder.
- */
- private ProcessBuilder createProcessBuilder() {
- String sep = ":";
-
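- // Expose Ignite's Hadoop module classes and dependencies to the Hadoop CLI via HADOOP_CLASSPATH.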
- String ggClsPath = GridHadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
- GridHadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
- ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-
- ProcessBuilder res = new ProcessBuilder();
-
- res.environment().put("HADOOP_HOME", hadoopHome);
- res.environment().put("HADOOP_CLASSPATH", ggClsPath);
- res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
-
- res.redirectErrorStream(true);
-
- return res;
- }
-
- /**
- * Waits for process exit and prints its output.
- *
- * @param proc Process.
- * @return Exit code.
- * @throws Exception If failed.
- */
- private int watchProcess(Process proc) throws Exception {
- BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
- String line;
-
- while ((line = reader.readLine()) != null)
- log().info(line);
-
- return proc.waitFor();
- }
-
- /**
- * Executes Hadoop command line tool.
- *
- * @param args Arguments for Hadoop command line tool.
- * @return Process exit code.
- * @throws Exception If failed.
- */
- private int executeHadoopCmd(String... args) throws Exception {
- ProcessBuilder procBuilder = createProcessBuilder();
-
- List<String> cmd = new ArrayList<>();
-
- cmd.add(hadoopHome + "/bin/hadoop");
- cmd.addAll(Arrays.asList(args));
-
- procBuilder.command(cmd);
-
- log().info("Execute: " + procBuilder.command());
-
- return watchProcess(procBuilder.start());
- }
-
- /**
- * Executes Hive query.
- *
- * @param qry Query.
- * @return Process exit code.
- * @throws Exception If failed.
- */
- private int executeHiveQuery(String qry) throws Exception {
- ProcessBuilder procBuilder = createProcessBuilder();
-
- List<String> cmd = new ArrayList<>();
-
- cmd.add(hiveHome + "/bin/hive");
-
- cmd.add("--hiveconf");
- cmd.add("hive.rpc.query.plan=true");
-
- cmd.add("--hiveconf");
- cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
- "databaseName=metastore_db;create=true");
-
- cmd.add("-e");
- cmd.add(qry);
-
- procBuilder.command(cmd);
-
- log().info("Execute: " + procBuilder.command());
-
- return watchProcess(procBuilder.start());
- }
-
- /**
- * Tests Hadoop command line integration.
- */
- public void testHadoopCommandLine() throws Exception {
- assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
-
- assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
-
- assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
-
- assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
-
- assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
-
- IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
-
- assertTrue(igfs.exists(path));
-
- IgfsPath jobStatPath = null;
-
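- // Exactly one job directory is expected under the user path.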
- for (IgfsPath jobPath : igfs.listPaths(path)) {
- assertNull(jobStatPath);
-
- jobStatPath = jobPath;
- }
-
- File locStatFile = new File(testWorkDir, "performance");
-
- assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));
-
- long evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile)));
-
- assertTrue(evtCnt >= 22); // It's the minimum number of events for a job with a combiner.
-
- assertTrue(igfs.exists(new IgfsPath("/output")));
-
- BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))));
-
- List<String> res = new ArrayList<>();
-
- String line;
-
- while ((line = in.readLine()) != null)
- res.add(line);
-
- Collections.sort(res);
-
- assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
- }
-
- /**
- * Runs a query and checks the result.
- *
- * @param expRes Expected result.
- * @param qry Query.
- * @throws Exception If failed.
- */
- private void checkQuery(String expRes, String qry) throws Exception {
- assertEquals(0, executeHiveQuery("drop table if exists result"));
-
- assertEquals(0, executeHiveQuery(
- "create table result " +
- "row format delimited fields terminated by ' ' " +
- "stored as textfile " +
- "location '/result' as " + qry
- ));
-
- IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0"));
-
- byte[] buf = new byte[(int) in.length()];
-
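- // A single read is assumed to fill the buffer for these small result files.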
- in.read(buf);
-
- assertEquals(expRes, new String(buf));
- }
-
- /**
- * Tests Hive integration.
- */
- public void testHiveCommandLine() throws Exception {
- assertEquals(0, executeHiveQuery(
- "create table table_a (" +
- "id_a int," +
- "id_b int" +
- ") " +
- "row format delimited fields terminated by '\\t'" +
- "stored as textfile " +
- "location '/table-a'"
- ));
-
- assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
-
- assertEquals(0, executeHiveQuery(
- "create table table_b (" +
- "id_b int," +
- "rndv int" +
- ") " +
- "row format delimited fields terminated by '\\t'" +
- "stored as textfile " +
- "location '/table-b'"
- ));
-
- assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
-
- checkQuery(
- "0 0\n" +
- "1 2\n" +
- "2 4\n" +
- "3 6\n" +
- "4 8\n" +
- "5 10\n" +
- "6 12\n" +
- "7 14\n" +
- "8 16\n" +
- "9 18\n",
- "select * from table_a order by id_a limit 10"
- );
-
- checkQuery("2000\n", "select count(id_b) from table_b");
-
- checkQuery(
- "250 500 2002\n" +
- "251 502 2006\n" +
- "252 504 2010\n" +
- "253 506 2014\n" +
- "254 508 2018\n" +
- "255 510 2022\n" +
- "256 512 2026\n" +
- "257 514 2030\n" +
- "258 516 2034\n" +
- "259 518 2038\n",
- "select a.id_a, a.id_b, b.rndv" +
- " from table_a a" +
- " inner join table_b b on a.id_b = b.id_b" +
- " where b.rndv > 2000" +
- " order by a.id_a limit 10"
- );
-
- checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
deleted file mode 100644
index b1b0275..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java
+++ /dev/null
@@ -1,1005 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.mapreduce.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.planner.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.util.*;
-
-/**
- * Tests for the default map-reduce planner.
- */
-public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSelfTest {
- /** */
- private static final UUID ID_1 = new UUID(0, 1);
-
- /** */
- private static final UUID ID_2 = new UUID(0, 2);
-
- /** */
- private static final UUID ID_3 = new UUID(0, 3);
-
- /** */
- private static final String HOST_1 = "host1";
-
- /** */
- private static final String HOST_2 = "host2";
-
- /** */
- private static final String HOST_3 = "host3";
-
- /** */
- private static final String INVALID_HOST_1 = "invalid_host1";
-
- /** */
- private static final String INVALID_HOST_2 = "invalid_host2";
-
- /** */
- private static final String INVALID_HOST_3 = "invalid_host3";
-
- /** Mocked Grid. */
- private static final MockIgnite GRID = new MockIgnite();
-
- /** Mocked IGFS. */
- private static final IgniteFs IGFS = new MockIgfs();
-
- /** Planner. */
- private static final GridHadoopMapReducePlanner PLANNER = new GridHadoopDefaultMapReducePlanner();
-
- /** Block locations. */
- private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
-
- /** Proxy map. */
- private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
-
- /** Last created plan. */
- private static final ThreadLocal<GridHadoopMapReducePlan> PLAN = new ThreadLocal<>();
-
- /**
- *
- */
- static {
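- // Inject the mocked Ignite instance into the planner via reflection.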
- GridTestUtils.setFieldValue(PLANNER, "ignite", GRID);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- GridTestUtils.setFieldValue(PLANNER, "log", log());
-
- BLOCK_MAP.clear();
- PROXY_MAP.clear();
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
- GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
- GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
- mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 2);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
- GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
- GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 2);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
- GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
- GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
- mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
- GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
- GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
- GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split2);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_1);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsOrphans() throws IgniteCheckedException {
- GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
- GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
- GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
-
- plan(1, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
- ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
- ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * Create plan.
- *
- * @param reducers Reducers count.
- * @param splits Splits.
- * @return Plan.
- * @throws IgniteCheckedException If failed.
- */
- private static GridHadoopMapReducePlan plan(int reducers, GridHadoopInputSplit... splits) throws IgniteCheckedException {
- assert reducers > 0;
- assert splits != null && splits.length > 0;
-
- Collection<GridHadoopInputSplit> splitList = new ArrayList<>(splits.length);
-
- Collections.addAll(splitList, splits);
-
- Collection<ClusterNode> top = new ArrayList<>();
-
- GridTestNode node1 = new GridTestNode(ID_1);
- GridTestNode node2 = new GridTestNode(ID_2);
- GridTestNode node3 = new GridTestNode(ID_3);
-
- node1.setHostName(HOST_1);
- node2.setHostName(HOST_2);
- node3.setHostName(HOST_3);
-
- top.add(node1);
- top.add(node2);
- top.add(node3);
-
- GridHadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null);
-
- PLAN.set(plan);
-
- return plan;
- }
-
- /**
- * Ensure that the node contains the given mappers.
- *
- * @param nodeId Node ID.
- * @param expSplits Expected splits.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureMappers(UUID nodeId, GridHadoopInputSplit... expSplits) {
- Collection<GridHadoopInputSplit> expSplitsCol = new ArrayList<>();
-
- Collections.addAll(expSplitsCol, expSplits);
-
- Collection<GridHadoopInputSplit> splits = PLAN.get().mappers(nodeId);
-
- return F.eq(expSplitsCol, splits);
- }
-
- /**
- * Ensure that the node contains the given number of reducers.
- *
- * @param nodeId Node ID.
- * @param reducers Reducers.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureReducers(UUID nodeId, int reducers) {
- int[] reducersArr = PLAN.get().reducers(nodeId);
-
- return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
- }
-
- /**
- * Ensure that no mappers or reducers are located on this node.
- *
- * @param nodeId Node ID.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureEmpty(UUID nodeId) {
- return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
- }
-
- /**
- * Create split.
- *
- * @param igfs IGFS flag.
- * @param file File.
- * @param start Start.
- * @param len Length.
- * @param hosts Hosts.
- * @return Split.
- */
- private static GridHadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
- URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
-
- return new GridHadoopFileBlock(hosts, uri, start, len);
- }
-
- /**
- * Create block location.
- *
- * @param start Start.
- * @param len Length.
- * @param nodeIds Node IDs.
- * @return Block location.
- */
- private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
- assert nodeIds != null && nodeIds.length > 0;
-
- Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
-
- for (UUID id : nodeIds)
- nodes.add(new GridTestNode(id));
-
- return new IgfsBlockLocationImpl(start, len, nodes);
- }
-
- /**
- * Map IGFS block to nodes.
- *
- * @param file File.
- * @param start Start.
- * @param len Length.
- * @param locations Locations.
- */
- private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
- assert locations != null && locations.length > 0;
-
- IgfsPath path = new IgfsPath(file);
-
- Block block = new Block(path, start, len);
-
- Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
-
- Collections.addAll(locationsList, locations);
-
- BLOCK_MAP.put(block, locationsList);
- }
-
- /**
- * Block.
- */
- private static class Block {
- /** */
- private final IgfsPath path;
-
- /** */
- private final long start;
-
- /** */
- private final long len;
-
- /**
- * Constructor.
- *
- * @param path Path.
- * @param start Start.
- * @param len Length.
- */
- private Block(IgfsPath path, long start, long len) {
- this.path = path;
- this.start = start;
- this.len = len;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("RedundantIfStatement")
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Block)) return false;
-
- Block block = (Block) o;
-
- if (len != block.len)
- return false;
-
- if (start != block.start)
- return false;
-
- if (!path.equals(block.path))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = path.hashCode();
-
- res = 31 * res + (int) (start ^ (start >>> 32));
- res = 31 * res + (int) (len ^ (len >>> 32));
-
- return res;
- }
- }
-
- /**
- * Mocked job.
- */
- private static class MockJob implements GridHadoopJob {
- /** Reducers count. */
- private final int reducers;
-
- /** */
- private Collection<GridHadoopInputSplit> splitList;
-
- /**
- * Constructor.
- *
- * @param reducers Reducers count.
- * @param splitList Splits.
- */
- private MockJob(int reducers, Collection<GridHadoopInputSplit> splitList) {
- this.reducers = reducers;
- this.splitList = splitList;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId id() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobInfo info() {
- return new GridHadoopDefaultJobInfo() {
- @Override public int reducers() {
- return reducers;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException {
- return splitList;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void dispose(boolean external) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupStagingDirectory() {
- // No-op.
- }
- }
-
- /**
- * Mocked IGFS.
- */
- private static class MockIgfs implements IgfsEx {
- /** {@inheritDoc} */
- @Override public boolean isProxy(URI path) {
- return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
- return BLOCK_MAP.get(new Block(path, start, len));
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
- long maxLen) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void stop() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IgfsContext context() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPaths proxyPaths() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Boolean globalSampling() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsLocalMetrics localMetrics() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long groupBlockSize() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String clientLogDirectory() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void clientLogDirectory(String logDir) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean evictExclude(IgfsPath path, boolean primary) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String name() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsConfiguration configuration() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean exists(IgfsPath path) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgfsFile info(IgfsPath path) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPathSummary summary(IgfsPath path) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void rename(IgfsPath src, IgfsPath dest) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(IgfsPath path, boolean recursive) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long usedSpaceSize() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return Collections.emptyMap();
- }
-
- /** {@inheritDoc} */
- @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
- long blockSize, @Nullable Map<String, String> props) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite,
- @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsOutputStream append(IgfsPath path, boolean create) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
- @Nullable Map<String, String> props) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IgfsMetrics metrics() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void resetMetrics() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public long size(IgfsPath path) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void format() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
- Collection<IgfsPath> paths, @Nullable T arg) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
- Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
- @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
- @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
- long maxRangeLen, @Nullable T arg) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid nextAffinityKey() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFs withAsync() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isAsync() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public <R> IgniteFuture<R> future() {
- return null;
- }
- }
-
- /**
- * Mocked Grid.
- */
- @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- private static class MockIgnite extends IgniteSpringBean implements IgniteEx {
- /** {@inheritDoc} */
- @Override public IgniteClusterEx cluster() {
- return (IgniteClusterEx)super.cluster();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFs igfsx(String name) {
- assert F.eq("igfs", name);
-
- return IGFS;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoop hadoop() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls,
- Class<V> valCls) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public <K, V> GridCache<K, V> cachex(@Nullable String name) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public <K, V> GridCache<K, V> cachex() {
- return null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Collection<GridCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super GridCache<?, ?>>... p) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean eventUserRecordable(int type) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean allEventsUserRecordable(int[] types) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<String> compatibleVersions() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isJmxRemoteEnabled() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRestartEnabled() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode localNode() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String latestVersion() {
- return null;
- }
- }
-}
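
The mock classes above follow one stubbing pattern: every method the tests never touch returns null/false/0 or is a no-op, while the single method the tests rely on, igfsx(String), asserts its argument and hands back the pre-built IGFS mock. The same pattern in miniature, against a hypothetical interface (all names below are illustrative, not part of the deleted code):

    // Hypothetical service interface, only to illustrate the stub style used above.
    interface NameService {
        String lookup(String key);
        void refresh();
    }

    class MockNameService implements NameService {
        /** The one behavior the test depends on: assert the argument, return a canned value. */
        @Override public String lookup(String key) {
            assert "igfs".equals(key) : key;

            return "canned";
        }

        /** Everything else is a no-op. */
        @Override public void refresh() {
            // No-op.
        }
    }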
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java
deleted file mode 100644
index 18e5c03..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.testframework.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Tests file systems for per-thread working directory support.
- */
-public class GridHadoopFileSystemsTest extends GridHadoopAbstractSelfTest {
- /** Number of concurrent test threads. */
- private static final int THREAD_COUNT = 3;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 1;
- }
-
- /**
- * Tests working directory isolation across threads for the file system with the given URI.
- *
- * @param uri Base URI of the file system (scheme and authority).
- * @throws Exception If failed.
- */
- private void testFileSystem(final URI uri) throws Exception {
- final Configuration cfg = new Configuration();
-
- setupFileSystems(cfg);
-
- cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
- new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
-
- final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT);
-
- final Path[] newUserInitWorkDir = new Path[THREAD_COUNT];
- final Path[] newWorkDir = new Path[THREAD_COUNT];
- final Path[] newAbsWorkDir = new Path[THREAD_COUNT];
- final Path[] newInstanceWorkDir = new Path[THREAD_COUNT];
-
- final AtomicInteger threadNum = new AtomicInteger(0);
-
- GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- int curThreadNum = threadNum.getAndIncrement();
-
- FileSystem fs = FileSystem.get(uri, cfg);
-
- GridHadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
-
- if ("file".equals(uri.getScheme()))
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
-
- changeUserPhase.countDown();
- changeUserPhase.await();
-
- newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum));
-
- changeDirPhase.countDown();
- changeDirPhase.await();
-
- newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum));
-
- changeAbsDirPhase.countDown();
- changeAbsDirPhase.await();
-
- newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory();
-
- finishPhase.countDown();
- }
- catch (InterruptedException | IOException e) {
- error("Failed to execute test thread.", e);
-
- fail();
- }
- }
- }, THREAD_COUNT, "filesystems-test");
-
- finishPhase.await();
-
- for (int i = 0; i < THREAD_COUNT; i++) {
- cfg.set(MRJobConfig.USER_NAME, "user" + i);
-
- Path workDir = new Path(new Path(uri), "user/user" + i);
-
- cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString());
-
- assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory());
-
- assertEquals(workDir, newUserInitWorkDir[i]);
-
- assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]);
-
- assertEquals(new Path("/folder" + i), newAbsWorkDir[i]);
-
- assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]);
- }
- }
-
- /**
- * Tests the IGFS working directory in multiple threads.
- *
- * @throws Exception If failed.
- */
- public void testIgfs() throws Exception {
- testFileSystem(URI.create(igfsScheme()));
- }
-
- /**
- * Tests the HDFS working directory in multiple threads.
- *
- * @throws Exception If failed.
- */
- public void testHdfs() throws Exception {
- testFileSystem(URI.create("hdfs://localhost/"));
- }
-
- /**
- * Tests the local file system working directory in multiple threads.
- *
- * @throws Exception If failed.
- */
- public void testLocal() throws Exception {
- testFileSystem(URI.create("file:///"));
- }
-}
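
A note on what the latch phases in the test above were pinning down: in stock Hadoop, FileSystem.get(uri, cfg) returns a cached instance keyed by (scheme, authority, user) and shared by every thread with the same key, so a working-directory change made through it is visible to all of those threads, whereas FileSystem.newInstance(uri, cfg) bypasses the cache and returns an independent instance. The test switched the user per thread (GridHadoopFileSystemsUtils.setUser), giving each thread its own cached instance, and then asserted that working-directory changes stayed isolated. A minimal sketch of the stock-Hadoop contrast (class name and paths below are illustrative assumptions, not part of the deleted test):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class WorkingDirContrast {
        public static void main(String[] args) throws Exception {
            URI uri = URI.create("file:///");
            Configuration cfg = new Configuration();

            // Cached instance: FileSystem.get() returns the same object for the
            // same (scheme, authority, user) key, so this state is shared.
            FileSystem cached = FileSystem.get(uri, cfg);
            cached.setWorkingDirectory(new Path("/folder-a"));

            // Fresh instance: newInstance() bypasses the cache, so its working
            // directory starts at the default, independent of 'cached'.
            FileSystem fresh = FileSystem.newInstance(uri, cfg);

            System.out.println(cached.getWorkingDirectory()); // .../folder-a
            System.out.println(fresh.getWorkingDirectory());  // Default home dir.

            fresh.close();
        }
    }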
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java
deleted file mode 100644
index 49099fc..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Grouping test.
- */
-public class GridHadoopGroupingTest extends GridHadoopAbstractSelfTest {
- /** */
- private static final String PATH_OUTPUT = "/test-out";
-
- /** */
- private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(GridHadoopGroupingTest.class)
- .put("vals", new GridConcurrentHashSet<UUID>());
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
- GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGroupingReducer() throws Exception {
- doTestGrouping(false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGroupingCombiner() throws Exception {
- doTestGrouping(true);
- }
-
- /**
- * @param combiner Whether to test the combiner path instead of the reducer path.
- * @throws Exception If failed.
- */
- public void doTestGrouping(boolean combiner) throws Exception {
- vals.clear();
-
- Job job = Job.getInstance();
-
- job.setInputFormatClass(InFormat.class);
- job.setOutputFormatClass(OutFormat.class);
-
- job.setOutputKeyClass(YearTemperature.class);
- job.setOutputValueClass(Text.class);
-
- job.setMapperClass(Mapper.class);
-
- if (combiner) {
- job.setCombinerClass(MyReducer.class);
- job.setNumReduceTasks(0);
- job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
- }
- else {
- job.setReducerClass(MyReducer.class);
- job.setNumReduceTasks(4);
- job.setGroupingComparatorClass(YearComparator.class);
- }
-
- grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(30000);
-
- assertTrue(vals.isEmpty());
- }
-
- /**
- * Test reducer: validates that values are grouped by year and groups arrive in ascending year order.
- */
- public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
- /** */
- int lastYear;
-
- /** {@inheritDoc} */
- @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
- throws IOException, InterruptedException {
- X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
-
- Set<UUID> ids = new HashSet<>();
-
- for (Text val : vals0)
- assertTrue(ids.add(UUID.fromString(val.toString())));
-
- for (Text val : vals0)
- assertTrue(ids.remove(UUID.fromString(val.toString())));
-
- assertTrue(ids.isEmpty());
-
- assertTrue(key.year > lastYear);
-
- lastYear = key.year;
-
- for (Text val : vals0)
- assertTrue(vals.remove(UUID.fromString(val.toString())));
- }
- }
-
- /** Grouping comparator: groups reduce input by year only. */
- public static class YearComparator implements RawComparator<YearTemperature> {
- /** {@inheritDoc} */
- @Override public int compare(YearTemperature o1, YearTemperature o2) {
- return Integer.compare(o1.year, o2.year);
- }
-
- /** {@inheritDoc} */
- @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- throw new IllegalStateException(); // Raw byte comparison is never exercised by this test.
- }
- }
-
- /** Composite key: sorted by year ascending, then temperature descending. */
- public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
- /** */
- private int year;
-
- /** */
- private int temperature;
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- out.writeInt(year);
- out.writeInt(temperature);
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- year = in.readInt();
- temperature = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- throw new IllegalStateException();
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() { // To be partitioned by year.
- return year;
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(YearTemperature o) {
- int res = Integer.compare(year, o.year);
-
- if (res != 0)
- return res;
-
- // Within a year, sort by temperature descending so the year's max comes first.
- return Integer.compare(o.temperature, temperature);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(YearTemperature.class, this);
- }
- }
-
- /** Input format producing random (year, temperature) keys with unique UUID values. */
- public static class InFormat extends InputFormat<YearTemperature, Text> {
- /** {@inheritDoc} */
- @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- ArrayList<InputSplit> list = new ArrayList<>();
-
- for (int i = 0; i < 10; i++)
- list.add(new GridHadoopSortingTest.FakeSplit(20));
-
- return list;
- }
-
- /** {@inheritDoc} */
- @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new RecordReader<YearTemperature, Text>() {
- /** */
- int cnt;
-
- /** */
- Random rnd = new GridRandom();
-
- /** */
- YearTemperature key = new YearTemperature();
-
- /** */
- Text val = new Text();
-
- @Override public void initialize(InputSplit split, TaskAttemptContext context) {
- // No-op.
- }
-
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- return cnt++ < split.getLength();
- }
-
- @Override public YearTemperature getCurrentKey() {
- key.year = 1990 + rnd.nextInt(10);
- key.temperature = 10 + rnd.nextInt(20);
-
- return key;
- }
-
- @Override public Text getCurrentValue() {
- UUID id = UUID.randomUUID();
-
- assertTrue(vals.add(id));
-
- val.set(id.toString());
-
- return val;
- }
-
- @Override public float getProgress() {
- return 0;
- }
-
- @Override public void close() {
- // No-op.
- }
- };
- }
- }
-
- /**
- * No-op output format: the job produces no output records.
- */
- public static class OutFormat extends OutputFormat {
- /** {@inheritDoc} */
- @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return null;
- }
- }
-}
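
The wiring in doTestGrouping above is the classic MapReduce secondary-sort pattern: YearTemperature.compareTo() orders records by year ascending and then temperature descending, hashCode() partitions by year alone, and YearComparator (set via setGroupingComparatorClass or setCombinerKeyGroupingComparatorClass) collapses reduce groups down to the year, so each reduce() call covers one year, with the group's first key carrying that year's maximum temperature. A condensed restatement of the reducer path, assuming the deleted classes above are on the classpath (this is a sketch of the same wiring, not new functionality):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    public class GroupingWiringSketch {
        public static Job configure() throws Exception {
            Job job = Job.getInstance();

            job.setInputFormatClass(InFormat.class);      // Synthetic (year, temperature) records.
            job.setOutputFormatClass(OutFormat.class);    // The job writes no output records.

            job.setOutputKeyClass(YearTemperature.class); // Sorts (year asc, temperature desc).
            job.setOutputValueClass(Text.class);

            job.setMapperClass(Mapper.class);             // Identity mapper.
            job.setReducerClass(MyReducer.class);
            job.setNumReduceTasks(4);

            // Group purely by year: values inside the group still follow the full
            // key sort, so the first key per group carries that year's maximum
            // temperature.
            job.setGroupingComparatorClass(YearComparator.class);

            return job;
        }
    }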