You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/11 06:58:16 UTC

[46/50] [abbrv] ignite git commit: IGNITE-2218: Fixed a problem with native Hadoop libraries load. This closes #378.

IGNITE-2218: Fixed a problem with native Hadoop libraries load. This closes #378.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d58d14a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d58d14a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d58d14a

Branch: refs/heads/ignite-1537
Commit: 7d58d14a80b1c32f88fbb4cf68e5d289c5aee474
Parents: 012ca73
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jan 4 12:14:58 2016 +0400
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jan 4 12:14:58 2016 +0400

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    |  71 ++++++++++---
 .../hadoop/v2/HadoopNativeCodeLoader.java       |  74 --------------
 .../hadoop/HadoopAbstractWordCountTest.java     |  46 +++++++--
 .../hadoop/HadoopMapReduceEmbeddedSelfTest.java |   2 +-
 .../processors/hadoop/HadoopMapReduceTest.java  |  15 ++-
 .../hadoop/HadoopSnappyFullMapReduceTest.java   |  28 +++++
 .../processors/hadoop/HadoopSnappyTest.java     | 102 +++++++++++++++++++
 .../processors/hadoop/HadoopTasksV2Test.java    |   2 +-
 .../hadoop/examples/HadoopWordCount1Reduce.java |   1 +
 .../hadoop/examples/HadoopWordCount2.java       |  18 +++-
 .../examples/HadoopWordCount2Reducer.java       |   1 +
 .../testsuites/IgniteHadoopTestSuite.java       |  18 +++-
 12 files changed, 279 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 735133f..270b31d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -30,13 +30,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
+import java.util.Vector;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopNativeCodeLoader;
 import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.objectweb.asm.AnnotationVisitor;
@@ -69,6 +70,9 @@ public class HadoopClassLoader extends URLClassLoader {
     /** Name of the Hadoop daemon class. */
     public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon";
 
+    /** Name fragment used to recognize the libhadoop library among loaded native libraries. */
+    private static final String LIBHADOOP = "hadoop.";
+
     /** */
     private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
 
@@ -119,6 +123,51 @@ public class HadoopClassLoader extends URLClassLoader {
         assert !(getParent() instanceof HadoopClassLoader);
 
         this.name = name;
+
+        initializeNativeLibraries();
+    }
+
+    /**
+     * Workaround to load native Hadoop libraries. Java doesn't allow the same native library to be loaded by more
+     * than one class loader. But we load Hadoop classes many times and one of these classes -
+     * {@code NativeCodeLoader} - tries to load the same native library over and over again.
+     * <p>
+     * To fix the problem, we force native library load in parent class loader and then "link" handle to this native
+     * library to our class loader. As a result, our class loader will think that the library is already loaded and will
+     * be able to link native methods.
+     *
+     * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
+     *     JNI specification</a>
+     */
+    private void initializeNativeLibraries() {
+        try {
+            // This must trigger native library load.
+            Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR);
+
+            final Vector<Object> curVector = U.field(this, "nativeLibraries");
+
+            ClassLoader ldr = APP_CLS_LDR;
+
+            while (ldr != null) {
+                Vector vector = U.field(ldr, "nativeLibraries");
+
+                for (Object lib : vector) {
+                    String libName = U.field(lib, "name");
+
+                    if (libName.contains(LIBHADOOP)) {
+                        curVector.add(lib);
+
+                        return;
+                    }
+                }
+
+                ldr = ldr.getParent();
+            }
+        }
+        catch (Exception e) {
+            U.quietAndWarn(null, "Failed to initialize Hadoop native library " +
+                "(native Hadoop methods might not work properly): " + e);
+        }
     }
 
     /**
@@ -152,8 +201,6 @@ public class HadoopClassLoader extends URLClassLoader {
             if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
                 if (name.endsWith(".util.ShutdownHookManager"))  // Dirty hack to get rid of Hadoop shutdown hooks.
                     return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
-                else if (name.endsWith(".util.NativeCodeLoader"))
-                    return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
                 else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
                     // We replace this in order to be able to forcibly stop some daemon threads
                     // that otherwise never stop (e.g. PeerCache runnables):
@@ -274,7 +321,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
     /**
      * Check whether class has external dependencies on Hadoop.
-     * 
+     *
      * @param clsName Class name.
      * @return {@code True} if class has external dependencies.
      */
@@ -285,15 +332,15 @@ public class HadoopClassLoader extends URLClassLoader {
         ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
         ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
         ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
-        
+
         return hasExternalDependencies(clsName, ctx);
     }
-        
+
     /**
      * Check whether class has external dependencies on Hadoop.
-     * 
+     *
      * @param clsName Class name.
-     * @param ctx Context.                
+     * @param ctx Context.
      * @return {@code true} If the class has external dependencies.
      */
     boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
@@ -519,7 +566,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
         /** Field visitor. */
         private FieldVisitor fldVisitor;
-        
+
         /** Class visitor. */
         private ClassVisitor clsVisitor;
 
@@ -627,7 +674,7 @@ public class HadoopClassLoader extends URLClassLoader {
                 onType(t);
             }
         }
-    }    
+    }
 
     /**
      * Annotation visitor.
@@ -638,7 +685,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
         /**
          * Annotation visitor.
-         * 
+         *
          * @param ctx The collector.
          */
         CollectingAnnotationVisitor(CollectingContext ctx) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
deleted file mode 100644
index 4c4840d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HadoopNativeCodeLoader {
-    /**
-     * Check if native-hadoop code is loaded for this platform.
-     *
-     * @return <code>true</code> if native-hadoop is loaded,
-     *         else <code>false</code>
-     */
-    public static boolean isNativeCodeLoaded() {
-        return false;
-    }
-
-    /**
-     * Returns true only if this build was compiled with support for snappy.
-     */
-    public static boolean buildSupportsSnappy() {
-        return false;
-    }
-
-    /**
-     * @return Library name.
-     */
-    public static String getLibraryName() {
-        throw new IllegalStateException();
-    }
-
-    /**
-     * Return if native hadoop libraries, if present, can be used for this job.
-     * @param conf configuration
-     *
-     * @return <code>true</code> if native hadoop libraries, if present, can be
-     *         used for this job; <code>false</code> otherwise.
-     */
-    public boolean getLoadNativeLibraries(Configuration conf) {
-        return false;
-    }
-
-    /**
-     * Set if native hadoop libraries, if present, can be used for this job.
-     *
-     * @param conf configuration
-     * @param loadNativeLibraries can native hadoop libraries be loaded
-     */
-    public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
index a47eaf6..e45c127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import com.google.common.base.Joiner;
 import java.io.BufferedReader;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -26,6 +27,11 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 
@@ -118,21 +124,49 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
     }
 
     /**
+     * Reads and sorts the file as plain text, without decompression (default).
+     *
+     * @param fileName The file.
+     * @return The file contents, human-readable.
+     * @throws Exception On error.
+     */
+    protected String readAndSortFile(String fileName) throws Exception {
+        return readAndSortFile(fileName, null);
+    }
+
+    /**
      * Reads whole text file into String.
      *
      * @param fileName Name of the file to read.
      * @return Content of the file as String value.
      * @throws Exception If could not read the file.
      */
-    protected String readAndSortFile(String fileName) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
+    protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+        final List<String> list = new ArrayList<>();
+
+        final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+        if (snappyDecode) {
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(new Path(fileName)))) {
+                Text key = new Text();
 
-        List<String> list = new ArrayList<>();
+                IntWritable val = new IntWritable();
 
-        String line;
+                while (reader.next(key, val))
+                    list.add(key + "\t" + val);
+            }
+        }
+        else {
+            try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+                String line;
 
-        while ((line = reader.readLine()) != null)
-            list.add(line);
+                while ((line = reader.readLine()) != null)
+                    list.add(line);
+            }
+        }
 
         Collections.sort(list);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
index c0eff48..25ef382 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -106,7 +106,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
 
             Job job = Job.getInstance(jobConf);
 
-            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
 
             if (useNewAPI) {
                 job.setPartitionerClass(CustomV2Partitioner.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index d0bd92b..7fd8272 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -183,7 +183,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             Job job = Job.getInstance(jobConf);
 
-            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
 
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(IntWritable.class);
@@ -207,18 +207,29 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkOwner(new IgfsPath(outFile));
 
+            String actual = readAndSortFile(outFile, job.getConfiguration());
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
                 useNewReducer,
                 "blue\t" + blue + "\n" +
                 "green\t" + green + "\n" +
                 "red\t" + red + "\n" +
                 "yellow\t" + yellow + "\n",
-                readAndSortFile(outFile)
+                actual
             );
         }
     }
 
     /**
+     * Tells whether the output data should be compressed with Snappy.
+     *
+     * @return {@code True} if the output data should be compressed with Snappy.
+     */
+    protected boolean compressOutputSnappy() {
+        return false;
+    }
+
+    /**
      * Simple test job statistics.
      *
      * @param jobId Job id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..22d33a5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Same test as HadoopMapReduceTest, but with Snappy output compression enabled.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected boolean compressOutputSnappy() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
new file mode 100644
index 0000000..014ff1e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+    /** Length of data. */
+    private static final int BYTE_SIZE = 1024 * 50;
+
+    /**
+     * Checks Snappy codec usage.
+     *
+     * @throws Throwable On error.
+     */
+    public void testSnappy() throws Throwable {
+        // Run Snappy test in default class loader:
+        checkSnappy();
+
+        // Run the same in several more class loaders simulating jobs and tasks:
+        for (int i = 0; i < 2; i++) {
+            ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i);
+
+            Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+            assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+            U.invoke(cls, null, "checkSnappy");
+        }
+    }
+
+    /**
+     * Internal check routine.
+     *
+     * @throws Throwable If failed.
+     */
+    public static void checkSnappy() throws Throwable {
+        try {
+            byte[] expBytes = new byte[BYTE_SIZE];
+            byte[] actualBytes = new byte[BYTE_SIZE];
+
+            for (int i = 0; i < expBytes.length ; i++)
+                expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+            SnappyCodec codec = new SnappyCodec();
+
+            codec.setConf(new Configuration());
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+                cos.write(expBytes);
+                cos.flush();
+            }
+
+            try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                int read = cis.read(actualBytes, 0, actualBytes.length);
+
+                assert read == actualBytes.length;
+            }
+
+            assert Arrays.equals(expBytes, actualBytes);
+        }
+        catch (Throwable e) {
+            System.out.println("Snappy check failed:");
+            System.out.println("### NativeCodeLoader.isNativeCodeLoaded:  " + NativeCodeLoader.isNativeCodeLoaded());
+            System.out.println("### SnappyCompressor.isNativeCodeLoaded:  " + SnappyCompressor.isNativeCodeLoaded());
+
+            throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index 3a964d6..d125deb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -48,7 +48,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
 
-        HadoopWordCount2.setTasksClasses(job, true, true, true);
+        HadoopWordCount2.setTasksClasses(job, true, true, true, false);
 
         Configuration conf = job.getConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
index 120ac19..2335911 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -47,6 +47,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
         output.collect(key, new IntWritable(sum));
     }
 
+    /** {@inheritDoc} */
     @Override public void configure(JobConf job) {
         super.configure(job);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 942a908..4b508ca 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.hadoop.examples;
 import java.io.IOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 /**
@@ -62,7 +65,7 @@ public class HadoopWordCount2 {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
 
-        setTasksClasses(job, true, true, true);
+        setTasksClasses(job, true, true, true, false);
 
         FileInputFormat.setInputPaths(job, new Path(input));
         FileOutputFormat.setOutputPath(job, new Path(output));
@@ -80,7 +83,8 @@ public class HadoopWordCount2 {
      * @param setCombiner Option to set combiner class.
      * @param setReducer Option to set reducer and output format classes.
      */
-    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer,
+            boolean outputCompression) {
         if (setMapper) {
             job.setMapperClass(HadoopWordCount2Mapper.class);
             job.setInputFormatClass(TextInputFormat.class);
@@ -93,5 +97,15 @@ public class HadoopWordCount2 {
             job.setReducerClass(HadoopWordCount2Reducer.class);
             job.setOutputFormatClass(TextOutputFormat.class);
         }
+
+        if (outputCompression) {
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
+
+            SequenceFileOutputFormat.setCompressOutput(job, true);
+
+            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
index b2be53e..63a9d95 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -55,6 +55,7 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
     /** {@inheritDoc} */
     @Override protected void setup(Context context) throws IOException, InterruptedException {
         super.setup(context);
+
         wasSetUp = true;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 1831085..6c542b5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import junit.framework.TestSuite;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
@@ -96,6 +100,9 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName())));
@@ -192,7 +199,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         X.println("Will use Hadoop version: " + ver);
 
-        String downloadPath = "hadoop/common/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
+        String downloadPath = "hadoop/core/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
 
         download("Hadoop", "HADOOP_HOME", downloadPath, "hadoop-" + ver);
     }
@@ -217,6 +224,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         }
 
         List<String> urls = F.asList(
+            "http://archive.apache.org/dist/",
             "http://apache-mirror.rbc.ru/pub/apache/",
             "http://www.eu.apache.org/dist/",
             "http://www.us.apache.org/dist/");
@@ -273,6 +281,14 @@ public class IgniteHadoopTestSuite extends TestSuite {
                             if (!dest.mkdirs())
                                 throw new IllegalStateException();
                         }
+                        else if (entry.isSymbolicLink()) {
+                            // Important: the Hadoop installation contains symlinks, so we need to re-create them:
+                            Path theLinkItself = Paths.get(install.getAbsolutePath(), entry.getName());
+
+                            Path linkTarget = Paths.get(entry.getLinkName());
+
+                            Files.createSymbolicLink(theLinkItself, linkTarget);
+                        }
                         else {
                             File parent = dest.getParentFile();