You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/09 17:03:06 UTC
[34/40] ignite git commit: IGNITE-4516: Hadoop: implemented optional
compression for remote shuffle output. This closes #1399.
IGNITE-4516: Hadoop: implemented optional compression for remote shuffle output. This closes #1399.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d62542b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d62542b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d62542b9
Branch: refs/heads/ignite-comm-balance-master
Commit: d62542b9bbfff5a221915f2bd1d23ecfee9814cf
Parents: 2774d87
Author: devozerov <vo...@gridgain.com>
Authored: Thu Jan 5 11:30:42 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Jan 5 11:30:42 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopJobProperty.java | 7 +++
.../shuffle/HadoopDirectShuffleMessage.java | 34 +++++++++++-
.../hadoop/shuffle/HadoopShuffleJob.java | 57 +++++++++++++++++---
.../shuffle/direct/HadoopDirectDataOutput.java | 14 +++++
.../direct/HadoopDirectDataOutputContext.java | 48 +++++++++++++++--
.../direct/HadoopDirectDataOutputState.java | 14 ++++-
.../hadoop/impl/HadoopTeraSortTest.java | 32 +++++++++--
7 files changed, 188 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index 4398acd..4dd3bf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -103,6 +103,13 @@ public enum HadoopJobProperty {
SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
/**
+ * Whether shuffle message should be compressed with GZIP.
+ * <p>
+ * Defaults to {@code false}.
+ */
+ SHUFFLE_MSG_GZIP("ignite.shuffle.message.gzip"),
+
+ /**
* Whether to stripe mapper output for remote reducers.
* <p>
* Defaults to {@code false}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
index e81dc5f..f596100 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
@@ -57,6 +57,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
@GridDirectTransient
private transient int bufLen;
+ /** Data length. */
+ private int dataLen;
+
/**
* Default constructor.
*/
@@ -72,8 +75,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
* @param cnt Count.
* @param buf Buffer.
* @param bufLen Buffer length.
+ * @param dataLen Data length.
*/
- public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) {
+ public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen, int dataLen) {
assert jobId != null;
this.jobId = jobId;
@@ -81,6 +85,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
this.cnt = cnt;
this.buf = buf;
this.bufLen = bufLen;
+ this.dataLen = dataLen;
}
/**
@@ -111,6 +116,13 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
return buf;
}
+ /**
+ * @return Length of the data before optional GZIP compression.
+ */
+ public int dataLength() {
+ return dataLen;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -147,6 +159,12 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
writer.incrementState();
+ case 4:
+ if (!writer.writeInt("dataLen", dataLen))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -194,6 +212,14 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
reader.incrementState();
+ case 4:
+ dataLen = reader.readInt("dataLen");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(HadoopDirectShuffleMessage.class);
@@ -206,7 +232,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 5;
}
/** {@inheritDoc} */
@@ -222,6 +248,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
out.writeInt(cnt);
U.writeByteArray(out, buf);
+
+ out.writeInt(dataLen);
}
/** {@inheritDoc} */
@@ -234,6 +262,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
buf = U.readByteArray(in);
bufLen = buf != null ? buf.length : 0;
+
+ dataLen = in.readInt();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 1c546a1..0394865 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -56,6 +56,8 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -63,10 +65,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.zip.GZIPInputStream;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_GZIP;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -79,6 +83,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
private static final int DFLT_SHUFFLE_MSG_SIZE = 1024 * 1024;
/** */
+ private static final boolean DFLT_SHUFFLE_MSG_GZIP = false;
+
+ /** */
private final HadoopJob job;
/** */
@@ -130,6 +137,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** Message size. */
private final int msgSize;
+ /** Whether to GZIP shuffle messages. */
+ private final boolean msgGzip;
+
/** Whether to stripe mappers for remote execution. */
private final boolean stripeMappers;
@@ -190,6 +200,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
stripeMappers = stripeMappers0;
msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
+ msgGzip = get(job.info(), SHUFFLE_MSG_GZIP, DFLT_SHUFFLE_MSG_GZIP);
locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
@@ -360,22 +371,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @throws IgniteCheckedException Exception.
*/
public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
- assert msg.buffer() != null;
+ byte[] buf = extractBuffer(msg);
- HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
+ assert buf != null;
+
+ int rdc = msg.reducer();
+
+ HadoopTaskContext taskCtx = locReducersCtx.get(rdc).get();
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
- perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+ perfCntr.onShuffleMessage(rdc, U.currentTimeMillis());
- HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
+ HadoopMultimap map = getOrCreateMap(locMaps, rdc);
HadoopSerialization keySer = taskCtx.keySerialization();
HadoopSerialization valSer = taskCtx.valueSerialization();
// Add data from message to the map.
try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
- HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer());
+ HadoopDirectDataInput in = new HadoopDirectDataInput(buf);
Object key = null;
Object val = null;
@@ -393,6 +408,31 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
+     * Extract buffer from direct shuffle message, inflating it first if GZIP compression is enabled for the job.
+     *
+     * @param msg Message.
+     * @return Buffer with uncompressed data.
+     * @throws IgniteCheckedException If the payload cannot be uncompressed or is shorter than the declared length.
+     */
+    private byte[] extractBuffer(HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
+        if (!msgGzip)
+            return msg.buffer();
+
+        byte[] res = new byte[msg.dataLength()];
+
+        // GZIPInputStream rejects a non-positive buffer size, and there is nothing to inflate anyway.
+        if (res.length == 0)
+            return res;
+
+        int len = 0;
+
+        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(msg.buffer()), res.length)) {
+            // A single read() may return fewer bytes than requested, so keep reading until the array is full.
+            while (len < res.length) {
+                int read = in.read(res, len, res.length - len);
+
+                if (read < 0)
+                    break;
+
+                len += read;
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to uncompress direct shuffle message.", e);
+        }
+
+        // Explicit check instead of an assert: a truncated buffer must not pass silently when assertions are off.
+        if (len != res.length)
+            throw new IgniteCheckedException("Failed to uncompress direct shuffle message " +
+                "[expected=" + res.length + ", actual=" + len + ']');
+
+        return res;
+    }
+
+ /**
* @param ack Shuffle ack.
*/
@SuppressWarnings("ConstantConditions")
@@ -595,7 +635,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param rmtDirectCtx Remote direct context.
* @param reset Whether to perform reset.
*/
- private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) {
+ private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx,
+ boolean reset) {
if (rmtDirectCtx == null)
return;
@@ -612,7 +653,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
rmtDirectCtx.reset();
HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt,
- state.buffer(), state.bufferLength());
+ state.buffer(), state.bufferLength(), state.dataLength());
T nodeId = reduceAddrs[rmtRdcIdx];
@@ -983,7 +1024,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx];
if (rmtDirectCtx == null) {
- rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx);
+ rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, msgGzip, taskCtx);
rmtDirectCtxs[idx] = rmtDirectCtx;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
index 151e552..5840994 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
@@ -170,6 +170,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
}
/**
+ * @return Buffer length (how much memory is allocated).
+ */
+ public int bufferLength() {
+ return bufSize;
+ }
+
+ /**
* @return Position.
*/
public int position() {
@@ -184,6 +191,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
}
/**
+ * Reset the stream by moving the position back to zero; the buffer itself is not reallocated.
+ */
+ public void reset() {
+ pos = 0;
+ }
+
+ /**
* Ensure that the given amount of bytes is available within the stream, then shift the position.
*
* @param cnt Count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
index bc70ef3..454278b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
@@ -18,16 +18,29 @@
package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
/**
* Hadoop data output context for direct communication.
*/
public class HadoopDirectDataOutputContext {
+ /** Initial allocation size for GZIP output. We start with very low value, but then it will grow if needed. */
+ private static final int GZIP_OUT_MIN_ALLOC_SIZE = 1024;
+
+ /** GZIP buffer size. We should remove it when we implement efficient direct GZIP output. */
+ private static final int GZIP_BUFFER_SIZE = 8096;
+
/** Flush size. */
private final int flushSize;
+ /** Whether to perform GZIP. */
+ private final boolean gzip;
+
/** Key serialization. */
private final HadoopSerialization keySer;
@@ -37,6 +50,9 @@ public class HadoopDirectDataOutputContext {
/** Data output. */
private HadoopDirectDataOutput out;
+ /** Data output for GZIP. */
+ private HadoopDirectDataOutput gzipOut;
+
/** Number of keys written. */
private int cnt;
@@ -44,17 +60,22 @@ public class HadoopDirectDataOutputContext {
* Constructor.
*
* @param flushSize Flush size.
+ * @param gzip Whether to perform GZIP.
* @param taskCtx Task context.
* @throws IgniteCheckedException If failed.
*/
- public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx)
+ public HadoopDirectDataOutputContext(int flushSize, boolean gzip, HadoopTaskContext taskCtx)
throws IgniteCheckedException {
this.flushSize = flushSize;
+ this.gzip = gzip;
keySer = taskCtx.keySerialization();
valSer = taskCtx.valueSerialization();
out = new HadoopDirectDataOutput(flushSize);
+
+ if (gzip)
+ gzipOut = new HadoopDirectDataOutput(Math.max(flushSize / 8, GZIP_OUT_MIN_ALLOC_SIZE));
}
/**
@@ -85,16 +106,35 @@ public class HadoopDirectDataOutputContext {
* @return State.
*/
public HadoopDirectDataOutputState state() {
- return new HadoopDirectDataOutputState(out.buffer(), out.position());
+ if (gzip) {
+ try {
+ try (GZIPOutputStream gzip = new GZIPOutputStream(gzipOut, GZIP_BUFFER_SIZE)) {
+ gzip.write(out.buffer(), 0, out.position());
+ }
+
+ return new HadoopDirectDataOutputState(gzipOut.buffer(), gzipOut.position(), out.position());
+ }
+ catch (IOException e) {
+ throw new IgniteException("Failed to compress.", e);
+ }
+ }
+ else
+ return new HadoopDirectDataOutputState(out.buffer(), out.position(), out.position());
}
/**
* Reset buffer.
*/
public void reset() {
- int allocSize = Math.max(flushSize, out.position());
+ if (gzip) {
+ // In GZIP mode we do not expose normal output to the outside. Hence, no need for reallocation, just reset.
+ out.reset();
+
+ gzipOut = new HadoopDirectDataOutput(gzipOut.bufferLength());
+ }
+ else
+ out = new HadoopDirectDataOutput(flushSize, out.bufferLength());
- out = new HadoopDirectDataOutput(flushSize, allocSize);
cnt = 0;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
index a9c12e3..cadde7a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
@@ -27,15 +27,20 @@ public class HadoopDirectDataOutputState {
/** Buffer length. */
private final int bufLen;
+ /** Data length. */
+ private final int dataLen;
+
/**
* Constructor.
*
* @param buf Buffer.
* @param bufLen Buffer length.
+ * @param dataLen Data length.
*/
- public HadoopDirectDataOutputState(byte[] buf, int bufLen) {
+ public HadoopDirectDataOutputState(byte[] buf, int bufLen, int dataLen) {
this.buf = buf;
this.bufLen = bufLen;
+ this.dataLen = dataLen;
}
/**
@@ -51,4 +56,11 @@ public class HadoopDirectDataOutputState {
public int bufferLength() {
return bufLen;
}
+
+ /**
+ * @return Original data length.
+ */
+ public int dataLength() {
+ return dataLen;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index b1fa91f..d237180 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -137,8 +137,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
/**
* Runs the actual TeraSort test job through the Ignite API.
+ *
+ * @param gzip Whether to use GZIP.
*/
- protected final void teraSort() throws Exception {
+ protected final void teraSort(boolean gzip) throws Exception {
System.out.println("TeraSort ===============================================================");
getFileSystem().delete(new Path(sortOutDir), true);
@@ -164,6 +166,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), true);
+ jobConf.setInt(HadoopJobProperty.SHUFFLE_MSG_SIZE.propertyName(), 4096);
+
+ if (gzip)
+ jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MSG_GZIP.propertyName(), true);
jobConf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(),
TextPartiallyRawComparator.class.getName());
@@ -347,12 +353,32 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
/**
* Runs generate/sort/validate phases of the terasort sample.
- * @throws Exception
+ *
+ * @throws Exception If failed.
*/
public void testTeraSort() throws Exception {
+ checkTeraSort(false);
+ }
+
+ /**
+ * Runs generate/sort/validate phases of the terasort sample.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTeraSortGzip() throws Exception {
+ checkTeraSort(true);
+ }
+
+ /**
+ * Check terasort.
+ *
+ * @param gzip GZIP flag.
+ * @throws Exception If failed.
+ */
+ private void checkTeraSort(boolean gzip) throws Exception {
teraGenerate();
- teraSort();
+ teraSort(gzip);
teraValidate();
}