You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/17 09:59:16 UTC

[06/50] [abbrv] ignite git commit: IGNITE-4516: Hadoop: implemented optional compression for remote shuffle output. This closes #1399.

IGNITE-4516: Hadoop: implemented optional compression for remote shuffle output. This closes #1399.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7d781ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7d781ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7d781ee

Branch: refs/heads/ignite-1.9
Commit: e7d781ee3221107d9a819dd70cb5776558a59e2a
Parents: 1776e9e
Author: devozerov <vo...@gridgain.com>
Authored: Thu Jan 5 11:30:42 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Jan 5 12:39:09 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopJobProperty.java    |  7 +++
 .../shuffle/HadoopDirectShuffleMessage.java     | 34 +++++++++++-
 .../hadoop/shuffle/HadoopShuffleJob.java        | 57 +++++++++++++++++---
 .../shuffle/direct/HadoopDirectDataOutput.java  | 14 +++++
 .../direct/HadoopDirectDataOutputContext.java   | 48 +++++++++++++++--
 .../direct/HadoopDirectDataOutputState.java     | 14 ++++-
 .../hadoop/impl/HadoopTeraSortTest.java         | 32 +++++++++--
 7 files changed, 188 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index a3115bf..60992d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -103,6 +103,13 @@ public enum HadoopJobProperty {
     SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
 
     /**
+     * Whether shuffle messages should be compressed with GZIP.
+     * <p>
+     * Defaults to {@code false}.
+     */
+    SHUFFLE_MSG_GZIP("ignite.shuffle.message.gzip"),
+
+    /**
      * Whether to stripe mapper output for remote reducers.
      * <p>
      * Defaults to {@code true}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
index e81dc5f..f596100 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
@@ -57,6 +57,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
     @GridDirectTransient
     private transient int bufLen;
 
+    /** Data length. */
+    private int dataLen;
+
     /**
      * Default constructor.
      */
@@ -72,8 +75,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
      * @param cnt Count.
      * @param buf Buffer.
      * @param bufLen Buffer length.
+     * @param dataLen Data length.
      */
-    public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) {
+    public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen, int dataLen) {
         assert jobId != null;
 
         this.jobId = jobId;
@@ -81,6 +85,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
         this.cnt = cnt;
         this.buf = buf;
         this.bufLen = bufLen;
+        this.dataLen = dataLen;
     }
 
     /**
@@ -111,6 +116,13 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
         return buf;
     }
 
+    /**
+     * @return Data length.
+     */
+    public int dataLength() {
+        return dataLen;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -147,6 +159,12 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeInt("dataLen", dataLen))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -194,6 +212,14 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
 
                 reader.incrementState();
 
+            case 4:
+                dataLen = reader.readInt("dataLen");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(HadoopDirectShuffleMessage.class);
@@ -206,7 +232,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 5;
     }
 
     /** {@inheritDoc} */
@@ -222,6 +248,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
         out.writeInt(cnt);
 
         U.writeByteArray(out, buf);
+
+        out.writeInt(dataLen);
     }
 
     /** {@inheritDoc} */
@@ -234,6 +262,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
 
         buf = U.readByteArray(in);
         bufLen = buf != null ? buf.length : 0;
+
+        dataLen = in.readInt();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 7713d6d..318ead3 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -56,6 +56,8 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -63,10 +65,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.zip.GZIPInputStream;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_GZIP;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -79,6 +83,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     private static final int DFLT_SHUFFLE_MSG_SIZE = 1024 * 1024;
 
     /** */
+    private static final boolean DFLT_SHUFFLE_MSG_GZIP = false;
+
+    /** */
     private final HadoopJob job;
 
     /** */
@@ -130,6 +137,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** Message size. */
     private final int msgSize;
 
+    /** Whether to GZIP shuffle messages. */
+    private final boolean msgGzip;
+
     /** Whether to strip mappers for remote execution. */
     private final boolean stripeMappers;
 
@@ -190,6 +200,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         stripeMappers = stripeMappers0;
 
         msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
+        msgGzip = get(job.info(), SHUFFLE_MSG_GZIP, DFLT_SHUFFLE_MSG_GZIP);
 
         locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
 
@@ -360,22 +371,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @throws IgniteCheckedException Exception.
      */
     public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
-        assert msg.buffer() != null;
+        byte[] buf = extractBuffer(msg);
 
-        HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
+        assert buf != null;
+
+        int rdc = msg.reducer();
+
+        HadoopTaskContext taskCtx = locReducersCtx.get(rdc).get();
 
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
 
-        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+        perfCntr.onShuffleMessage(rdc, U.currentTimeMillis());
 
-        HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
+        HadoopMultimap map = getOrCreateMap(locMaps, rdc);
 
         HadoopSerialization keySer = taskCtx.keySerialization();
         HadoopSerialization valSer = taskCtx.valueSerialization();
 
         // Add data from message to the map.
         try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
-            HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer());
+            HadoopDirectDataInput in = new HadoopDirectDataInput(buf);
 
             Object key = null;
             Object val = null;
@@ -393,6 +408,31 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
+     * Extract buffer from direct shuffle message.
+     *
+     * @param msg Message.
+     * @return Buffer.
+     */
+    private byte[] extractBuffer(HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
+        if (msgGzip) {
+            byte[] res = new byte[msg.dataLength()];
+
+            try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(msg.buffer()), res.length)) {
+                int len = in.read(res, 0, res.length);
+
+                assert len == res.length;
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to uncompress direct shuffle message.", e);
+            }
+
+            return res;
+        }
+        else
+            return msg.buffer();
+    }
+
+    /**
      * @param ack Shuffle ack.
      */
     @SuppressWarnings("ConstantConditions")
@@ -595,7 +635,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @param rmtDirectCtx Remote direct context.
      * @param reset Whether to perform reset.
      */
-    private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) {
+    private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx,
+        boolean reset) {
         if (rmtDirectCtx == null)
             return;
 
@@ -612,7 +653,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             rmtDirectCtx.reset();
 
         HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt,
-            state.buffer(), state.bufferLength());
+            state.buffer(), state.bufferLength(), state.dataLength());
 
         T nodeId = reduceAddrs[rmtRdcIdx];
 
@@ -983,7 +1024,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                     HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx];
 
                     if (rmtDirectCtx == null) {
-                        rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx);
+                        rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, msgGzip, taskCtx);
 
                         rmtDirectCtxs[idx] = rmtDirectCtx;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
index 151e552..5840994 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
@@ -170,6 +170,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
     }
 
     /**
+     * @return Buffer length (how much memory is allocated).
+     */
+    public int bufferLength() {
+        return bufSize;
+    }
+
+    /**
      * @return Position.
      */
     public int position() {
@@ -184,6 +191,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
     }
 
     /**
+     * Reset the stream.
+     */
+    public void reset() {
+        pos = 0;
+    }
+
+    /**
      * Ensure that the given amount of bytes is available within the stream, then shift the position.
      *
      * @param cnt Count.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
index bc70ef3..454278b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
@@ -18,16 +18,29 @@
 package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
 /**
  * Hadoop data output context for direct communication.
  */
 public class HadoopDirectDataOutputContext {
+    /** Initial allocation size for GZIP output. We start with a very low value, but it will grow if needed. */
+    private static final int GZIP_OUT_MIN_ALLOC_SIZE = 1024;
+
+    /** GZIP buffer size. We should remove it when we implement efficient direct GZIP output. */
+    private static final int GZIP_BUFFER_SIZE = 8096;
+
     /** Flush size. */
     private final int flushSize;
 
+    /** Whether to perform GZIP. */
+    private final boolean gzip;
+
     /** Key serialization. */
     private final HadoopSerialization keySer;
 
@@ -37,6 +50,9 @@ public class HadoopDirectDataOutputContext {
     /** Data output. */
     private HadoopDirectDataOutput out;
 
+    /** Data output for GZIP. */
+    private HadoopDirectDataOutput gzipOut;
+
     /** Number of keys written. */
     private int cnt;
 
@@ -44,17 +60,22 @@ public class HadoopDirectDataOutputContext {
      * Constructor.
      *
      * @param flushSize Flush size.
+     * @param gzip Whether to perform GZIP.
      * @param taskCtx Task context.
      * @throws IgniteCheckedException If failed.
      */
-    public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx)
+    public HadoopDirectDataOutputContext(int flushSize, boolean gzip, HadoopTaskContext taskCtx)
         throws IgniteCheckedException {
         this.flushSize = flushSize;
+        this.gzip = gzip;
 
         keySer = taskCtx.keySerialization();
         valSer = taskCtx.valueSerialization();
 
         out = new HadoopDirectDataOutput(flushSize);
+
+        if (gzip)
+            gzipOut = new HadoopDirectDataOutput(Math.max(flushSize / 8, GZIP_OUT_MIN_ALLOC_SIZE));
     }
 
     /**
@@ -85,16 +106,35 @@ public class HadoopDirectDataOutputContext {
      * @return State.
      */
     public HadoopDirectDataOutputState state() {
-        return new HadoopDirectDataOutputState(out.buffer(), out.position());
+        if (gzip) {
+            try {
+                try (GZIPOutputStream gzip = new GZIPOutputStream(gzipOut, GZIP_BUFFER_SIZE)) {
+                    gzip.write(out.buffer(), 0, out.position());
+                }
+
+                return new HadoopDirectDataOutputState(gzipOut.buffer(), gzipOut.position(), out.position());
+            }
+            catch (IOException e) {
+                throw new IgniteException("Failed to compress.", e);
+            }
+        }
+        else
+            return new HadoopDirectDataOutputState(out.buffer(), out.position(), out.position());
     }
 
     /**
      * Reset buffer.
      */
     public void reset() {
-        int allocSize = Math.max(flushSize, out.position());
+        if (gzip) {
+            // In GZIP mode we do not expose normal output to the outside. Hence, no need for reallocation, just reset.
+            out.reset();
+
+            gzipOut = new HadoopDirectDataOutput(gzipOut.bufferLength());
+        }
+        else
+            out = new HadoopDirectDataOutput(flushSize, out.bufferLength());
 
-        out = new HadoopDirectDataOutput(flushSize, allocSize);
         cnt = 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
index a9c12e3..cadde7a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
@@ -27,15 +27,20 @@ public class HadoopDirectDataOutputState {
     /** Buffer length. */
     private final int bufLen;
 
+    /** Data length. */
+    private final int dataLen;
+
     /**
      * Constructor.
      *
      * @param buf Buffer.
      * @param bufLen Buffer length.
+     * @param dataLen Data length.
      */
-    public HadoopDirectDataOutputState(byte[] buf, int bufLen) {
+    public HadoopDirectDataOutputState(byte[] buf, int bufLen, int dataLen) {
         this.buf = buf;
         this.bufLen = bufLen;
+        this.dataLen = dataLen;
     }
 
     /**
@@ -51,4 +56,11 @@ public class HadoopDirectDataOutputState {
     public int bufferLength() {
         return bufLen;
     }
+
+    /**
+     * @return Original data length.
+     */
+    public int dataLength() {
+        return dataLen;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7d781ee/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index b1fa91f..d237180 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -137,8 +137,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
 
     /**
      * Does actual test TeraSort job Through Ignite API
+     *
+     * @param gzip Whether to use GZIP.
      */
-    protected final void teraSort() throws Exception {
+    protected final void teraSort(boolean gzip) throws Exception {
         System.out.println("TeraSort ===============================================================");
 
         getFileSystem().delete(new Path(sortOutDir), true);
@@ -164,6 +166,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
         jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
 
         jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), true);
+        jobConf.setInt(HadoopJobProperty.SHUFFLE_MSG_SIZE.propertyName(), 4096);
+
+        if (gzip)
+            jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MSG_GZIP.propertyName(), true);
 
         jobConf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(),
             TextPartiallyRawComparator.class.getName());
@@ -347,12 +353,32 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
 
     /**
      * Runs generate/sort/validate phases of the terasort sample.
-     * @throws Exception
+     *
+     * @throws Exception If failed.
      */
     public void testTeraSort() throws Exception {
+        checkTeraSort(false);
+    }
+
+    /**
+     * Runs generate/sort/validate phases of the terasort sample with GZIP compression of shuffle messages enabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTeraSortGzip() throws Exception {
+        checkTeraSort(true);
+    }
+
+    /**
+     * Check terasort.
+     *
+     * @param gzip GZIP flag.
+     * @throws Exception If failed.
+     */
+    private void checkTeraSort(boolean gzip) throws Exception {
         teraGenerate();
 
-        teraSort();
+        teraSort(gzip);
 
         teraValidate();
     }