Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/18 09:23:53 UTC
[07/29] ignite git commit: IGNITE-2218: Fixed a problem with native Hadoop library loading. This closes #378.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d58d14a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d58d14a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d58d14a
Branch: refs/heads/ignite-2236
Commit: 7d58d14a80b1c32f88fbb4cf68e5d289c5aee474
Parents: 012ca73
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jan 4 12:14:58 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jan 4 12:14:58 2016 +0400
----------------------------------------------------------------------
.../processors/hadoop/HadoopClassLoader.java | 71 ++++++++++---
.../hadoop/v2/HadoopNativeCodeLoader.java | 74 --------------
.../hadoop/HadoopAbstractWordCountTest.java | 46 +++++++--
.../hadoop/HadoopMapReduceEmbeddedSelfTest.java | 2 +-
.../processors/hadoop/HadoopMapReduceTest.java | 15 ++-
.../hadoop/HadoopSnappyFullMapReduceTest.java | 28 +++++
.../processors/hadoop/HadoopSnappyTest.java | 102 +++++++++++++++++++
.../processors/hadoop/HadoopTasksV2Test.java | 2 +-
.../hadoop/examples/HadoopWordCount1Reduce.java | 1 +
.../hadoop/examples/HadoopWordCount2.java | 18 +++-
.../examples/HadoopWordCount2Reducer.java | 1 +
.../testsuites/IgniteHadoopTestSuite.java | 18 +++-
12 files changed, 279 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 735133f..270b31d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -30,13 +30,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
+import java.util.Vector;
+import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopNativeCodeLoader;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.objectweb.asm.AnnotationVisitor;
@@ -69,6 +70,9 @@ public class HadoopClassLoader extends URLClassLoader {
/** Name of the Hadoop daemon class. */
public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon";
+ /** Name of libhadoop library. */
+ private static final String LIBHADOOP = "hadoop.";
+
/** */
private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
@@ -119,6 +123,51 @@ public class HadoopClassLoader extends URLClassLoader {
assert !(getParent() instanceof HadoopClassLoader);
this.name = name;
+
+ initializeNativeLibraries();
+ }
+
+ /**
+ * Workaround to load native Hadoop libraries. Java does not allow the same native library to be loaded by
+ * multiple class loaders. However, we load Hadoop classes many times, and one of those classes -
+ * {@code NativeCodeLoader} - tries to load the same native library over and over again.
+ * <p>
+ * To fix the problem, we force the native library to load in the parent class loader and then "link" its handle
+ * into our class loader. As a result, our class loader considers the library already loaded and is able to link
+ * native methods.
+ *
+ * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
+ * JNI specification</a>
+ */
+ private void initializeNativeLibraries() {
+ try {
+ // This must trigger native library load.
+ Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR);
+
+ final Vector<Object> curVector = U.field(this, "nativeLibraries");
+
+ ClassLoader ldr = APP_CLS_LDR;
+
+ while (ldr != null) {
+ Vector vector = U.field(ldr, "nativeLibraries");
+
+ for (Object lib : vector) {
+ String libName = U.field(lib, "name");
+
+ if (libName.contains(LIBHADOOP)) {
+ curVector.add(lib);
+
+ return;
+ }
+ }
+
+ ldr = ldr.getParent();
+ }
+ }
+ catch (Exception e) {
+ U.quietAndWarn(null, "Failed to initialize Hadoop native library " +
+ "(native Hadoop methods might not work properly): " + e);
+ }
}
/**
@@ -152,8 +201,6 @@ public class HadoopClassLoader extends URLClassLoader {
if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
- else if (name.endsWith(".util.NativeCodeLoader"))
- return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
// We replace this in order to be able to forcibly stop some daemon threads
// that otherwise never stop (e.g. PeerCache runnables):
@@ -274,7 +321,7 @@ public class HadoopClassLoader extends URLClassLoader {
/**
* Check whether class has external dependencies on Hadoop.
- *
+ *
* @param clsName Class name.
* @return {@code True} if class has external dependencies.
*/
@@ -285,15 +332,15 @@ public class HadoopClassLoader extends URLClassLoader {
ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
-
+
return hasExternalDependencies(clsName, ctx);
}
-
+
/**
* Check whether class has external dependencies on Hadoop.
- *
+ *
* @param clsName Class name.
- * @param ctx Context.
+ * @param ctx Context.
* @return {@code true} If the class has external dependencies.
*/
boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
@@ -519,7 +566,7 @@ public class HadoopClassLoader extends URLClassLoader {
/** Field visitor. */
private FieldVisitor fldVisitor;
-
+
/** Class visitor. */
private ClassVisitor clsVisitor;
@@ -627,7 +674,7 @@ public class HadoopClassLoader extends URLClassLoader {
onType(t);
}
}
- }
+ }
/**
* Annotation visitor.
@@ -638,7 +685,7 @@ public class HadoopClassLoader extends URLClassLoader {
/**
* Annotation visitor.
- *
+ *
* @param ctx The collector.
*/
CollectingAnnotationVisitor(CollectingContext ctx) {
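For readers outside the patch context, below is a minimal standalone sketch of the handle-linking trick that initializeNativeLibraries() implements above. It assumes the JDK 8 layout, where each ClassLoader keeps its loaded native libraries in a private Vector field named "nativeLibraries" (the same assumption the patch makes via U.field); the class and method names here are hypothetical, not part of the patch.

import java.lang.reflect.Field;
import java.util.Vector;

public final class NativeLibLinker {
    /**
     * Copies the libhadoop handle from the application class loader chain into
     * {@code target}, so that {@code target} can link native methods without
     * trying to re-load the library.
     */
    @SuppressWarnings("unchecked")
    public static void linkHadoopLib(ClassLoader target) throws Exception {
        Field libsFld = ClassLoader.class.getDeclaredField("nativeLibraries");

        libsFld.setAccessible(true);

        Vector<Object> targetLibs = (Vector<Object>)libsFld.get(target);

        // Walk up the loader chain looking for an already-loaded libhadoop handle.
        for (ClassLoader ldr = NativeLibLinker.class.getClassLoader(); ldr != null; ldr = ldr.getParent()) {
            for (Object lib : (Vector<?>)libsFld.get(ldr)) {
                Field nameFld = lib.getClass().getDeclaredField("name");

                nameFld.setAccessible(true);

                if (((String)nameFld.get(lib)).contains("hadoop")) {
                    // The target loader now considers the library loaded.
                    targetLibs.add(lib);

                    return;
                }
            }
        }
    }
}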
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
deleted file mode 100644
index 4c4840d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HadoopNativeCodeLoader {
- /**
- * Check if native-hadoop code is loaded for this platform.
- *
- * @return <code>true</code> if native-hadoop is loaded,
- * else <code>false</code>
- */
- public static boolean isNativeCodeLoaded() {
- return false;
- }
-
- /**
- * Returns true only if this build was compiled with support for snappy.
- */
- public static boolean buildSupportsSnappy() {
- return false;
- }
-
- /**
- * @return Library name.
- */
- public static String getLibraryName() {
- throw new IllegalStateException();
- }
-
- /**
- * Return if native hadoop libraries, if present, can be used for this job.
- * @param conf configuration
- *
- * @return <code>true</code> if native hadoop libraries, if present, can be
- * used for this job; <code>false</code> otherwise.
- */
- public boolean getLoadNativeLibraries(Configuration conf) {
- return false;
- }
-
- /**
- * Set if native hadoop libraries, if present, can be used for this job.
- *
- * @param conf configuration
- * @param loadNativeLibraries can native hadoop libraries be loaded
- */
- public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
index a47eaf6..e45c127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -26,6 +27,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
@@ -118,21 +124,49 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
}
/**
+ * Read w/o decoding (default).
+ *
+ * @param fileName The file.
+ * @return The file contents, human-readable.
+ * @throws Exception On error.
+ */
+ protected String readAndSortFile(String fileName) throws Exception {
+ return readAndSortFile(fileName, null);
+ }
+
+ /**
* Reads whole text file into String.
*
* @param fileName Name of the file to read.
* @return Content of the file as String value.
* @throws Exception If could not read the file.
*/
- protected String readAndSortFile(String fileName) throws Exception {
- BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
+ protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+ final List<String> list = new ArrayList<>();
+
+ final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+ if (snappyDecode) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(new Path(fileName)))) {
+ Text key = new Text();
- List<String> list = new ArrayList<>();
+ IntWritable val = new IntWritable();
- String line;
+ while (reader.next(key, val))
+ list.add(key + "\t" + val);
+ }
+ }
+ else {
+ try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+ String line;
- while ((line = reader.readLine()) != null)
- list.add(line);
+ while ((line = reader.readLine()) != null)
+ list.add(line);
+ }
+ }
Collections.sort(list);
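The new readAndSortFile() overload keys the decode path off FileOutputFormat.COMPRESS in the job configuration: compressed output is a SequenceFile of Text/IntWritable pairs, while plain output is read line by line. A standalone sketch of the SequenceFile branch follows, assuming a hypothetical output path; the codec recorded in the file header (SnappyCodec here) is applied transparently by the reader.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SnappyOutputReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Hypothetical path to one reducer's Snappy-compressed output file.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(new Path("/output/part-r-00000")))) {
            Text key = new Text();
            IntWritable val = new IntWritable();

            // Decompression happens inside the reader, driven by the file header.
            while (reader.next(key, val))
                System.out.println(key + "\t" + val);
        }
    }
}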
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
index c0eff48..25ef382 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -106,7 +106,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
Job job = Job.getInstance(jobConf);
- HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+ HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
if (useNewAPI) {
job.setPartitionerClass(CustomV2Partitioner.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index d0bd92b..7fd8272 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -183,7 +183,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
Job job = Job.getInstance(jobConf);
- HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+ HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
@@ -207,18 +207,29 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
checkOwner(new IgfsPath(outFile));
+ String actual = readAndSortFile(outFile, job.getConfiguration());
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
useNewReducer,
"blue\t" + blue + "\n" +
"green\t" + green + "\n" +
"red\t" + red + "\n" +
"yellow\t" + yellow + "\n",
- readAndSortFile(outFile)
+ actual
);
}
}
/**
+ * Gets whether to compress output data with Snappy.
+ *
+ * @return {@code True} if output data should be compressed with Snappy.
+ */
+ protected boolean compressOutputSnappy() {
+ return false;
+ }
+
+ /**
* Simple test job statistics.
*
* @param jobId Job id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..22d33a5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Same test as HadoopMapReduceTest, but with Snappy output compression enabled.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override protected boolean compressOutputSnappy() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
new file mode 100644
index 0000000..014ff1e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+ /** Length of data. */
+ private static final int BYTE_SIZE = 1024 * 50;
+
+ /**
+ * Checks Snappy codec usage.
+ *
+ * @throws Throwable If failed.
+ */
+ public void testSnappy() throws Throwable {
+ // Run Snappy test in default class loader:
+ checkSnappy();
+
+ // Run the same in several more class loaders simulating jobs and tasks:
+ for (int i = 0; i < 2; i++) {
+ ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i);
+
+ Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+ assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+ U.invoke(cls, null, "checkSnappy");
+ }
+ }
+
+ /**
+ * Internal check routine.
+ *
+ * @throws Throwable If failed.
+ */
+ public static void checkSnappy() throws Throwable {
+ try {
+ byte[] expBytes = new byte[BYTE_SIZE];
+ byte[] actualBytes = new byte[BYTE_SIZE];
+
+ for (int i = 0; i < expBytes.length; i++)
+ expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+ SnappyCodec codec = new SnappyCodec();
+
+ codec.setConf(new Configuration());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+ cos.write(expBytes);
+ cos.flush();
+ }
+
+ try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+ int read = cis.read(actualBytes, 0, actualBytes.length);
+
+ assert read == actualBytes.length;
+ }
+
+ assert Arrays.equals(expBytes, actualBytes);
+ }
+ catch (Throwable e) {
+ System.out.println("Snappy check failed:");
+ System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
+ System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
+
+ throw e;
+ }
+ }
+}
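Note how the test exercises the class-loading fix: each iteration re-loads HadoopSnappyTest in a fresh HadoopClassLoader and invokes checkSnappy() against that copy, so the codec resolves its native methods through the newly linked library handle. A plain-reflection equivalent of the U.invoke(...) call, as a sketch (the loader name is arbitrary):

// Sketch: plain-reflection equivalent of U.invoke(cls, null, "checkSnappy").
void runIsolatedSnappyCheck() throws Exception {
    ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-example");

    Class<?> cls = Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);

    // The static method runs against the copy of the class owned by the new loader,
    // whose NativeCodeLoader got its library handle from initializeNativeLibraries().
    cls.getMethod("checkSnappy").invoke(null);
}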
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index 3a964d6..d125deb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -48,7 +48,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- HadoopWordCount2.setTasksClasses(job, true, true, true);
+ HadoopWordCount2.setTasksClasses(job, true, true, true, false);
Configuration conf = job.getConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
index 120ac19..2335911 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -47,6 +47,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
output.collect(key, new IntWritable(sum));
}
+ /** {@inheritDoc} */
@Override public void configure(JobConf job) {
super.configure(job);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 942a908..4b508ca 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
@@ -62,7 +65,7 @@ public class HadoopWordCount2 {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- setTasksClasses(job, true, true, true);
+ setTasksClasses(job, true, true, true, false);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
@@ -80,7 +83,8 @@ public class HadoopWordCount2 {
* @param setCombiner Option to set combiner class.
* @param setReducer Option to set reducer and output format classes.
*/
- public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+ public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer,
+ boolean outputCompression) {
if (setMapper) {
job.setMapperClass(HadoopWordCount2Mapper.class);
job.setInputFormatClass(TextInputFormat.class);
@@ -93,5 +97,15 @@ public class HadoopWordCount2 {
job.setReducerClass(HadoopWordCount2Reducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
}
+
+ if (outputCompression) {
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
+
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+
+ job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
+ }
}
}
\ No newline at end of file
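For context, the new outputCompression branch of setTasksClasses() amounts to the following job wiring, which any Hadoop 2.x client could apply directly; the job name and standalone framing are assumptions, the calls themselves mirror the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SnappyOutputConfig {
    public static Job newSnappyJob() throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount-snappy");

        // Emit a block-compressed SequenceFile instead of plain text.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        SequenceFileOutputFormat.setCompressOutput(job, true);

        // Select Snappy; requires libhadoop and libsnappy on the task JVMs.
        job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());

        return job;
    }
}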
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
index b2be53e..63a9d95 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -55,6 +55,7 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
/** {@inheritDoc} */
@Override protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
+
wasSetUp = true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 1831085..6c542b5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
import junit.framework.TestSuite;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test;
import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test;
import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest;
import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
@@ -96,6 +100,9 @@ public class IgniteHadoopTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName())));
@@ -192,7 +199,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
X.println("Will use Hadoop version: " + ver);
- String downloadPath = "hadoop/common/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
+ String downloadPath = "hadoop/core/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
download("Hadoop", "HADOOP_HOME", downloadPath, "hadoop-" + ver);
}
@@ -217,6 +224,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
}
List<String> urls = F.asList(
+ "http://archive.apache.org/dist/",
"http://apache-mirror.rbc.ru/pub/apache/",
"http://www.eu.apache.org/dist/",
"http://www.us.apache.org/dist/");
@@ -273,6 +281,14 @@ public class IgniteHadoopTestSuite extends TestSuite {
if (!dest.mkdirs())
throw new IllegalStateException();
}
+ else if (entry.isSymbolicLink()) {
+ // Important: the Hadoop installation contains symlinks, so we need to create them:
+ Path theLinkItself = Paths.get(install.getAbsolutePath(), entry.getName());
+
+ Path linkTarget = Paths.get(entry.getLinkName());
+
+ Files.createSymbolicLink(theLinkItself, linkTarget);
+ }
else {
File parent = dest.getParentFile();