You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/09/26 12:13:41 UTC
[27/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class
loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
deleted file mode 100644
index 88d0f80..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Collection;
-import java.util.UUID;
-
-/**
- * Mock job for planner tests.
- */
-public class HadoopPlannerMockJob implements HadoopJob {
- /** Input splits. */
- private final Collection<HadoopInputSplit> splits;
-
- /** Reducers count. */
- private final int reducers;
-
- /**
- * Constructor.
- *
- * @param splits Input splits.
- * @param reducers Reducers.
- */
- public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) {
- this.splits = splits;
- this.reducers = reducers;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
- return splits;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobInfo info() {
- return new JobInfo(reducers);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId id() {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void dispose(boolean external) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupStagingDirectory() {
- throwUnsupported();
- }
-
- /**
- * Throw {@link UnsupportedOperationException}.
- */
- private static void throwUnsupported() {
- throw new UnsupportedOperationException("Should not be called!");
- }
-
- /**
- * Mocked job info.
- */
- private static class JobInfo implements HadoopJobInfo {
- /** Reducers. */
- private final int reducers;
-
- /**
- * Constructor.
- *
- * @param reducers Reducers.
- */
- public JobInfo(int reducers) {
- this.reducers = reducers;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return reducers;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- throwUnsupported();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- throwUnsupported();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
- @Nullable String[] libNames) throws IgniteCheckedException {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- throwUnsupported();
-
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
deleted file mode 100644
index 3f825b0..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.collect.MinMaxPriorityQueue;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static com.google.common.collect.Maps.immutableEntry;
-import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
-import static java.util.Collections.reverseOrder;
-
-/**
- * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
- * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are
- * output.
- *
- * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system
- * path.
- */
-public class HadoopPopularWordsTest {
- /** Ignite home. */
- private static final String IGNITE_HOME = U.getIgniteHome();
-
- /** The path to the input directory. All files in that directory will be processed. */
- private static final Path BOOKS_LOCAL_DIR =
- new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
-
- /** The path to the output directory. The result file will be written to this location. */
- private static final Path RESULT_LOCAL_DIR =
- new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
-
- /** Popular books source dir in DFS. */
- private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
-
- /** Job results dir in DFS. */
- private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
-
- /** Path to the distributed file system configuration. */
- private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
-
- /** Top N words to select **/
- private static final int POPULAR_WORDS_CNT = 10;
-
- /**
- * For each token in the input string the mapper emits a {word, 1} pair.
- */
- private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
- /** Constant value. */
- private static final IntWritable ONE = new IntWritable(1);
-
- /** The word converted into the Text. */
- private Text word = new Text();
-
- /**
- * Emits an entry where the key is the word and the value is always 1.
- *
- * @param key the current position in the input file (not used here)
- * @param val the text string
- * @param ctx mapper context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override protected void map(LongWritable key, Text val, Context ctx)
- throws IOException, InterruptedException {
- // Get the mapped object.
- final String line = val.toString();
-
- // Splits the given string to words.
- final String[] words = line.split("[^a-zA-Z0-9]");
-
- for (final String w : words) {
- // Only emit counts for longer words.
- if (w.length() <= 3)
- continue;
-
- word.set(w);
-
- // Write the word into the context with the initial count equal to 1.
- ctx.write(word, ONE);
- }
- }
- }
-
- /**
- * The reducer uses a priority queue to rank the words based on their number of occurrences.
- */
- private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- private MinMaxPriorityQueue<Entry<Integer, String>> q;
-
- TopNWordsReducer() {
- q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
- @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
- return o1.getKey().compareTo(o2.getKey());
- }
- })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
- }
-
- /**
- * This method doesn't emit anything, but just keeps track of the top N words.
- *
- * @param key The word.
- * @param vals The words counts.
- * @param ctx Reducer context.
- * @throws IOException If failed.
- * @throws InterruptedException If failed.
- */
- @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
- InterruptedException {
- int sum = 0;
-
- for (IntWritable val : vals)
- sum += val.get();
-
- q.add(immutableEntry(sum, key.toString()));
- }
-
- /**
- * This method is called after all the word entries have been processed. It writes the accumulated
- * statistics to the job output file.
- *
- * @param ctx The job context.
- * @throws IOException If failed.
- * @throws InterruptedException If failed.
- */
- @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
- IntWritable i = new IntWritable();
-
- Text txt = new Text();
-
- // iterate in desc order
- while (!q.isEmpty()) {
- Entry<Integer, String> e = q.removeFirst();
-
- i.set(e.getKey());
-
- txt.set(e.getValue());
-
- ctx.write(txt, i);
- }
- }
- }
-
- /**
- * Configures the Hadoop MapReduce job.
- *
- * @return Instance of the Hadoop MapRed job.
- * @throws IOException If failed.
- */
- @SuppressWarnings("deprecation")
- private Job createConfigBasedHadoopJob() throws IOException {
- Job jobCfg = new Job();
-
- Configuration cfg = jobCfg.getConfiguration();
-
- // Use explicit configuration of distributed file system, if provided.
- cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
-
- jobCfg.setJobName("HadoopPopularWordExample");
- jobCfg.setJarByClass(HadoopPopularWordsTest.class);
- jobCfg.setInputFormatClass(TextInputFormat.class);
- jobCfg.setOutputKeyClass(Text.class);
- jobCfg.setOutputValueClass(IntWritable.class);
- jobCfg.setMapperClass(TokenizingMapper.class);
- jobCfg.setReducerClass(TopNWordsReducer.class);
-
- FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
- FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
-
- // Local job tracker allows only one task per wave, but text input format
- // replaces it with the calculated value based on input split size option.
- if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
- // Split job into tasks using 32MB split size.
- FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
- FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
- }
-
- return jobCfg;
- }
-
- /**
- * Runs the Hadoop job.
- *
- * @return {@code True} if succeeded, {@code false} otherwise.
- * @throws Exception If failed.
- */
- private boolean runWordCountConfigBasedHadoopJob() throws Exception {
- Job job = createConfigBasedHadoopJob();
-
- // Distributed file system this job will work with.
- FileSystem fs = FileSystem.get(job.getConfiguration());
-
- X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
-
- // Prepare input and output job directories.
- prepareDirectories(fs);
-
- long time = System.currentTimeMillis();
-
- // Run job.
- boolean res = job.waitForCompletion(true);
-
- X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
-
- // Move job results into local file system, so you can view calculated results.
- publishResults(fs);
-
- return res;
- }
-
- /**
- * Prepare job's data: cleanup result directories that might have been left over
- * after previous runs, copy input files from the local file system into DFS.
- *
- * @param fs Distributed file system to use in job.
- * @throws IOException If failed.
- */
- private void prepareDirectories(FileSystem fs) throws IOException {
- X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
-
- fs.delete(RESULT_DFS_DIR, true);
-
- X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.delete(BOOKS_DFS_DIR, true);
-
- X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
- }
-
- /**
- * Publish job execution results into local file system, so you can view them.
- *
- * @param fs Distributed file system used in job.
- * @throws IOException If failed.
- */
- private void publishResults(FileSystem fs) throws IOException {
- X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.delete(BOOKS_DFS_DIR, true);
-
- X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
-
- fs.delete(RESULT_LOCAL_DIR, true);
-
- X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
-
- fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
- }
-
- /**
- * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
- * occurrences of the word in the source files, the N most popular words are selected.
- *
- * @param args None.
- */
- public static void main(String[] args) {
- try {
- new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
- }
- catch (Exception e) {
- X.println(">>> Failed to run word count example: " + e.getMessage());
- }
-
- System.exit(0);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
deleted file mode 100644
index 789a6b3..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.Arrays;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Test of wrapper of the native serialization.
- */
-public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
- /**
- * Tests read/write of IntWritable via native WritableSerialization.
- * @throws Exception If fails.
- */
- public void testIntWritableSerialization() throws Exception {
- HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- DataOutput out = new DataOutputStream(buf);
-
- ser.write(out, new IntWritable(3));
- ser.write(out, new IntWritable(-5));
-
- assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
- assertEquals(3, ((IntWritable)ser.read(in, null)).get());
- assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
- }
-
- /**
- * Tests read/write of Integer via native JavaSerialization.
- * @throws Exception If fails.
- */
- public void testIntJavaSerialization() throws Exception {
- HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- DataOutput out = new DataOutputStream(buf);
-
- ser.write(out, 3);
- ser.write(out, -5);
- ser.close();
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
- assertEquals(3, ((Integer)ser.read(in, null)).intValue());
- assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
index 7552028..fd72821 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
@@ -43,6 +43,7 @@ public class HadoopSharedMap {
* @param key Key.
* @param val Value.
*/
+ @SuppressWarnings("unchecked")
public <T> T put(String key, T val) {
Object old = map.putIfAbsent(key, val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
deleted file mode 100644
index 27a5fcd..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-/**
- * Same test as HadoopMapReduceTest, but with enabled Snappy output compression.
- */
-public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
- /** {@inheritDoc} */
- @Override protected boolean compressOutputSnappy() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean[][] getApiModes() {
- return new boolean[][] {
- { false, false, true },
- { true, true, true },
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
deleted file mode 100644
index b4e3dc2..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Tests isolated Hadoop Snappy codec usage.
- */
-public class HadoopSnappyTest extends GridCommonAbstractTest {
- /** Length of data. */
- private static final int BYTE_SIZE = 1024 * 50;
-
- /**
- * Checks Snappy codec usage.
- *
- * @throws Throwable On error.
- */
- public void testSnappy() throws Throwable {
- // Run Snappy test in default class loader:
- checkSnappy();
-
- // Run the same in several more class loaders simulating jobs and tasks:
- for (int i = 0; i < 2; i++) {
- ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null);
-
- Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
-
- assertEquals(hadoopClsLdr, cls.getClassLoader());
-
- U.invoke(cls, null, "checkSnappy");
- }
- }
-
- /**
- * Internal check routine.
- *
- * @throws Throwable If failed.
- */
- public static void checkSnappy() throws Throwable {
- try {
- byte[] expBytes = new byte[BYTE_SIZE];
- byte[] actualBytes = new byte[BYTE_SIZE];
-
- for (int i = 0; i < expBytes.length ; i++)
- expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
-
- SnappyCodec codec = new SnappyCodec();
-
- codec.setConf(new Configuration());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
- cos.write(expBytes);
- cos.flush();
- }
-
- try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
- int read = cis.read(actualBytes, 0, actualBytes.length);
-
- assert read == actualBytes.length;
- }
-
- assert Arrays.equals(expBytes, actualBytes);
- }
- catch (Throwable e) {
- System.out.println("Snappy check failed:");
- System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
- System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
-
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
deleted file mode 100644
index dff5e70..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-
-/**
- * External test for sorting.
- */
-public class HadoopSortingExternalTest extends HadoopSortingTest {
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(true);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setMarshaller(new JdkMarshaller());
-
- return cfg;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
deleted file mode 100644
index 20f5eef..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Scanner;
-import java.util.UUID;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.io.serializer.JavaSerializationComparator;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.util.typedef.X;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Tests correct sorting.
- */
-public class HadoopSortingTest extends HadoopAbstractSelfTest {
- /** */
- private static final String PATH_INPUT = "/test-in";
-
- /** */
- private static final String PATH_OUTPUT = "/test-out";
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
- }
-
- /**
- * @return {@code True} if IGFS is enabled on Hadoop nodes.
- */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSortSimple() throws Exception {
- // Generate test data.
- Job job = Job.getInstance();
-
- job.setInputFormatClass(InFormat.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
-
- job.setMapperClass(Mapper.class);
- job.setNumReduceTasks(0);
-
- setupFileSystems(job.getConfiguration());
-
- FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
-
- X.printerrln("Data generation started.");
-
- grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration())).get(180000);
-
- X.printerrln("Data generation complete.");
-
- // Run main map-reduce job.
- job = Job.getInstance();
-
- setupFileSystems(job.getConfiguration());
-
- job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
- "," + WritableSerialization.class.getName());
-
- FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
-
- job.setSortComparatorClass(JavaSerializationComparator.class);
-
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
-
- job.setNumReduceTasks(2);
-
- job.setMapOutputKeyClass(UUID.class);
- job.setMapOutputValueClass(NullWritable.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
-
- X.printerrln("Job started.");
-
- grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(180000);
-
- X.printerrln("Job complete.");
-
- // Check result.
- Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
-
- AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
-
- for (FileStatus file : fs.listStatus(outDir)) {
- X.printerrln("__ file: " + file);
-
- if (file.getLen() == 0)
- continue;
-
- FSDataInputStream in = fs.open(file.getPath());
-
- Scanner sc = new Scanner(in);
-
- UUID prev = null;
-
- while(sc.hasNextLine()) {
- UUID next = UUID.fromString(sc.nextLine());
-
-// X.printerrln("___ check: " + next);
-
- if (prev != null)
- assertTrue(prev.compareTo(next) < 0);
-
- prev = next;
- }
- }
- }
-
- public static class InFormat extends InputFormat<Text, NullWritable> {
- /** {@inheritDoc} */
- @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
- List<InputSplit> res = new ArrayList<>();
-
- FakeSplit split = new FakeSplit(20);
-
- for (int i = 0; i < 10; i++)
- res.add(split);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
- TaskAttemptContext ctx) throws IOException, InterruptedException {
- return new RecordReader<Text, NullWritable>() {
- /** */
- int cnt;
-
- /** */
- Text txt = new Text();
-
- @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
- // No-op.
- }
-
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- return ++cnt <= split.getLength();
- }
-
- @Override public Text getCurrentKey() {
- txt.set(UUID.randomUUID().toString());
-
-// X.printerrln("___ read: " + txt);
-
- return txt;
- }
-
- @Override public NullWritable getCurrentValue() {
- return NullWritable.get();
- }
-
- @Override public float getProgress() throws IOException, InterruptedException {
- return (float)cnt / split.getLength();
- }
-
- @Override public void close() {
- // No-op.
- }
- };
- }
- }
-
- public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
- /** {@inheritDoc} */
- @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
-// X.printerrln("___ map: " + val);
-
- ctx.write(UUID.fromString(val.toString()), NullWritable.get());
- }
- }
-
- public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
- /** */
- private Text text = new Text();
-
- /** {@inheritDoc} */
- @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
- throws IOException, InterruptedException {
-// X.printerrln("___ rdc: " + key);
-
- text.set(key.toString());
-
- ctx.write(text, NullWritable.get());
- }
- }
-
- public static class FakeSplit extends InputSplit implements Writable {
- /** */
- private static final String[] HOSTS = {"127.0.0.1"};
-
- /** */
- private int len;
-
- /**
- * @param len Length.
- */
- public FakeSplit(int len) {
- this.len = len;
- }
-
- /**
- *
- */
- public FakeSplit() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public long getLength() throws IOException, InterruptedException {
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public String[] getLocations() throws IOException, InterruptedException {
- return HOSTS;
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- out.writeInt(len);
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- len = in.readInt();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
deleted file mode 100644
index 11c3907..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
-import org.apache.ignite.testframework.GridTestUtils;
-
-/**
- * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
- */
-public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
- /**
- * Tests serialization of wrapper and the wrapped native split.
- * @throws Exception If fails.
- */
- public void testSerialization() throws Exception {
- FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"});
-
- assertEquals("/path/to/file:100+500", nativeSplit.toString());
-
- HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
-
- assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- ObjectOutput out = new ObjectOutputStream(buf);
-
- out.writeObject(split);
-
- ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
- final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
-
- assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString());
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- res.hosts();
-
- return null;
- }
- }, AssertionError.class, null);
- }
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
deleted file mode 100644
index 820a1f3..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.typedef.G;
-
-/**
- * Hadoop node startup.
- */
-public class HadoopStartup {
- /**
- * @param args Arguments.
- */
- public static void main(String[] args) {
- G.start("config/hadoop/default-config.xml");
- }
-
- /**
- * @return Configuration for job run.
- */
- @SuppressWarnings("UnnecessaryFullyQualifiedName")
- public static Configuration configuration() {
- Configuration cfg = new Configuration();
-
- cfg.set("fs.defaultFS", "igfs://igfs@localhost");
-
- cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
- cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName());
-
- cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
-
- cfg.set("mapreduce.framework.name", "ignite");
- cfg.set("mapreduce.jobtracker.address", "localhost:11211");
-
- return cfg;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
deleted file mode 100644
index 431433e..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Tests map-reduce task execution basics.
- */
-public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
- /** */
- private static HadoopSharedMap m = HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class);
-
- /** Line count. */
- private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger());
-
- /** Executed tasks. */
- private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger());
-
- /** Cancelled tasks. */
- private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger());
-
- /** Working directory of each task. */
- private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs",
- new ConcurrentHashMap<String, String>());
-
- /** Mapper id to fail. */
- private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger());
-
- /** Number of splits of the current input. */
- private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger());
-
- /** Test param. */
- private static final String MAP_WRITE = "test.map.write";
-
-
- /** {@inheritDoc} */
- @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
- FileSystemConfiguration cfg = super.igfsConfiguration();
-
- cfg.setFragmentizerEnabled(false);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- super.afterTestsStopped();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- grid(0).fileSystem(igfsName).format();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- cfg.setMaxParallelTasks(5);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMapRun() throws Exception {
- int lineCnt = 10000;
- String fileName = "/testFile";
-
- prepareFile(fileName, lineCnt);
-
- totalLineCnt.set(0);
- taskWorkDirs.clear();
-
- Configuration cfg = new Configuration();
-
- cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
-
- Job job = Job.getInstance(cfg);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestMapper.class);
-
- job.setNumReduceTasks(0);
-
- job.setInputFormatClass(TextInputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
- FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
-
- job.setJarByClass(getClass());
-
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
-
- fut.get();
-
- assertEquals(lineCnt, totalLineCnt.get());
-
- assertEquals(32, taskWorkDirs.size());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMapCombineRun() throws Exception {
- int lineCnt = 10001;
- String fileName = "/testFile";
-
- prepareFile(fileName, lineCnt);
-
- totalLineCnt.set(0);
- taskWorkDirs.clear();
-
- Configuration cfg = new Configuration();
-
- cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
- cfg.setBoolean(MAP_WRITE, true);
-
- Job job = Job.getInstance(cfg);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestMapper.class);
- job.setCombinerClass(TestCombiner.class);
- job.setReducerClass(TestReducer.class);
-
- job.setNumReduceTasks(2);
-
- job.setInputFormatClass(TextInputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
- FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
-
- job.setJarByClass(getClass());
-
- HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
-
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
- fut.get();
-
- assertEquals(lineCnt, totalLineCnt.get());
-
- assertEquals(34, taskWorkDirs.size());
-
- for (int g = 0; g < gridCount(); g++)
- grid(g).hadoop().finishFuture(jobId).get();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMapperException() throws Exception {
- prepareFile("/testFile", 1000);
-
- Configuration cfg = new Configuration();
-
- cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
-
- Job job = Job.getInstance(cfg);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(FailMapper.class);
-
- job.setNumReduceTasks(0);
-
- job.setInputFormatClass(TextInputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
- FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
-
- job.setJarByClass(getClass());
-
- final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
- createJobInfo(job.getConfiguration()));
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- fut.get();
-
- return null;
- }
- }, IgniteCheckedException.class, null);
- }
-
- /**
- * @param fileName File name.
- * @param lineCnt Line count.
- * @throws Exception If failed.
- */
- private void prepareFile(String fileName, int lineCnt) throws Exception {
- IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
-
- try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) {
- PrintWriter w = new PrintWriter(new OutputStreamWriter(os));
-
- for (int i = 0; i < lineCnt; i++)
- w.print("Hello, Hadoop map-reduce!\n");
-
- w.flush();
- }
- }
-
- /**
- * Prepare job with mappers to cancel.
- * @return Fully configured job.
- * @throws Exception If fails.
- */
- private Configuration prepareJobForCancelling() throws Exception {
- prepareFile("/testFile", 1500);
-
- executedTasks.set(0);
- cancelledTasks.set(0);
- failMapperId.set(0);
- splitsCount.set(0);
-
- Configuration cfg = new Configuration();
-
- setupFileSystems(cfg);
-
- Job job = Job.getInstance(cfg);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(CancellingTestMapper.class);
-
- job.setNumReduceTasks(0);
-
- job.setInputFormatClass(InFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
- FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
-
- job.setJarByClass(getClass());
-
- return job.getConfiguration();
- }
-
- /**
- * Test input format.
- */
- private static class InFormat extends TextInputFormat {
- @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException {
- List<InputSplit> res = super.getSplits(ctx);
-
- splitsCount.set(res.size());
-
- X.println("___ split of input: " + splitsCount.get());
-
- return res;
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTaskCancelling() throws Exception {
- Configuration cfg = prepareJobForCancelling();
-
- HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
-
- final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
-
- if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return splitsCount.get() > 0;
- }
- }, 20000)) {
- U.dumpThreads(log);
-
- assertTrue(false);
- }
-
- if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return executedTasks.get() == splitsCount.get();
- }
- }, 20000)) {
- U.dumpThreads(log);
-
- assertTrue(false);
- }
-
- // Fail mapper with id "1", cancels others
- failMapperId.set(1);
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- fut.get();
-
- return null;
- }
- }, IgniteCheckedException.class, null);
-
- assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testJobKill() throws Exception {
- Configuration cfg = prepareJobForCancelling();
-
- Hadoop hadoop = grid(0).hadoop();
-
- HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
-
- //Kill unknown job.
- boolean killRes = hadoop.kill(jobId);
-
- assertFalse(killRes);
-
- final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
-
- if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return splitsCount.get() > 0;
- }
- }, 20000)) {
- U.dumpThreads(log);
-
- assertTrue(false);
- }
-
- if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- X.println("___ executed tasks: " + executedTasks.get());
-
- return executedTasks.get() == splitsCount.get();
- }
- }, 20000)) {
- U.dumpThreads(log);
-
- fail();
- }
-
- //Kill really ran job.
- killRes = hadoop.kill(jobId);
-
- assertTrue(killRes);
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- fut.get();
-
- return null;
- }
- }, IgniteCheckedException.class, null);
-
- assertEquals(executedTasks.get(), cancelledTasks.get());
-
- //Kill the same job again.
- killRes = hadoop.kill(jobId);
-
- assertFalse(killRes);
- }
-
- private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> {
- private int mapperId;
-
- /** {@inheritDoc} */
- @Override protected void setup(Context ctx) throws IOException, InterruptedException {
- mapperId = executedTasks.incrementAndGet();
- }
-
- /** {@inheritDoc} */
- @Override public void run(Context ctx) throws IOException, InterruptedException {
- try {
- super.run(ctx);
- }
- catch (HadoopTaskCancelledException e) {
- cancelledTasks.incrementAndGet();
-
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- if (mapperId == failMapperId.get())
- throw new IOException();
-
- Thread.sleep(1000);
- }
- }
-
- /**
- * Test failing mapper.
- */
- private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> {
- /** {@inheritDoc} */
- @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- throw new IOException("Expected");
- }
- }
-
- /**
- * Mapper calculates number of lines.
- */
- private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
- /** Writable integer constant of '1'. */
- private static final IntWritable ONE = new IntWritable(1);
-
- /** Line count constant. */
- public static final Text LINE_COUNT = new Text("lineCount");
-
- /** {@inheritDoc} */
- @Override protected void setup(Context ctx) throws IOException, InterruptedException {
- X.println("___ Mapper: " + ctx.getTaskAttemptID());
-
- String taskId = ctx.getTaskAttemptID().toString();
-
- LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration());
-
- String workDir = locFs.getWorkingDirectory().toString();
-
- assertNull(taskWorkDirs.put(workDir, taskId));
- }
-
- /** {@inheritDoc} */
- @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- if (ctx.getConfiguration().getBoolean(MAP_WRITE, false))
- ctx.write(LINE_COUNT, ONE);
- else
- totalLineCnt.incrementAndGet();
- }
- }
-
- /**
- * Combiner calculates number of lines.
- */
- private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
- /** */
- IntWritable sum = new IntWritable();
-
- /** {@inheritDoc} */
- @Override protected void setup(Context ctx) throws IOException, InterruptedException {
- X.println("___ Combiner: ");
- }
-
- /** {@inheritDoc} */
- @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
- InterruptedException {
- int lineCnt = 0;
-
- for (IntWritable value : values)
- lineCnt += value.get();
-
- sum.set(lineCnt);
-
- X.println("___ combo: " + lineCnt);
-
- ctx.write(key, sum);
- }
- }
-
- /**
- * Combiner calculates number of lines.
- */
- private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- /** */
- IntWritable sum = new IntWritable();
-
- /** {@inheritDoc} */
- @Override protected void setup(Context ctx) throws IOException, InterruptedException {
- X.println("___ Reducer: " + ctx.getTaskAttemptID());
-
- String taskId = ctx.getTaskAttemptID().toString();
- String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();
-
- assertNull(taskWorkDirs.put(workDir, taskId));
- }
-
- /** {@inheritDoc} */
- @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
- InterruptedException {
- int lineCnt = 0;
-
- for (IntWritable value : values) {
- lineCnt += value.get();
-
- X.println("___ rdcr: " + value.get());
- }
-
- sum.set(lineCnt);
-
- ctx.write(key, sum);
-
- X.println("___ RDCR SUM: " + lineCnt);
-
- totalLineCnt.addAndGet(lineCnt);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
deleted file mode 100644
index 7c6d244..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.base.Joiner;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-
-/**
- * Tests of Map, Combine and Reduce task executions of any version of hadoop API.
- */
-abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
- /** Empty hosts array. */
- private static final String[] HOSTS = new String[0];
-
- /**
- * Creates some grid hadoop job. Override this method to create tests for any job implementation.
- *
- * @param inFile Input file name for the job.
- * @param outFile Output file name for the job.
- * @return Hadoop job.
- * @throws Exception If fails.
- */
- public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
-
- /**
- * @return Prefix of the reducer output file name: {@code "part-"} for API v1 and {@code "part-r-"} for API v2.
- */
- public abstract String getOutputFileNamePrefix();
-
- /**
- * Tests map task execution: writes the input file in two steps, runs one MAP task per file block through the
- * same task context and checks the (word, 1) pairs collected in the mock output.
- *
- * @throws Exception If fails.
- */
- @SuppressWarnings("ConstantConditions")
- public void testMapTask() throws Exception {
- IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
- igfs.mkdirs(inDir);
-
- IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
-
- URI inFileUri = URI.create(igfsScheme() + inFile.toString());
-
- try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
- pw.println("hello0 world0");
- pw.println("world1 hello1");
- }
-
- HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1); // One byte short of the data written so far.
-
- try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) { // Two more lines for the second block.
- pw.println("hello2 world2");
- pw.println("world3 hello3");
- }
- HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
- igfs.info(inFile).length() - fileBlock1.length()); // Presumably (start, length): rest of the file after block 1 - confirm.
-
- HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
-
- HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
-
- HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
-
- ctx.mockOutput().clear();
-
- ctx.run();
-
- assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput()));
-
- ctx.mockOutput().clear();
-
- ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); // Same context, second block.
-
- ctx.run();
-
- assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput()));
- }
-
- /**
- * Generates input data for reduce-like operation into mock context input and runs the operation.
- * For every (word, count) pair the mock input receives the word mapped to {@code count} values of 1.
- *
- * @param gridJob Job to create the task for.
- * @param taskType Type of task - combine or reduce.
- * @param taskNum Number of task in job.
- * @param words Alternating words and their counts as decimal strings, e.g. {@code "word1", "5", "word2", "10"}.
- * The length must be even: each word is read together with the element that follows it.
- * @return Context the task was run in; callers read its mock output.
- * @throws IgniteCheckedException If fails.
- */
- private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
- int taskNum, String... words) throws IgniteCheckedException {
- HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
-
- HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
-
- for (int i = 0; i < words.length; i+=2) { // words[i] is the word, words[i + 1] its count.
- List<IntWritable> valList = new ArrayList<>();
-
- for (int j = 0; j < Integer.parseInt(words[i + 1]); j++) // The count is re-parsed on every iteration.
- valList.add(new IntWritable(1));
-
- ctx.mockInput().put(new Text(words[i]), valList);
- }
-
- ctx.run();
-
- return ctx;
- }
-
- /**
- * Tests reduce task execution: runs two REDUCE tasks (task numbers 0 and 1) and checks the file each of them
- * wrote under the {@code _temporary} subdirectory of the output path.
- *
- * @throws Exception If fails.
- */
- public void testReduceTask() throws Exception {
- HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
-
- runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
- runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
-
- assertEquals(
- "word1\t5\n" +
- "word2\t10\n",
- readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
- getOutputFileNamePrefix() + "00000") // Output of reduce task 0.
- );
-
- assertEquals(
- "word3\t7\n" +
- "word4\t15\n",
- readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
- getOutputFileNamePrefix() + "00001") // Output of reduce task 1.
- );
- }
-
- /**
- * Tests combine task execution: runs two COMBINE tasks and checks the (word, count) pairs in their mock output.
- *
- * @throws Exception If fails.
- */
- public void testCombinerTask() throws Exception {
- HadoopJob gridJob = getHadoopJob("/", "/"); // Both the input and the output path are the root here.
-
- HadoopTestTaskContext ctx =
- runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
-
- assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput()));
-
- ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15");
-
- assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput()));
- }
-
- /**
- * Runs chain of map-combine task on file block: the mock output of the MAP task is passed to
- * {@code makeTreeOfWritables} of a fresh COMBINE context, which is then run.
- *
- * @param fileBlock Block of input file to be processed.
- * @param gridJob Hadoop job implementation.
- * @return Context of combine task with mock output.
- * @throws IgniteCheckedException If fails.
- */
- private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
- throws IgniteCheckedException {
- HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
-
- HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob);
-
- mapCtx.run();
-
- // Prepare input for combine.
- taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
-
- HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob);
-
- combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
-
- combineCtx.run();
-
- return combineCtx;
- }
-
- /**
- * Tests all job in complex.
- * Runs 2 chains of map-combine tasks and sends result into one reduce task; the same context is then re-run
- * as a COMMIT task before the final output file is checked.
- *
- * @throws Exception If fails.
- */
- @SuppressWarnings("ConstantConditions")
- public void testAllTasks() throws Exception {
- IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
- igfs.mkdirs(inDir);
-
- IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
-
- URI inFileUri = URI.create(igfsScheme() + inFile.toString());
-
- generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70);
-
- // Split file into two blocks.
- long fileLen = igfs.info(inFile).length();
-
- Long l = fileLen / 2; // NOTE(review): boxed Long looks unnecessary; a primitive long would do - confirm.
-
- HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
- HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
-
- HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
-
- HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
-
- HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
-
- // Prepare input for reduce: feed it the output of both combine tasks.
- HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
-
- HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob);
-
- reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
- reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
-
- reduceCtx.run();
-
- reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); // Re-run the context as COMMIT.
-
- reduceCtx.run();
-
- assertEquals(
- "blue\t200\n" +
- "green\t150\n" +
- "red\t100\n" +
- "yellow\t70\n",
- readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000")
- );
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
deleted file mode 100644
index 27d7fc2..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.util.UUID;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1.
- */
-public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
- /**
- * Creates WordCount hadoop job for API v1.
- * The job is always created with the fixed job ID {@code (new UUID(0, 0), 0)}.
- *
- * @param inFile Input file name for the job.
- * @param outFile Output file name for the job.
- * @return Hadoop job.
- * @throws Exception If fails.
- */
- @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
- JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
-
- setupFileSystems(jobConf);
-
- HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
-
- UUID uuid = new UUID(0, 0); // All-zero UUID, so the job ID is the same on every call.
-
- HadoopJobId jobId = new HadoopJobId(uuid, 0);
-
- return jobInfo.createJob(HadoopV2Job.class, jobId, log, null);
- }
-
- /** {@inheritDoc} */
- @Override public String getOutputFileNamePrefix() {
- return "part-";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
deleted file mode 100644
index 30cf50c..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2.
- */
-public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
- /**
- * Creates WordCount hadoop job for API v2.
- * The job is always created with the fixed job ID {@code (new UUID(0, 0), 0)}.
- *
- * @param inFile Input file name for the job.
- * @param outFile Output file name for the job.
- * @return Hadoop job.
- * @throws Exception if fails.
- */
- @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
- Job job = Job.getInstance();
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- HadoopWordCount2.setTasksClasses(job, true, true, true, false);
-
- Configuration conf = job.getConfiguration();
-
- setupFileSystems(conf);
-
- FileInputFormat.setInputPaths(job, new Path(inFile));
- FileOutputFormat.setOutputPath(job, new Path(outFile));
-
- job.setJarByClass(HadoopWordCount2.class);
-
- Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile); // NOTE(review): 'job' configured above is not used below - confirm intent.
-
- HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); // setupFileSystems() was applied to 'job' only.
-
- UUID uuid = new UUID(0, 0); // All-zero UUID, so the job ID is the same on every call.
-
- HadoopJobId jobId = new HadoopJobId(uuid, 0);
-
- return jobInfo.createJob(HadoopV2Job.class, jobId, log, null);
- }
-
- /** {@inheritDoc} */
- @Override public String getOutputFileNamePrefix() {
- return "part-r-";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
new file mode 100644
index 0000000..c5302f8
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Hadoop test class loader aimed to provide better isolation.
+ */
+public class HadoopTestClassLoader extends URLClassLoader {
+ /** Parent class loader. */
+ private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopTestClassLoader.class.getClassLoader();
+
+ /** */
+ private static final Collection<URL> APP_JARS = F.asList(APP_CLS_LDR.getURLs());
+
+ /** All participating URLs. */
+ private static final URL[] URLS;
+
+ static {
+ try {
+ List<URL> res = new ArrayList<>();
+
+ for (URL url : APP_JARS) {
+ String urlStr = url.toString();
+
+ if (urlStr.contains("modules/hadoop/"))
+ res.add(url);
+ }
+
+ res.addAll(HadoopClasspathUtils.classpathForClassLoader());
+
+ X.println(">>> " + HadoopTestClassLoader.class.getSimpleName() + " static paths:");
+
+ for (URL url : res)
+ X.println(">>> \t" + url.toString());
+
+ URLS = res.toArray(new URL[res.size()]);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to initialize class loader JARs.", e);
+ }
+ }
+
+ /**
+ * Constructor.
+ */
+ public HadoopTestClassLoader() {
+ super(URLS, APP_CLS_LDR);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ if (HadoopClassLoader.loadByCurrentClassloader(name)) {
+ try {
+ synchronized (getClassLoadingLock(name)) {
+ // First, check if the class has already been loaded
+ Class c = findLoadedClass(name);
+
+ if (c == null) {
+ long t1 = System.nanoTime();
+
+ c = findClass(name);
+
+ // this is the defining class loader; record the stats
+ sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
+ sun.misc.PerfCounter.getFindClasses().increment();
+ }
+
+ if (resolve)
+ resolveClass(c);
+
+ return c;
+ }
+ }
+ catch (NoClassDefFoundError | ClassNotFoundException e) {
+ throw new IgniteException("Failed to load class by test class loader: " + name, e);
+ }
+ }
+
+ return super.loadClass(name, resolve);
+ }
+}