You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:53:18 UTC
[26/92] [abbrv] [partial] ignite git commit: Moving classes around.
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
deleted file mode 100644
index 946ba77..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.testframework.GridTestUtils;
-
-/**
- * Test file systems for the working directory multi-threading support.
- */
-public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
- /** the number of threads */
- private static final int THREAD_COUNT = 3;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 1;
- }
-
-
- /**
- * Test the file system with specified URI for the multi-thread working directory support.
- *
- * @param uri Base URI of the file system (scheme and authority).
- * @throws Exception If fails.
- */
- private void testFileSystem(final URI uri) throws Exception {
- final Configuration cfg = new Configuration();
-
- setupFileSystems(cfg);
-
- cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
- new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
-
- final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
- final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT);
-
- final Path[] newUserInitWorkDir = new Path[THREAD_COUNT];
- final Path[] newWorkDir = new Path[THREAD_COUNT];
- final Path[] newAbsWorkDir = new Path[THREAD_COUNT];
- final Path[] newInstanceWorkDir = new Path[THREAD_COUNT];
-
- final AtomicInteger threadNum = new AtomicInteger(0);
-
- GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- int curThreadNum = threadNum.getAndIncrement();
-
- if ("file".equals(uri.getScheme()))
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
-
- changeUserPhase.countDown();
- changeUserPhase.await();
-
- newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum));
-
- changeDirPhase.countDown();
- changeDirPhase.await();
-
- newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum));
-
- changeAbsDirPhase.countDown();
- changeAbsDirPhase.await();
-
- newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
-
- newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory();
-
- finishPhase.countDown();
- }
- catch (InterruptedException | IOException e) {
- error("Failed to execute test thread.", e);
-
- fail();
- }
- }
- }, THREAD_COUNT, "filesystems-test");
-
- finishPhase.await();
-
- for (int i = 0; i < THREAD_COUNT; i ++) {
- cfg.set(MRJobConfig.USER_NAME, "user" + i);
-
- Path workDir = new Path(new Path(uri), "user/user" + i);
-
- cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString());
-
- assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory());
-
- assertEquals(workDir, newUserInitWorkDir[i]);
-
- assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]);
-
- assertEquals(new Path("/folder" + i), newAbsWorkDir[i]);
-
- assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]);
- }
-
- System.out.println(System.getProperty("user.dir"));
- }
-
- /**
- * Test LocalFS multi-thread working directory.
- *
- * @throws Exception If fails.
- */
- public void testLocal() throws Exception {
- testFileSystem(URI.create("file:///"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
deleted file mode 100644
index db87e33..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Grouping test.
- */
-public class HadoopGroupingTest extends HadoopAbstractSelfTest {
- /** */
- private static final String PATH_OUTPUT = "/test-out";
-
- /** */
- private static final GridConcurrentHashSet<UUID> vals = HadoopSharedMap.map(HadoopGroupingTest.class)
- .put("vals", new GridConcurrentHashSet<UUID>());
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- protected boolean igfsEnabled() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGroupingReducer() throws Exception {
- doTestGrouping(false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGroupingCombiner() throws Exception {
- doTestGrouping(true);
- }
-
- /**
- * @param combiner With combiner.
- * @throws Exception If failed.
- */
- public void doTestGrouping(boolean combiner) throws Exception {
- vals.clear();
-
- Job job = Job.getInstance();
-
- job.setInputFormatClass(InFormat.class);
- job.setOutputFormatClass(OutFormat.class);
-
- job.setOutputKeyClass(YearTemperature.class);
- job.setOutputValueClass(Text.class);
-
- job.setMapperClass(Mapper.class);
-
- if (combiner) {
- job.setCombinerClass(MyReducer.class);
- job.setNumReduceTasks(0);
- job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
- }
- else {
- job.setReducerClass(MyReducer.class);
- job.setNumReduceTasks(4);
- job.setGroupingComparatorClass(YearComparator.class);
- }
-
- grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
- createJobInfo(job.getConfiguration())).get(30000);
-
- assertTrue(vals.isEmpty());
- }
-
- public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
- /** */
- int lastYear;
-
- @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
- throws IOException, InterruptedException {
- X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
-
- Set<UUID> ids = new HashSet<>();
-
- for (Text val : vals0)
- assertTrue(ids.add(UUID.fromString(val.toString())));
-
- for (Text val : vals0)
- assertTrue(ids.remove(UUID.fromString(val.toString())));
-
- assertTrue(ids.isEmpty());
-
- assertTrue(key.year > lastYear);
-
- lastYear = key.year;
-
- for (Text val : vals0)
- assertTrue(vals.remove(UUID.fromString(val.toString())));
- }
- }
-
- public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator.
- /** {@inheritDoc} */
- @Override public int compare(YearTemperature o1, YearTemperature o2) {
- return Integer.compare(o1.year, o2.year);
- }
-
- /** {@inheritDoc} */
- @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- throw new IllegalStateException();
- }
- }
-
- public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
- /** */
- private int year;
-
- /** */
- private int temperature;
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- out.writeInt(year);
- out.writeInt(temperature);
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- year = in.readInt();
- temperature = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- throw new IllegalStateException();
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() { // To be partitioned by year.
- return year;
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(YearTemperature o) {
- int res = Integer.compare(year, o.year);
-
- if (res != 0)
- return res;
-
- // Sort comparator by year and temperature, to find max for year.
- return Integer.compare(o.temperature, temperature);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(YearTemperature.class, this);
- }
- }
-
- public static class InFormat extends InputFormat<YearTemperature, Text> {
- /** {@inheritDoc} */
- @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- ArrayList<InputSplit> list = new ArrayList<>();
-
- for (int i = 0; i < 10; i++)
- list.add(new HadoopSortingTest.FakeSplit(20));
-
- return list;
- }
-
- /** {@inheritDoc} */
- @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new RecordReader<YearTemperature, Text>() {
- /** */
- int cnt;
-
- /** */
- Random rnd = new GridRandom();
-
- /** */
- YearTemperature key = new YearTemperature();
-
- /** */
- Text val = new Text();
-
- @Override public void initialize(InputSplit split, TaskAttemptContext context) {
- // No-op.
- }
-
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- return cnt++ < split.getLength();
- }
-
- @Override public YearTemperature getCurrentKey() {
- key.year = 1990 + rnd.nextInt(10);
- key.temperature = 10 + rnd.nextInt(20);
-
- return key;
- }
-
- @Override public Text getCurrentValue() {
- UUID id = UUID.randomUUID();
-
- assertTrue(vals.add(id));
-
- val.set(id.toString());
-
- return val;
- }
-
- @Override public float getProgress() {
- return 0;
- }
-
- @Override public void close() {
- // No-op.
- }
- };
- }
- }
-
- /**
- *
- */
- public static class OutFormat extends OutputFormat {
- /** {@inheritDoc} */
- @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
deleted file mode 100644
index 9e268b7..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Job tracker self test.
- */
-public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
- /** */
- private static final String PATH_OUTPUT = "/test-out";
-
- /** Test block count parameter name. */
- private static final int BLOCK_CNT = 10;
-
- /** */
- private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class);
-
- /** Map task execution count. */
- private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger());
-
- /** Reduce task execution count. */
- private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger());
-
- /** Reduce task execution count. */
- private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger());
-
- /** */
- private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>());
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- super.afterTestsStopped();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- latch.put("mapAwaitLatch", new CountDownLatch(1));
- latch.put("reduceAwaitLatch", new CountDownLatch(1));
- latch.put("combineAwaitLatch", new CountDownLatch(1));
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- mapExecCnt.set(0);
- combineExecCnt.set(0);
- reduceExecCnt.set(0);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSimpleTaskSubmit() throws Exception {
- try {
- UUID globalId = UUID.randomUUID();
-
- Job job = Job.getInstance();
- setupFileSystems(job.getConfiguration());
-
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
- job.setInputFormatClass(InFormat.class);
-
- FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));
-
- HadoopJobId jobId = new HadoopJobId(globalId, 1);
-
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
- checkStatus(jobId, false);
-
- info("Releasing map latch.");
-
- latch.get("mapAwaitLatch").countDown();
-
- checkStatus(jobId, false);
-
- info("Releasing reduce latch.");
-
- latch.get("reduceAwaitLatch").countDown();
-
- checkStatus(jobId, true);
-
- assertEquals(10, mapExecCnt.get());
- assertEquals(0, combineExecCnt.get());
- assertEquals(1, reduceExecCnt.get());
- }
- finally {
- // Safety.
- latch.get("mapAwaitLatch").countDown();
- latch.get("combineAwaitLatch").countDown();
- latch.get("reduceAwaitLatch").countDown();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTaskWithCombinerPerMap() throws Exception {
- try {
- UUID globalId = UUID.randomUUID();
-
- Job job = Job.getInstance();
- setupFileSystems(job.getConfiguration());
-
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
- job.setCombinerClass(TestCombiner.class);
- job.setInputFormatClass(InFormat.class);
-
- FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));
-
- HadoopJobId jobId = new HadoopJobId(globalId, 1);
-
- grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
- checkStatus(jobId, false);
-
- info("Releasing map latch.");
-
- latch.get("mapAwaitLatch").countDown();
-
- checkStatus(jobId, false);
-
- // All maps are completed. We have a combiner, so no reducers should be executed
- // before combiner latch is released.
-
- U.sleep(50);
-
- assertEquals(0, reduceExecCnt.get());
-
- info("Releasing combiner latch.");
-
- latch.get("combineAwaitLatch").countDown();
-
- checkStatus(jobId, false);
-
- info("Releasing reduce latch.");
-
- latch.get("reduceAwaitLatch").countDown();
-
- checkStatus(jobId, true);
-
- assertEquals(10, mapExecCnt.get());
- assertEquals(10, combineExecCnt.get());
- assertEquals(1, reduceExecCnt.get());
- }
- finally {
- // Safety.
- latch.get("mapAwaitLatch").countDown();
- latch.get("combineAwaitLatch").countDown();
- latch.get("reduceAwaitLatch").countDown();
- }
- }
-
- /**
- * Checks job execution status.
- *
- * @param jobId Job ID.
- * @param complete Completion status.
- * @throws Exception If failed.
- */
- private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception {
- for (int i = 0; i < gridCount(); i++) {
- IgniteKernal kernal = (IgniteKernal)grid(i);
-
- Hadoop hadoop = kernal.hadoop();
-
- HadoopJobStatus stat = hadoop.status(jobId);
-
- assert stat != null;
-
- IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
-
- if (!complete)
- assertFalse(fut.isDone());
- else {
- info("Waiting for status future completion on node [idx=" + i + ", nodeId=" +
- kernal.getLocalNodeId() + ']');
-
- fut.get();
- }
- }
- }
-
- /**
- * Test input format
- */
- public static class InFormat extends InputFormat {
-
- @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
- List<InputSplit> res = new ArrayList<>(BLOCK_CNT);
-
- for (int i = 0; i < BLOCK_CNT; i++)
- try {
- res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"}));
- }
- catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- return res;
- }
-
- @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException {
- return new RecordReader() {
- @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
- }
-
- @Override public boolean nextKeyValue() {
- return false;
- }
-
- @Override public Object getCurrentKey() {
- return null;
- }
-
- @Override public Object getCurrentValue() {
- return null;
- }
-
- @Override public float getProgress() {
- return 0;
- }
-
- @Override public void close() {
-
- }
- };
- }
- }
-
- /**
- * Test mapper.
- */
- private static class TestMapper extends Mapper {
- @Override public void run(Context ctx) throws IOException, InterruptedException {
- System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
-
- latch.get("mapAwaitLatch").await();
-
- mapExecCnt.incrementAndGet();
-
- System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
- }
- }
-
- /**
- * Test reducer.
- */
- private static class TestReducer extends Reducer {
- @Override public void run(Context ctx) throws IOException, InterruptedException {
- System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
-
- latch.get("reduceAwaitLatch").await();
-
- reduceExecCnt.incrementAndGet();
-
- System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
- }
- }
-
- /**
- * Test combiner.
- */
- private static class TestCombiner extends Reducer {
- @Override public void run(Context ctx) throws IOException, InterruptedException {
- System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
-
- latch.get("combineAwaitLatch").await();
-
- combineExecCnt.incrementAndGet();
-
- System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
deleted file mode 100644
index 25ef382..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Tests map-reduce execution with embedded mode.
- */
-public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
- /** */
- private static Map<String, Boolean> flags = HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class)
- .put("flags", new HashMap<String, Boolean>());
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-
- /**
- * Tests whole job execution with all phases in old and new versions of API with definition of custom
- * Serialization, Partitioner and IO formats.
- * @throws Exception If fails.
- */
- public void testMultiReducerWholeMapReduceExecution() throws Exception {
- IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
- igfs.mkdirs(inDir);
-
- IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
-
- generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000,
- "key6", 18000 );
-
- for (int i = 0; i < 2; i++) {
- boolean useNewAPI = i == 1;
-
- igfs.delete(new IgfsPath(PATH_OUTPUT), true);
-
- flags.put("serializationWasConfigured", false);
- flags.put("partitionerWasConfigured", false);
- flags.put("inputFormatWasConfigured", false);
- flags.put("outputFormatWasConfigured", false);
-
- JobConf jobConf = new JobConf();
-
- jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
-
- //To split into about 6-7 items for v2
- jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
-
- //For v1
- jobConf.setInt("fs.local.block.size", 65000);
-
- // File system coordinates.
- setupFileSystems(jobConf);
-
- HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
-
- if (!useNewAPI) {
- jobConf.setPartitionerClass(CustomV1Partitioner.class);
- jobConf.setInputFormat(CustomV1InputFormat.class);
- jobConf.setOutputFormat(CustomV1OutputFormat.class);
- }
-
- Job job = Job.getInstance(jobConf);
-
- HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
-
- if (useNewAPI) {
- job.setPartitionerClass(CustomV2Partitioner.class);
- job.setInputFormatClass(CustomV2InputFormat.class);
- job.setOutputFormatClass(CustomV2OutputFormat.class);
- }
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
- FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
-
- job.setNumReduceTasks(3);
-
- job.setJarByClass(HadoopWordCount2.class);
-
- IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
- createJobInfo(job.getConfiguration()));
-
- fut.get();
-
- assertTrue("Serialization was configured (new API is " + useNewAPI + ")",
- flags.get("serializationWasConfigured"));
-
- assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")",
- flags.get("partitionerWasConfigured"));
-
- assertTrue("Input format was configured (new API is = " + useNewAPI + ")",
- flags.get("inputFormatWasConfigured"));
-
- assertTrue("Output format was configured (new API is = " + useNewAPI + ")",
- flags.get("outputFormatWasConfigured"));
-
- assertEquals("Use new API = " + useNewAPI,
- "key3\t15000\n" +
- "key6\t18000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000")
- );
-
- assertEquals("Use new API = " + useNewAPI,
- "key1\t10000\n" +
- "key4\t7000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001")
- );
-
- assertEquals("Use new API = " + useNewAPI,
- "key2\t20000\n" +
- "key5\t12000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002")
- );
-
- }
- }
-
- /**
- * Custom serialization class that inherits behaviour of native {@link WritableSerialization}.
- */
- protected static class CustomSerialization extends WritableSerialization {
- @Override public void setConf(Configuration conf) {
- super.setConf(conf);
-
- flags.put("serializationWasConfigured", true);
- }
- }
-
- /**
- * Custom implementation of Partitioner in v1 API.
- */
- private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
- /** {@inheritDoc} */
- @Override public void configure(JobConf job) {
- flags.put("partitionerWasConfigured", true);
- }
- }
-
- /**
- * Custom implementation of Partitioner in v2 API.
- */
- private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
- implements Configurable {
- /** {@inheritDoc} */
- @Override public void setConf(Configuration conf) {
- flags.put("partitionerWasConfigured", true);
- }
-
- /** {@inheritDoc} */
- @Override public Configuration getConf() {
- return null;
- }
- }
-
- /**
- * Custom implementation of InputFormat in v2 API.
- */
- private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
- /** {@inheritDoc} */
- @Override public void setConf(Configuration conf) {
- flags.put("inputFormatWasConfigured", true);
- }
-
- /** {@inheritDoc} */
- @Override public Configuration getConf() {
- return null;
- }
- }
-
- /**
- * Custom implementation of OutputFormat in v2 API.
- */
- private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
- /** {@inheritDoc} */
- @Override public void setConf(Configuration conf) {
- flags.put("outputFormatWasConfigured", true);
- }
-
- /** {@inheritDoc} */
- @Override public Configuration getConf() {
- return null;
- }
- }
-
- /**
- * Custom implementation of InputFormat in v1 API.
- */
- private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
- /** {@inheritDoc} */
- @Override public void configure(JobConf job) {
- super.configure(job);
-
- flags.put("inputFormatWasConfigured", true);
- }
- }
-
- /**
- * Custom implementation of OutputFormat in v1 API.
- */
- private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
- /** {@inheritDoc} */
- @Override public void configure(JobConf job) {
- flags.put("outputFormatWasConfigured", true);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
deleted file mode 100644
index dd12935..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-
-/**
- * Test of error resiliency after an error in a map-reduce job execution.
- * Combinations tested:
- * { new ALI, old API }
- * x { unchecked exception, checked exception, error }
- * x { phase where the error happens }.
- */
-public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError0_Runtime() throws Exception {
- doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
- }
-
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError0_IOException() throws Exception {
- doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
- }
-
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError0_Error() throws Exception {
- doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
- }
-
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError7_Runtime() throws Exception {
- doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
- }
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError7_IOException() throws Exception {
- doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
- }
- /**
- * Tests recovery.
- *
- * @throws Exception If failed.
- */
- public void testRecoveryAfterAnError7_Error() throws Exception {
- doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 10 * 60 * 1000L;
- }
-
- /**
- * Tests correct work after an error.
- *
- * @throws Exception On error.
- */
- private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
- try {
- IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
- igfs.mkdirs(inDir);
-
- IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
-
- generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
-
- boolean useNewMapper = (useNewBits & 1) == 0;
- boolean useNewCombiner = (useNewBits & 2) == 0;
- boolean useNewReducer = (useNewBits & 4) == 0;
-
- for (int i = 0; i < 12; i++) {
- int bits = 1 << i;
-
- System.out.println("############################ Simulator kind = " + simulatorKind
- + ", Stage bits = " + bits);
-
- HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
-
- doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
- }
- } catch (Throwable t) {
- t.printStackTrace();
-
- fail("Unexpected throwable: " + t);
- }
- }
-
- /**
- * Performs test with given error simulator.
- *
- * @param sim The simulator.
- * @param inFile Input file.
- * @param useNewMapper If the use new mapper API.
- * @param useNewCombiner If to use new combiner.
- * @param useNewReducer If to use new reducer API.
- * @throws Exception If failed.
- */
- private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
- boolean useNewCombiner, boolean useNewReducer) throws Exception {
- // Set real simulating error simulator:
- assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
-
- try {
- // Expect failure there:
- doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
- }
- catch (Throwable t) { // This may be an Error.
- // Expected:
- System.out.println(t.toString()); // Ignore, continue the test.
- }
-
- // Set no-op error simulator:
- assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
-
- // Expect success there:
- doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
deleted file mode 100644
index b703896..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-
-/**
- * Test of whole cycle of map-reduce processing via Job tracker.
- */
-public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
- /**
- * Tests whole job execution with all phases in all combination of new and old versions of API.
- * @throws Exception If fails.
- */
- public void testWholeMapReduceExecution() throws Exception {
- IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
- igfs.mkdirs(inDir);
-
- IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
-
- generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );
-
- for (boolean[] apiMode: getApiModes()) {
- assert apiMode.length == 3;
-
- boolean useNewMapper = apiMode[0];
- boolean useNewCombiner = apiMode[1];
- boolean useNewReducer = apiMode[2];
-
- doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
- }
- }
-
- /**
- * Gets API mode combinations to be tested.
- * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet.
- *
- * @return Arrays of booleans indicating API combinations to test.
- */
- protected boolean[][] getApiModes() {
- return new boolean[][] {
- { false, false, false },
- { false, false, true },
- { false, true, false },
- { true, false, false },
- { true, true, true },
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
deleted file mode 100644
index 0c172c3..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.configuration.IgniteConfiguration;
-
-/**
- * Test attempt to execute a map-reduce task while no Hadoop processor available.
- */
-public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration c = super.getConfiguration(gridName);
-
- c.setHadoopConfiguration(null);
- c.setPeerClassLoadingEnabled(true);
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override public void testWholeMapReduceExecution() throws Exception {
- try {
- super.testWholeMapReduceExecution();
-
- fail("IllegalStateException expected.");
- }
- catch (IllegalStateException ignore) {
- // No-op.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
deleted file mode 100644
index 1a87865d..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Collection;
-import java.util.UUID;
-
-/**
- * Mock job for planner tests.
- */
-public class HadoopPlannerMockJob implements HadoopJob {
- /** Input splits. */
- private final Collection<HadoopInputSplit> splits;
-
- /** Reducers count. */
- private final int reducers;
-
- /**
- * Constructor.
- *
- * @param splits Input splits.
- * @param reducers Reducers.
- */
- public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) {
- this.splits = splits;
- this.reducers = reducers;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
- return splits;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobInfo info() {
- return new JobInfo(reducers);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId id() {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void dispose(boolean external) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
- throwUnsupported();
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupStagingDirectory() {
- throwUnsupported();
- }
-
- /**
- * Throw {@link UnsupportedOperationException}.
- */
- private static void throwUnsupported() {
- throw new UnsupportedOperationException("Should not be called!");
- }
-
- /**
- * Mocked job info.
- */
- private static class JobInfo implements HadoopJobInfo {
- /** Reducers. */
- private final int reducers;
-
- /**
- * Constructor.
- *
- * @param reducers Reducers.
- */
- public JobInfo(int reducers) {
- this.reducers = reducers;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return reducers;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- throwUnsupported();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- throwUnsupported();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
- @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- throwUnsupported();
-
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
deleted file mode 100644
index 3f825b0..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.collect.MinMaxPriorityQueue;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static com.google.common.collect.Maps.immutableEntry;
-import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
-import static java.util.Collections.reverseOrder;
-
-/**
- * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
- * 3 characters the number of occurrences ins calculated. Finally, 10 words with the highest occurrence count are
- * output.
- *
- * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system
- * path.
- */
-public class HadoopPopularWordsTest {
- /** Ignite home. */
- private static final String IGNITE_HOME = U.getIgniteHome();
-
- /** The path to the input directory. ALl files in that directory will be processed. */
- private static final Path BOOKS_LOCAL_DIR =
- new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
-
- /** The path to the output directory. THe result file will be written to this location. */
- private static final Path RESULT_LOCAL_DIR =
- new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
-
- /** Popular books source dir in DFS. */
- private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
-
- /** Popular books source dir in DFS. */
- private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
-
- /** Path to the distributed file system configuration. */
- private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
-
- /** Top N words to select **/
- private static final int POPULAR_WORDS_CNT = 10;
-
- /**
- * For each token in the input string the mapper emits a {word, 1} pair.
- */
- private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
- /** Constant value. */
- private static final IntWritable ONE = new IntWritable(1);
-
- /** The word converted into the Text. */
- private Text word = new Text();
-
- /**
- * Emits a entry where the key is the word and the value is always 1.
- *
- * @param key the current position in the input file (not used here)
- * @param val the text string
- * @param ctx mapper context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override protected void map(LongWritable key, Text val, Context ctx)
- throws IOException, InterruptedException {
- // Get the mapped object.
- final String line = val.toString();
-
- // Splits the given string to words.
- final String[] words = line.split("[^a-zA-Z0-9]");
-
- for (final String w : words) {
- // Only emit counts for longer words.
- if (w.length() <= 3)
- continue;
-
- word.set(w);
-
- // Write the word into the context with the initial count equals 1.
- ctx.write(word, ONE);
- }
- }
- }
-
- /**
- * The reducer uses a priority queue to rank the words based on its number of occurrences.
- */
- private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- private MinMaxPriorityQueue<Entry<Integer, String>> q;
-
- TopNWordsReducer() {
- q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
- @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
- return o1.getKey().compareTo(o2.getKey());
- }
- })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
- }
-
- /**
- * This method doesn't emit anything, but just keeps track of the top N words.
- *
- * @param key The word.
- * @param vals The words counts.
- * @param ctx Reducer context.
- * @throws IOException If failed.
- * @throws InterruptedException If failed.
- */
- @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
- InterruptedException {
- int sum = 0;
-
- for (IntWritable val : vals)
- sum += val.get();
-
- q.add(immutableEntry(sum, key.toString()));
- }
-
- /**
- * This method is called after all the word entries have been processed. It writes the accumulated
- * statistics to the job output file.
- *
- * @param ctx The job context.
- * @throws IOException If failed.
- * @throws InterruptedException If failed.
- */
- @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
- IntWritable i = new IntWritable();
-
- Text txt = new Text();
-
- // iterate in desc order
- while (!q.isEmpty()) {
- Entry<Integer, String> e = q.removeFirst();
-
- i.set(e.getKey());
-
- txt.set(e.getValue());
-
- ctx.write(txt, i);
- }
- }
- }
-
- /**
- * Configures the Hadoop MapReduce job.
- *
- * @return Instance of the Hadoop MapRed job.
- * @throws IOException If failed.
- */
- @SuppressWarnings("deprecation")
- private Job createConfigBasedHadoopJob() throws IOException {
- Job jobCfg = new Job();
-
- Configuration cfg = jobCfg.getConfiguration();
-
- // Use explicit configuration of distributed file system, if provided.
- cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
-
- jobCfg.setJobName("HadoopPopularWordExample");
- jobCfg.setJarByClass(HadoopPopularWordsTest.class);
- jobCfg.setInputFormatClass(TextInputFormat.class);
- jobCfg.setOutputKeyClass(Text.class);
- jobCfg.setOutputValueClass(IntWritable.class);
- jobCfg.setMapperClass(TokenizingMapper.class);
- jobCfg.setReducerClass(TopNWordsReducer.class);
-
- FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
- FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
-
- // Local job tracker allows the only task per wave, but text input format
- // replaces it with the calculated value based on input split size option.
- if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
- // Split job into tasks using 32MB split size.
- FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
- FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
- }
-
- return jobCfg;
- }
-
- /**
- * Runs the Hadoop job.
- *
- * @return {@code True} if succeeded, {@code false} otherwise.
- * @throws Exception If failed.
- */
- private boolean runWordCountConfigBasedHadoopJob() throws Exception {
- Job job = createConfigBasedHadoopJob();
-
- // Distributed file system this job will work with.
- FileSystem fs = FileSystem.get(job.getConfiguration());
-
- X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
-
- // Prepare input and output job directories.
- prepareDirectories(fs);
-
- long time = System.currentTimeMillis();
-
- // Run job.
- boolean res = job.waitForCompletion(true);
-
- X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
-
- // Move job results into local file system, so you can view calculated results.
- publishResults(fs);
-
- return res;
- }
-
- /**
- * Prepare job's data: cleanup result directories that might have left over
- * after previous runs, copy input files from the local file system into DFS.
- *
- * @param fs Distributed file system to use in job.
- * @throws IOException If failed.
- */
- private void prepareDirectories(FileSystem fs) throws IOException {
- X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
-
- fs.delete(RESULT_DFS_DIR, true);
-
- X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.delete(BOOKS_DFS_DIR, true);
-
- X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
- }
-
- /**
- * Publish job execution results into local file system, so you can view them.
- *
- * @param fs Distributed file sytem used in job.
- * @throws IOException If failed.
- */
- private void publishResults(FileSystem fs) throws IOException {
- X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
-
- fs.delete(BOOKS_DFS_DIR, true);
-
- X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
-
- fs.delete(RESULT_LOCAL_DIR, true);
-
- X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
-
- fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
- }
-
- /**
- * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
- * occurrences of the word in the source files, the N most popular words are selected.
- *
- * @param args None.
- */
- public static void main(String[] args) {
- try {
- new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
- }
- catch (Exception e) {
- X.println(">>> Failed to run word count example: " + e.getMessage());
- }
-
- System.exit(0);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
deleted file mode 100644
index 789a6b3..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.Arrays;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Test of wrapper of the native serialization.
- */
-public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
- /**
- * Tests read/write of IntWritable via native WritableSerialization.
- * @throws Exception If fails.
- */
- public void testIntWritableSerialization() throws Exception {
- HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- DataOutput out = new DataOutputStream(buf);
-
- ser.write(out, new IntWritable(3));
- ser.write(out, new IntWritable(-5));
-
- assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
- assertEquals(3, ((IntWritable)ser.read(in, null)).get());
- assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
- }
-
- /**
- * Tests read/write of Integer via native JavaleSerialization.
- * @throws Exception If fails.
- */
- public void testIntJavaSerialization() throws Exception {
- HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- DataOutput out = new DataOutputStream(buf);
-
- ser.write(out, 3);
- ser.write(out, -5);
- ser.close();
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
- assertEquals(3, ((Integer)ser.read(in, null)).intValue());
- assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
deleted file mode 100644
index 7552028..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.concurrent.ConcurrentMap;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * For tests.
- */
-public class HadoopSharedMap {
- /** */
- private static final ConcurrentMap<String, HadoopSharedMap> maps = new ConcurrentHashMap8<>();
-
- /** */
- private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>();
-
- /**
- * Private.
- */
- private HadoopSharedMap() {
- // No-op.
- }
-
- /**
- * Puts object by key.
- *
- * @param key Key.
- * @param val Value.
- */
- public <T> T put(String key, T val) {
- Object old = map.putIfAbsent(key, val);
-
- return old == null ? val : (T)old;
- }
-
- /**
- * @param cls Class.
- * @return Map of static fields.
- */
- public static HadoopSharedMap map(Class<?> cls) {
- HadoopSharedMap m = maps.get(cls.getName());
-
- if (m != null)
- return m;
-
- HadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new HadoopSharedMap());
-
- return old == null ? m : old;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
deleted file mode 100644
index 27a5fcd..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-/**
- * Same test as HadoopMapReduceTest, but with enabled Snappy output compression.
- */
-public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
- /** {@inheritDoc} */
- @Override protected boolean compressOutputSnappy() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean[][] getApiModes() {
- return new boolean[][] {
- { false, false, true },
- { true, true, true },
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
deleted file mode 100644
index 32146e4..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.ignite.internal.processors.hadoop.common.HadoopHelperImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Tests isolated Hadoop Snappy codec usage.
- */
-public class HadoopSnappyTest extends GridCommonAbstractTest {
- /** Length of data. */
- private static final int BYTE_SIZE = 1024 * 50;
-
- /**
- * Checks Snappy codec usage.
- *
- * @throws Exception On error.
- */
- public void testSnappy() throws Throwable {
- // Run Snappy test in default class loader:
- checkSnappy();
-
- // Run the same in several more class loaders simulating jobs and tasks:
- for (int i = 0; i < 2; i++) {
- ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl());
-
- Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
-
- assertEquals(hadoopClsLdr, cls.getClassLoader());
-
- U.invoke(cls, null, "checkSnappy");
- }
- }
-
- /**
- * Internal check routine.
- *
- * @throws Throwable If failed.
- */
- public static void checkSnappy() throws Throwable {
- try {
- byte[] expBytes = new byte[BYTE_SIZE];
- byte[] actualBytes = new byte[BYTE_SIZE];
-
- for (int i = 0; i < expBytes.length ; i++)
- expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
-
- SnappyCodec codec = new SnappyCodec();
-
- codec.setConf(new Configuration());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
- cos.write(expBytes);
- cos.flush();
- }
-
- try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
- int read = cis.read(actualBytes, 0, actualBytes.length);
-
- assert read == actualBytes.length;
- }
-
- assert Arrays.equals(expBytes, actualBytes);
- }
- catch (Throwable e) {
- System.out.println("Snappy check failed:");
- System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
- System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
-
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
deleted file mode 100644
index dff5e70..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-
-/**
- * External test for sorting.
- */
-public class HadoopSortingExternalTest extends HadoopSortingTest {
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(true);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setMarshaller(new JdkMarshaller());
-
- return cfg;
- }
-}
\ No newline at end of file