You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/26 11:16:18 UTC
[02/50] [abbrv] ignite git commit: IGNITE-4341: Hadoop: added
Terasort to unit tests. This closes #1302. This closes #1321.
IGNITE-4341: Hadoop: added Terasort to unit tests. This closes #1302. This closes #1321.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b44baf1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b44baf1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b44baf1e
Branch: refs/heads/ignite-2.0
Commit: b44baf1e8c42c57fa4e241d5943593fa4ae42f12
Parents: ffe53eb
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Dec 12 16:52:47 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Dec 15 13:46:29 2016 +0300
----------------------------------------------------------------------
modules/hadoop/pom.xml | 7 +
.../hadoop/impl/fs/HadoopFileSystemsUtils.java | 11 +
.../impl/v2/HadoopV2JobResourceManager.java | 25 +-
.../hadoop/impl/HadoopAbstractSelfTest.java | 13 +-
.../impl/HadoopAbstractWordCountTest.java | 6 +-
.../hadoop/impl/HadoopFileSystemsTest.java | 9 +
.../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +-
.../impl/HadoopTaskExecutionSelfTest.java | 4 +-
.../hadoop/impl/HadoopTeraSortTest.java | 376 +++++++++++++++++++
.../client/HadoopClientProtocolSelfTest.java | 4 +-
.../collections/HadoopSkipListSelfTest.java | 14 +-
.../HadoopExternalTaskExecutionSelfTest.java | 2 +
.../testsuites/IgniteHadoopTestSuite.java | 3 +
13 files changed, 450 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/modules/hadoop/pom.xml b/modules/hadoop/pom.xml
index d0b0481..db302d7 100644
--- a/modules/hadoop/pom.xml
+++ b/modules/hadoop/pom.xml
@@ -95,6 +95,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-examples</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.gridgain</groupId>
<artifactId>ignite-shmem</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
index 5115cb4..37902f0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.hadoop.impl.fs;
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.jetbrains.annotations.Nullable;
@@ -48,4 +50,13 @@ public class HadoopFileSystemsUtils {
public static String disableFsCachePropertyName(@Nullable String scheme) {
return String.format("fs.%s.impl.disable.cache", scheme);
}
+
+ /**
+ * Clears Hadoop {@link FileSystem} cache by delegating to {@link FileSystem#closeAll()}.
+ * <p>
+ * Note that this closes every cached {@link FileSystem} instance, so references to file systems
+ * obtained via {@code FileSystem.get(...)} before this call should not be used afterwards.
+ *
+ * @throws IOException On error.
+ */
+ public static void clearFileSystemCache() throws IOException {
+ FileSystem.closeAll();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
index 3984f83..52e394b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
@@ -123,7 +123,9 @@ class HadoopV2JobResourceManager {
JobConf cfg = ctx.getJobConf();
- String mrDir = cfg.get("mapreduce.job.dir");
+ Collection<URL> clsPathUrls = new ArrayList<>();
+
+ String mrDir = cfg.get(MRJobConfig.MAPREDUCE_JOB_DIR);
if (mrDir != null) {
stagingDir = new Path(new URI(mrDir));
@@ -144,28 +146,23 @@ class HadoopV2JobResourceManager {
File jarJobFile = new File(jobLocDir, "job.jar");
- Collection<URL> clsPathUrls = new ArrayList<>();
-
clsPathUrls.add(jarJobFile.toURI().toURL());
rsrcSet.add(jarJobFile);
rsrcSet.add(new File(jobLocDir, "job.xml"));
-
- processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
- processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
- processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
- processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
-
- if (!clsPathUrls.isEmpty()) {
- clsPath = new URL[clsPathUrls.size()];
-
- clsPathUrls.toArray(clsPath);
- }
}
else if (!jobLocDir.mkdirs())
throw new IgniteCheckedException("Failed to create local job directory: "
+ jobLocDir.getAbsolutePath());
+ processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
+ processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
+ processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
+ processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
+
+ if (!clsPathUrls.isEmpty())
+ clsPath = clsPathUrls.toArray(new URL[clsPathUrls.size()]);
+
setLocalFSWorkingDirectory(jobLocDir);
}
catch (URISyntaxException | IOException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
index 12351c6..5666cbc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
@@ -83,7 +83,9 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
private static String initCp;
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
+ @Override protected final void beforeTestsStarted() throws Exception {
+ HadoopFileSystemsUtils.clearFileSystemCache();
+
// Add surefire classpath to regular classpath.
initCp = System.getProperty("java.class.path");
@@ -93,6 +95,15 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
super.beforeTestsStarted();
+
+ beforeTestsStarted0();
+ }
+
+ /**
+ * Performs additional initialization in the beginning of test class execution.
+ */
+ protected void beforeTestsStarted0() throws Exception {
+ // noop
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
index 3cb8f91..84e6aee 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
@@ -49,9 +49,7 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
protected IgfsEx igfs;
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
+ @Override protected void beforeTestsStarted0() throws Exception {
Configuration cfg = new Configuration();
setupFileSystems(cfg);
@@ -62,6 +60,8 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
index 252d6cb..7680690 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLocalFileSystemV1;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -37,11 +38,15 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
startGrids(gridCount());
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ super.afterTest();
+
stopAllGrids(true);
}
@@ -70,6 +75,10 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
+ FileSystem fs = FileSystem.get(uri, cfg);
+
+ assertTrue(fs instanceof HadoopLocalFileSystemV1);
+
final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
index a3bf49c..91ad5ec 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -65,9 +65,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
+ @Override protected void beforeTestsStarted0() throws Exception {
startGrids(gridCount());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index 027f921..9d45b03 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -83,9 +83,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
+ @Override protected void beforeTestsStarted0() throws Exception {
startGrids(gridCount());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
new file mode 100644
index 0000000..0cc9564
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraInputFormat;
+import org.apache.hadoop.examples.terasort.TeraOutputFormat;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Implements TeraSort Hadoop sample as a unit test.
+ */
+public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
+    /** Size of a single row written by TeraGen, in bytes. */
+    private static final long TERAGEN_ROW_SIZE = 100;
+
+    /** Copy of Hadoop constant of package-private visibility. */
+    public static final String PARTITION_FILENAME = getPartitionFileNameConstant();
+
+    /** Out destination dir. */
+    protected final String generateOutDir = getFsBase() + "/tera-generated";
+
+    /** Sort destination dir. */
+    protected final String sortOutDir = getFsBase() + "/tera-sorted";
+
+    /** Validation destination dir. */
+    protected final String validateOutDir = getFsBase() + "/tera-validated";
+
+    /**
+     * Extracts value of Hadoop package-private constant via reflection.
+     *
+     * @return TeraInputFormat.PARTITION_FILENAME.
+     * @throws IgniteException If the constant cannot be read.
+     */
+    private static String getPartitionFileNameConstant() {
+        try {
+            Field f = TeraInputFormat.class.getDeclaredField("PARTITION_FILENAME");
+
+            f.setAccessible(true);
+
+            return (String)f.get(null);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to read TeraInputFormat.PARTITION_FILENAME constant.", e);
+        }
+    }
+
+    /**
+     * Gets base directory.
+     * Note that this directory is completely deleted before and after each test.
+     *
+     * @return The base directory.
+     */
+    protected String getFsBase() {
+        return "file:///tmp/" + getUser() + "/hadoop-terasort-test";
+    }
+
+    /**
+     * Gets the desired input data size. TeraGen writes whole 100-byte rows only, so the size actually
+     * generated is this value rounded down to a multiple of 100.
+     *
+     * @return Full input data size, in bytes.
+     */
+    protected long dataSizeBytes() {
+        return 100_000_000;
+    }
+
+    /**
+     * Desired number of maps in TeraSort job.
+     *
+     * @return The number of maps.
+     */
+    protected int numMaps() {
+        return gridCount() * 10;
+    }
+
+    /**
+     * Desired number of reduces in TeraSort job.
+     *
+     * @return The number of reduces.
+     */
+    protected int numReduces() {
+        return gridCount() * 8;
+    }
+
+    /**
+     * The user to run Hadoop job on behalf of.
+     *
+     * @return The user to run Hadoop job on behalf of.
+     */
+    protected String getUser() {
+        return System.getProperty("user.name");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids(true);
+
+        // Delete files used:
+        getFileSystem().delete(new Path(getFsBase()), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected final boolean igfsEnabled() {
+        return false;
+    }
+
+    /**
+     * Runs the TeraSort job over the generated data through the Ignite Hadoop API and waits for it to finish.
+     *
+     * @throws Exception If failed.
+     */
+    protected final void teraSort() throws Exception {
+        log().info("TeraSort ===============================================================");
+
+        getFileSystem().delete(new Path(sortOutDir), true);
+
+        final JobConf jobConf = new JobConf();
+
+        jobConf.setUser(getUser());
+
+        jobConf.set("fs.defaultFS", getFsBase());
+
+        log().info("Desired number of reduces: " + numReduces());
+
+        jobConf.set("mapreduce.job.reduces", String.valueOf(numReduces()));
+
+        log().info("Desired number of maps: " + numMaps());
+
+        final long splitSize = dataSizeBytes() / numMaps();
+
+        log().info("Desired split size: " + splitSize);
+
+        // Force the split to be of the desired size:
+        jobConf.set("mapred.min.split.size", String.valueOf(splitSize));
+        jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
+
+        Job job = setupConfig(jobConf);
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+        fut.get();
+    }
+
+    /**
+     * Gets the file system we work upon.
+     *
+     * @return The file system.
+     * @throws Exception If failed.
+     */
+    FileSystem getFileSystem() throws Exception {
+        return FileSystem.get(new URI(getFsBase()), new Configuration());
+    }
+
+    /**
+     * Represents the data generation stage: runs TeraGen locally and checks the size of its output.
+     *
+     * @throws Exception If failed.
+     */
+    private void teraGenerate() throws Exception {
+        log().info("TeraGenerate ===============================================================");
+
+        getFileSystem().delete(new Path(generateOutDir), true);
+
+        // TeraGen writes fixed-size rows, so only whole rows can be requested:
+        final long numLines = dataSizeBytes() / TERAGEN_ROW_SIZE;
+
+        if (numLines < 1)
+            throw new IllegalStateException("Data size is too small: " + dataSizeBytes());
+
+        // Generate input data:
+        int res = ToolRunner.run(new Configuration(), new TeraGen(), new String[] {"-Dmapreduce.framework.name=local",
+            String.valueOf(numLines), generateOutDir});
+
+        assertEquals(0, res);
+
+        FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(generateOutDir));
+
+        long sumLen = 0;
+
+        for (FileStatus fs : fileStatuses)
+            sumLen += fs.getLen();
+
+        // Ensure correct size data is generated (requested size rounded down to whole rows):
+        assertEquals(numLines * TERAGEN_ROW_SIZE, sumLen);
+    }
+
+    /**
+     * Creates Job instance and sets up necessary properties for it.
+     *
+     * @param conf The Job config.
+     * @return The job.
+     * @throws Exception On error.
+     */
+    private Job setupConfig(JobConf conf) throws Exception {
+        Job job = Job.getInstance(conf);
+
+        Path inputDir = new Path(generateOutDir);
+        Path outputDir = new Path(sortOutDir);
+
+        boolean useSimplePartitioner = TeraSort.getUseSimplePartitioner(job);
+
+        TeraInputFormat.setInputPaths(job, inputDir);
+        FileOutputFormat.setOutputPath(job, outputDir);
+
+        job.setJobName("TeraSort");
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setInputFormatClass(TeraInputFormat.class);
+        job.setOutputFormatClass(TeraOutputFormat.class);
+
+        if (useSimplePartitioner)
+            job.setPartitionerClass(TeraSort.SimplePartitioner.class);
+        else {
+            long start = System.currentTimeMillis();
+
+            Path partFile = new Path(outputDir, PARTITION_FILENAME);
+
+            URI partUri = new URI(partFile.toString() + "#" + PARTITION_FILENAME);
+
+            try {
+                TeraInputFormat.writePartitionFile(job, partFile);
+            }
+            catch (Throwable e) {
+                // writePartitionFile() declares Throwable; wrap it keeping the cause and some context.
+                throw new IgniteException("Failed to write TeraSort partition file: " + partFile, e);
+            }
+
+            job.addCacheFile(partUri);
+
+            long end = System.currentTimeMillis();
+
+            log().info("Spent " + (end - start) + "ms computing partitions. " +
+                "Partition file added to distributed cache: " + partUri);
+
+            // TeraSort.TotalOrderPartitioner is not accessible from here, so it is looked up via reflection.
+            job.setPartitionerClass(getTeraSortTotalOrderPartitioner());
+        }
+
+        job.getConfiguration().setInt("dfs.replication", TeraSort.getOutputReplication(job));
+
+        // Equivalent of TeraOutputFormat.setFinalSync(job, true), which is not accessible from this package:
+        Method m = TeraOutputFormat.class.getDeclaredMethod("setFinalSync", JobContext.class, boolean.class);
+
+        m.setAccessible(true);
+
+        m.invoke(null, job, true);
+
+        return job;
+    }
+
+    /**
+     * Extracts package-private TeraSort total order partitioner class.
+     *
+     * @return The class.
+     * @throws IllegalStateException If no suitable nested partitioner class is found.
+     */
+    private Class<? extends Partitioner> getTeraSortTotalOrderPartitioner() {
+        for (Class<?> cls : TeraSort.class.getDeclaredClasses()) {
+            // Checked conversion instead of an unchecked cast:
+            if ("TotalOrderPartitioner".equals(cls.getSimpleName()) && Partitioner.class.isAssignableFrom(cls))
+                return cls.asSubclass(Partitioner.class);
+        }
+
+        throw new IllegalStateException("Failed to find TeraSort total order partitioner class.");
+    }
+
+    /**
+     * Implements validation phase of the sample: runs TeraValidate over the sorted output and checks its result.
+     *
+     * @throws Exception If failed.
+     */
+    private void teraValidate() throws Exception {
+        log().info("TeraValidate ===============================================================");
+
+        getFileSystem().delete(new Path(validateOutDir), true);
+
+        // Validate the sorted output:
+        int res = ToolRunner.run(new Configuration(), new TeraValidate(),
+            new String[] {"-Dmapreduce.framework.name=local", sortOutDir, validateOutDir});
+
+        assertEquals(0, res);
+
+        FileStatus[] fileStatuses = getFileSystem().listStatus(new Path(validateOutDir), new PathFilter() {
+            @Override public boolean accept(Path path) {
+                // Typically name is "part-r-00000":
+                return path.getName().startsWith("part-r-");
+            }
+        });
+
+        // TeraValidate has only 1 reduce, so should be only 1 result file:
+        assertEquals(1, fileStatuses.length);
+
+        // The result file must contain only 1 line with the checksum, like this:
+        // "checksum 7a27e2d0d55de",
+        // typically it has length of 23 bytes.
+        // If sorting was not correct, the result contains list of K-V pairs that are not ordered correctly.
+        // In such case the size of the output will be much larger.
+        long len = fileStatuses[0].getLen();
+
+        assertTrue("TeraValidate length: " + len, len >= 16 && len <= 32);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        getFileSystem().delete(new Path(getFsBase()), true);
+
+        startGrids(gridCount());
+    }
+
+    /**
+     * Runs generate/sort/validate phases of the terasort sample.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTeraSort() throws Exception {
+        teraGenerate();
+
+        teraSort();
+
+        teraValidate();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igc = super.getConfiguration(gridName);
+
+        HadoopConfiguration hc = createHadoopConfiguration();
+
+        igc.setHadoopConfiguration(hc);
+
+        return igc;
+    }
+
+    /**
+     * Creates Hadoop configuration for the test.
+     *
+     * @return The {@link HadoopConfiguration}.
+     */
+    protected HadoopConfiguration createHadoopConfiguration() {
+        HadoopConfiguration hadoopCfg = new HadoopConfiguration();
+
+        // See org.apache.ignite.configuration.HadoopConfiguration.DFLT_MAX_TASK_QUEUE_SIZE
+        hadoopCfg.setMaxTaskQueueSize(30_000);
+
+        return hadoopCfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
index 7156a3d..44fc46e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
@@ -99,9 +99,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
+ @Override protected void beforeTestsStarted0() throws Exception {
startGrids(gridCount());
setupLockFile.delete();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
index 111ea78..1138803 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
@@ -85,6 +85,9 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
}
}
+ /**
+ * @throws Exception On error.
+ */
public void testMapSimple() throws Exception {
GridUnsafeMemory mem = new GridUnsafeMemory(0);
@@ -139,7 +142,16 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
assertEquals(0, mem.allocatedSize());
}
- private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx)
+ /**
+ * Check.
+ * @param m The multimap.
+ * @param mm The multimap storing expectations.
+ * @param vis The multimap to store visitor results.
+ * @param taskCtx The task context.
+ * @throws Exception On error.
+ */
+ private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis,
+ HadoopTaskContext taskCtx)
throws Exception {
final HadoopTaskInput in = m.input(taskCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 7c43500..5f64ce7 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -57,6 +57,8 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
@Override protected void beforeTest() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-404");
+ super.beforeTest();
+
startGrids(gridCount());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b44baf1e/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 959bc59..6046cc1 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.hadoop.HadoopTestClassLoader;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopTeraSortTest;
import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolEmbeddedSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolMultipleServersSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest;
@@ -123,6 +124,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopTeraSortTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));