Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:41:43 UTC
[14/49] ignite git commit: IGNITE-4270: Hadoop: implemented striped mapper output. This closes #1334.
IGNITE-4270: Hadoop: implemented striped mapper output. This closes #1334.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065ca4a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065ca4a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065ca4a7
Branch: refs/heads/ignite-2.0
Commit: 065ca4a75a0765409a27d87c781efb215c0a6c48
Parents: 2f51b4a
Author: devozerov <vo...@gridgain.com>
Authored: Fri Dec 9 12:01:40 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Dec 9 12:01:40 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 6 +
.../processors/hadoop/HadoopJobProperty.java | 7 +
.../hadoop/HadoopMapperAwareTaskOutput.java | 32 ++
.../processors/hadoop/HadoopTaskInfo.java | 43 ++
.../shuffle/HadoopDirectShuffleMessage.java | 243 ++++++++++++
.../processors/hadoop/HadoopMapperUtils.java | 56 +++
.../hadoop/impl/v2/HadoopV2Context.java | 11 +
.../hadoop/impl/v2/HadoopV2MapTask.java | 10 +
.../hadoop/jobtracker/HadoopJobTracker.java | 4 +
.../hadoop/shuffle/HadoopShuffle.java | 23 +-
.../hadoop/shuffle/HadoopShuffleJob.java | 389 ++++++++++++++-----
.../shuffle/HadoopShuffleRemoteState.java | 5 +-
.../shuffle/direct/HadoopDirectDataInput.java | 166 ++++++++
.../shuffle/direct/HadoopDirectDataOutput.java | 221 +++++++++++
.../direct/HadoopDirectDataOutputContext.java | 100 +++++
.../direct/HadoopDirectDataOutputState.java | 54 +++
.../child/HadoopChildProcessRunner.java | 2 +-
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 22 +-
18 files changed, 1287 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 4ffb220..504e683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -122,6 +122,7 @@ import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -170,6 +171,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -42:
+ msg = new HadoopDirectShuffleMessage();
+
+ break;
+
case -41:
msg = new HadoopShuffleFinishResponse();
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index e713caa..1f0ef1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -91,6 +91,13 @@ public enum HadoopJobProperty {
SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
/**
+ * Whether to stripe mapper output for remote reducers.
+ * <p>
+ * Defaults to {@code false}.
+ */
+ SHUFFLE_MAPPER_STRIPED_OUTPUT("ignite.shuffle.mapper.striped.output"),
+
+ /**
* Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
* controls sleep duration between iterations through intermediate reducer maps.
* <p>
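
To illustrate the new flag in use: a minimal, hypothetical sketch of enabling striped mapper output for a job, assuming (as with the other ignite.shuffle.* properties above) that the value is read from the standard Hadoop job configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import java.io.IOException;

    public class StripedShuffleConfigExample {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();

            // Assumption: SHUFFLE_MAPPER_STRIPED_OUTPUT is picked up from the job
            // configuration under the key declared above; it defaults to false.
            conf.setBoolean("ignite.shuffle.mapper.striped.output", true);

            Job job = Job.getInstance(conf, "striped-shuffle-example");
            // ... configure mapper/reducer classes and input/output paths as usual ...
        }
    }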
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
new file mode 100644
index 0000000..1d6637c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Special output type with a callback invoked when the mapper has finished writing data.
+ */
+public interface HadoopMapperAwareTaskOutput extends HadoopTaskOutput {
+ /**
+ * Callback invoked when the mapper has finished writing data.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onMapperFinished() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
index b76fb85..3509367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
@@ -46,6 +46,12 @@ public class HadoopTaskInfo implements Externalizable {
/** */
private HadoopInputSplit inputSplit;
+ /** Whether mapper index is set. */
+ private boolean mapperIdxSet;
+
+ /** Current mapper index. */
+ private int mapperIdx;
+
/**
* For {@link Externalizable}.
*/
@@ -78,6 +84,13 @@ public class HadoopTaskInfo implements Externalizable {
out.writeInt(taskNum);
out.writeInt(attempt);
out.writeObject(inputSplit);
+
+ if (mapperIdxSet) {
+ out.writeBoolean(true);
+ out.writeInt(mapperIdx);
+ }
+ else
+ out.writeBoolean(false);
}
/** {@inheritDoc} */
@@ -87,6 +100,13 @@ public class HadoopTaskInfo implements Externalizable {
taskNum = in.readInt();
attempt = in.readInt();
inputSplit = (HadoopInputSplit)in.readObject();
+
+ if (in.readBoolean()) {
+ mapperIdxSet = true;
+ mapperIdx = in.readInt();
+ }
+ else
+ mapperIdxSet = false;
}
/**
@@ -118,6 +138,29 @@ public class HadoopTaskInfo implements Externalizable {
}
/**
+ * @param mapperIdx Current mapper index.
+ */
+ public void mapperIndex(int mapperIdx) {
+ this.mapperIdx = mapperIdx;
+
+ mapperIdxSet = true;
+ }
+
+ /**
+ * @return Current mapper index, or {@code 0} if it has not been set.
+ */
+ public int mapperIndex() {
+ return mapperIdx;
+ }
+
+ /**
+ * @return {@code True} if mapper index is set.
+ */
+ public boolean hasMapperIndex() {
+ return mapperIdxSet;
+ }
+
+ /**
* @return Input split.
*/
@Nullable public HadoopInputSplit inputSplit() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
new file mode 100644
index 0000000..e81dc5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Direct shuffle message.
+ */
+public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** */
+ @GridToStringInclude
+ private int reducer;
+
+ /** Count. */
+ private int cnt;
+
+ /** Buffer. */
+ private byte[] buf;
+
+ /** Buffer length (equal or less than buf.length). */
+ @GridDirectTransient
+ private transient int bufLen;
+
+ /**
+ * Default constructor.
+ */
+ public HadoopDirectShuffleMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param jobId Job ID.
+ * @param reducer Reducer.
+ * @param cnt Count.
+ * @param buf Buffer.
+ * @param bufLen Buffer length.
+ */
+ public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) {
+ assert jobId != null;
+
+ this.jobId = jobId;
+ this.reducer = reducer;
+ this.cnt = cnt;
+ this.buf = buf;
+ this.bufLen = bufLen;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Reducer.
+ */
+ public int reducer() {
+ return reducer;
+ }
+
+ /**
+ * @return Count.
+ */
+ public int count() {
+ return cnt;
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public byte[] buffer() {
+ return buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("jobId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("reducer", reducer))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeInt("cnt", cnt))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByteArray("buf", this.buf, 0, bufLen))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ jobId = reader.readMessage("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ reducer = reader.readInt("reducer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ cnt = reader.readInt("cnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ this.buf = reader.readByteArray("buf");
+
+ if (!reader.isLastRead())
+ return false;
+
+ bufLen = this.buf != null ? this.buf.length : 0;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopDirectShuffleMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -42;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeInt(reducer);
+ out.writeInt(cnt);
+
+ U.writeByteArray(out, buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+ jobId.readExternal(in);
+
+ reducer = in.readInt();
+ cnt = in.readInt();
+
+ buf = U.readByteArray(in);
+ bufLen = buf != null ? buf.length : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopDirectShuffleMessage.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
new file mode 100644
index 0000000..87adcb7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Set of mapper utility methods.
+ */
+public class HadoopMapperUtils {
+ /** Thread-local mapper index. */
+ private static final ThreadLocal<Integer> MAP_IDX = new ThreadLocal<>();
+
+ /**
+ * @return Current mapper index.
+ */
+ public static int mapperIndex() {
+ Integer res = MAP_IDX.get();
+
+ return res != null ? res : -1;
+ }
+
+ /**
+ * @param idx Current mapper index.
+ */
+ public static void mapperIndex(Integer idx) {
+ MAP_IDX.set(idx);
+ }
+
+ /**
+ * Clear mapper index.
+ */
+ public static void clearMapperIndex() {
+ MAP_IDX.remove();
+ }
+
+ /**
+ * Constructor.
+ */
+ private HadoopMapperUtils() {
+ // No-op.
+ }
+}
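
The intended lifecycle of this thread-local (set before the user mapper runs, read by the striped output on the same thread, cleared when the task finishes) is exercised by the HadoopV2MapTask changes further below; a minimal sketch of the contract:

    // Sketch of the HadoopMapperUtils thread-local contract.
    int mapperIdx = 3; // in the real flow this comes from HadoopTaskInfo.mapperIndex()

    HadoopMapperUtils.mapperIndex(mapperIdx);      // set on the mapper thread
    try {
        int idx = HadoopMapperUtils.mapperIndex(); // read on the same thread, e.g. by the shuffle output
        assert idx == 3;
    }
    finally {
        HadoopMapperUtils.clearMapperIndex();      // always cleared in the task's finally block
    }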
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index 90a1bad..eec0636 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
@@ -153,6 +154,16 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
}
}
+ /**
+ * Callback invoked from mapper thread when map is finished.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onMapperFinished() throws IgniteCheckedException {
+ if (output instanceof HadoopMapperAwareTaskOutput)
+ ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+ }
+
/** {@inheritDoc} */
@Override public OutputCommitter getOutputCommitter() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index 418df4e..eb3b935 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
/**
@@ -49,6 +50,11 @@ public class HadoopV2MapTask extends HadoopV2Task {
JobContextImpl jobCtx = taskCtx.jobContext();
+ if (taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
+
try {
InputSplit nativeSplit = hadoopContext().getInputSplit();
@@ -72,6 +78,8 @@ public class HadoopV2MapTask extends HadoopV2Task {
try {
mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+
+ hadoopContext().onMapperFinished();
}
finally {
closeWriter();
@@ -92,6 +100,8 @@ public class HadoopV2MapTask extends HadoopV2Task {
throw new IgniteCheckedException(e);
}
finally {
+ HadoopMapperUtils.clearMapperIndex();
+
if (err != null)
abort(outputFormat);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 36782bf..a725534 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -1018,6 +1018,8 @@ public class HadoopJobTracker extends HadoopComponent {
if (state == null)
state = initState(jobId);
+ int mapperIdx = 0;
+
for (HadoopInputSplit split : mappers) {
if (state.addMapper(split)) {
if (log.isDebugEnabled())
@@ -1026,6 +1028,8 @@ public class HadoopJobTracker extends HadoopComponent {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
+ taskInfo.mapperIndex(mapperIdx++);
+
if (tasks == null)
tasks = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 82bbd32..8ffea8c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -102,8 +104,8 @@ public class HadoopShuffle extends HadoopComponent {
private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
- HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
- ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true);
+ HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null),
+ mem, plan.reducers(), plan.reducers(ctx.localNodeId()), localMappersCount(plan), true);
UUID[] rdcAddrs = new UUID[plan.reducers()];
@@ -123,6 +125,18 @@ public class HadoopShuffle extends HadoopComponent {
}
/**
+ * Get number of local mappers.
+ *
+ * @param plan Plan.
+ * @return Number of local mappers.
+ */
+ private int localMappersCount(HadoopMapReducePlan plan) {
+ Collection<HadoopInputSplit> locMappers = plan.mappers(ctx.localNodeId());
+
+ return F.isEmpty(locMappers) ? 0 : locMappers.size();
+ }
+
+ /**
* @param nodeId Node ID to send message to.
* @param msg Message to send.
* @throws IgniteCheckedException If send failed.
@@ -195,6 +209,11 @@ public class HadoopShuffle extends HadoopComponent {
job(m.jobId()).onShuffleMessage(src, m);
}
+ else if (msg instanceof HadoopDirectShuffleMessage) {
+ HadoopDirectShuffleMessage m = (HadoopDirectShuffleMessage)msg;
+
+ job(m.jobId()).onDirectShuffleMessage(src, m);
+ }
else if (msg instanceof HadoopShuffleAck) {
HadoopShuffleAck m = (HadoopShuffleAck)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0a3a0ae..214a335 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,20 +17,16 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
@@ -41,6 +37,9 @@ import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputContext;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputState;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -55,9 +54,19 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -121,6 +130,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** Message size. */
private final int msgSize;
+ /** Whether to stripe mapper output for remote execution. */
+ private final boolean stripeMappers;
+
/** Local shuffle states. */
private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>();
@@ -143,11 +155,12 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param mem Memory.
* @param totalReducerCnt Amount of reducers in the Job.
* @param locReducers Reducers that will work on the current node.
+ * @param locMappersCnt Number of mappers running on the given node.
* @param embedded Whether shuffle is running in embedded mode.
* @throws IgniteCheckedException If error.
*/
public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
- int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException {
+ int totalReducerCnt, int[] locReducers, int locMappersCnt, boolean embedded) throws IgniteCheckedException {
this.locReduceAddr = locReduceAddr;
this.totalReducerCnt = totalReducerCnt;
this.job = job;
@@ -155,6 +168,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
this.log = log.getLogger(HadoopShuffleJob.class);
this.embedded = embedded;
+ // No stripes for combiner.
+ boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
+
+ if (stripeMappers0) {
+ if (job.info().hasCombiner()) {
+ log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
+ job.id() + ']');
+
+ stripeMappers0 = false;
+ }
+
+ if (!embedded) {
+ log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
+ job.id() + ']');
+
+ stripeMappers0 = false;
+ }
+ }
+
+ stripeMappers = stripeMappers0;
+
msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
@@ -169,9 +203,20 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
needPartitioner = totalReducerCnt > 1;
+ // Size of local map is always equal to total reducer number to allow index-based lookup.
locMaps = new AtomicReferenceArray<>(totalReducerCnt);
- rmtMaps = new AtomicReferenceArray<>(totalReducerCnt);
- msgs = new HadoopShuffleMessage[totalReducerCnt];
+
+ // Size of remote map:
+ // - If there are no local mappers, then we will not send anything, so set to 0;
+ // - If output is not striped, then match it to total reducer count, the same way as for local maps.
+ // - If output is striped, then multiply previous value by number of local mappers.
+ int rmtMapsSize = locMappersCnt == 0 ? 0 : totalReducerCnt;
+
+ if (stripeMappers)
+ rmtMapsSize *= locMappersCnt;
+
+ rmtMaps = new AtomicReferenceArray<>(rmtMapsSize);
+ msgs = new HadoopShuffleMessage[rmtMapsSize];
throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
}
@@ -208,24 +253,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
this.io = io;
- if (!flushed) {
- snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
- @Override protected void body() throws InterruptedException {
- try {
- while (!isCancelled()) {
- if (throttle > 0)
- Thread.sleep(throttle);
-
- collectUpdatesAndSend(false);
+ if (!stripeMappers) {
+ if (!flushed) {
+ snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+ @Override protected void body() throws InterruptedException {
+ try {
+ while (!isCancelled()) {
+ if (throttle > 0)
+ Thread.sleep(throttle);
+
+ collectUpdatesAndSend(false);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IllegalStateException(e);
}
}
- catch (IgniteCheckedException e) {
- throw new IllegalStateException(e);
- }
- }
- };
+ };
- new IgniteThread(snd).start();
+ new IgniteThread(snd).start();
+ }
}
ioInitLatch.countDown();
@@ -306,6 +353,46 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
+ * Process direct shuffle message.
+ *
+ * @param src Source.
+ * @param msg Message.
+ * @throws IgniteCheckedException Exception.
+ */
+ public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
+ assert msg.buffer() != null;
+
+ HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
+
+ HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+
+ perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+
+ HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
+
+ HadoopSerialization keySer = taskCtx.keySerialization();
+ HadoopSerialization valSer = taskCtx.valueSerialization();
+
+ // Add data from message to the map.
+ try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+ HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer());
+
+ Object key = null;
+ Object val = null;
+
+ for (int i = 0; i < msg.count(); i++) {
+ key = keySer.read(in, key);
+ val = valSer.read(in, val);
+
+ adder.write(key, val);
+ }
+ }
+
+ if (localShuffleState(src).onShuffleMessage())
+ sendFinishResponse(src, msg.jobId());
+ }
+
+ /**
* @param ack Shuffle ack.
*/
@SuppressWarnings("ConstantConditions")
@@ -467,88 +554,149 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
- * Sends map updates to remote reducers.
+ * Send updates to remote reducers.
+ *
+ * @param flush Flush flag.
+ * @throws IgniteCheckedException If failed.
*/
private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
- for (int i = 0; i < rmtMaps.length(); i++) {
- HadoopMultimap map = rmtMaps.get(i);
+ for (int i = 0; i < rmtMaps.length(); i++)
+ collectUpdatesAndSend(i, flush);
+ }
+
+ /**
+ * Send updates to concrete remote reducer.
+ *
+ * @param rmtMapIdx Remote map index.
+ * @param flush Flush flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void collectUpdatesAndSend(int rmtMapIdx, boolean flush) throws IgniteCheckedException {
+ final int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx;
- if (map == null)
- continue; // Skip empty map and local node.
+ HadoopMultimap map = rmtMaps.get(rmtMapIdx);
- if (msgs[i] == null)
- msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize);
+ if (map == null)
+ return;
- final int idx = i;
+ if (msgs[rmtMapIdx] == null)
+ msgs[rmtMapIdx] = new HadoopShuffleMessage(job.id(), rmtRdcIdx, msgSize);
- map.visit(false, new HadoopMultimap.Visitor() {
- /** */
- private long keyPtr;
+ visit(map, rmtMapIdx, rmtRdcIdx);
- /** */
- private int keySize;
+ if (flush && msgs[rmtMapIdx].offset() != 0)
+ send(rmtMapIdx, rmtRdcIdx, 0);
+ }
- /** */
- private boolean keyAdded;
+ /**
+ * Flush remote direct context.
+ *
+ * @param rmtMapIdx Remote map index.
+ * @param rmtDirectCtx Remote direct context.
+ * @param reset Whether to perform reset.
+ */
+ private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) {
+ if (rmtDirectCtx == null)
+ return;
- /** {@inheritDoc} */
- @Override public void onKey(long keyPtr, int keySize) {
- this.keyPtr = keyPtr;
- this.keySize = keySize;
+ int cnt = rmtDirectCtx.count();
- keyAdded = false;
- }
+ if (cnt == 0)
+ return;
- private boolean tryAdd(long valPtr, int valSize) {
- HadoopShuffleMessage msg = msgs[idx];
+ int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx;
- if (!keyAdded) { // Add key and value.
- int size = keySize + valSize;
+ HadoopDirectDataOutputState state = rmtDirectCtx.state();
- if (!msg.available(size, false))
- return false;
+ if (reset)
+ rmtDirectCtx.reset();
- msg.addKey(keyPtr, keySize);
- msg.addValue(valPtr, valSize);
+ HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt,
+ state.buffer(), state.bufferLength());
- keyAdded = true;
+ T nodeId = reduceAddrs[rmtRdcIdx];
- return true;
- }
+ io.apply(nodeId, msg);
+
+ remoteShuffleState(nodeId).onShuffleMessage();
+ }
+
+ /**
+ * Visit output map.
+ *
+ * @param map Map.
+ * @param rmtMapIdx Remote map index.
+ * @param rmtRdcIdx Remote reducer index.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void visit(HadoopMultimap map, final int rmtMapIdx, final int rmtRdcIdx) throws IgniteCheckedException {
+ map.visit(false, new HadoopMultimap.Visitor() {
+ /** */
+ private long keyPtr;
- if (!msg.available(valSize, true))
+ /** */
+ private int keySize;
+
+ /** */
+ private boolean keyAdded;
+
+ /** {@inheritDoc} */
+ @Override public void onKey(long keyPtr, int keySize) {
+ this.keyPtr = keyPtr;
+ this.keySize = keySize;
+
+ keyAdded = false;
+ }
+
+ private boolean tryAdd(long valPtr, int valSize) {
+ HadoopShuffleMessage msg = msgs[rmtMapIdx];
+
+ if (!keyAdded) { // Add key and value.
+ int size = keySize + valSize;
+
+ if (!msg.available(size, false))
return false;
+ msg.addKey(keyPtr, keySize);
msg.addValue(valPtr, valSize);
+ keyAdded = true;
+
return true;
}
- /** {@inheritDoc} */
- @Override public void onValue(long valPtr, int valSize) {
- if (tryAdd(valPtr, valSize))
- return;
+ if (!msg.available(valSize, true))
+ return false;
- send(idx, keySize + valSize);
+ msg.addValue(valPtr, valSize);
- keyAdded = false;
+ return true;
+ }
- if (!tryAdd(valPtr, valSize))
- throw new IllegalStateException();
- }
- });
+ /** {@inheritDoc} */
+ @Override public void onValue(long valPtr, int valSize) {
+ if (tryAdd(valPtr, valSize))
+ return;
- if (flush && msgs[i].offset() != 0)
- send(i, 0);
- }
+ send(rmtMapIdx, rmtRdcIdx, keySize + valSize);
+
+ keyAdded = false;
+
+ if (!tryAdd(valPtr, valSize))
+ throw new IllegalStateException();
+ }
+ });
}
/**
- * @param idx Index of message.
+ * Send message.
+ *
+ * @param rmtMapIdx Remote map index.
+ * @param rmtRdcIdx Remote reducer index.
* @param newBufMinSize Min new buffer size.
*/
- private void send(final int idx, int newBufMinSize) {
- HadoopShuffleMessage msg = msgs[idx];
+ private void send(int rmtMapIdx, int rmtRdcIdx, int newBufMinSize) {
+ HadoopShuffleMessage msg = msgs[rmtMapIdx];
final long msgId = msg.id();
@@ -566,10 +714,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
try {
- io.apply(reduceAddrs[idx], msg);
+ io.apply(reduceAddrs[rmtRdcIdx], msg);
if (embedded)
- remoteShuffleState(reduceAddrs[idx]).onShuffleMessage();
+ remoteShuffleState(reduceAddrs[rmtRdcIdx]).onShuffleMessage();
}
catch (GridClosureException e) {
if (fut != null)
@@ -593,7 +741,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
});
}
- msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+ msgs[rmtMapIdx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), rmtRdcIdx,
Math.max(msgSize, newBufMinSize));
}
@@ -639,31 +787,33 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
if (totalReducerCnt == 0)
return new GridFinishedFuture<>();
- U.await(ioInitLatch);
+ if (!stripeMappers) {
+ U.await(ioInitLatch);
- GridWorker snd0 = snd;
+ GridWorker snd0 = snd;
- if (snd0 != null) {
- if (log.isDebugEnabled())
- log.debug("Cancelling sender thread.");
+ if (snd0 != null) {
+ if (log.isDebugEnabled())
+ log.debug("Cancelling sender thread.");
- snd0.cancel();
+ snd0.cancel();
- try {
- snd0.join();
+ try {
+ snd0.join();
- if (log.isDebugEnabled())
- log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(e);
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
}
- }
- collectUpdatesAndSend(true); // With flush.
+ collectUpdatesAndSend(true); // With flush.
- if (log.isDebugEnabled())
- log.debug("Finished sending collected updates to remote reducers: " + job.id());
+ if (log.isDebugEnabled())
+ log.debug("Finished sending collected updates to remote reducers: " + job.id());
+ }
GridCompoundFuture fut = new GridCompoundFuture<>();
@@ -700,8 +850,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
if (log.isDebugEnabled())
log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
-
}
+
return fut;
}
@@ -775,13 +925,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/**
* Partitioned output.
*/
- private class PartitionedOutput implements HadoopTaskOutput {
+ public class PartitionedOutput implements HadoopMapperAwareTaskOutput {
/** */
private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()];
/** */
private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()];
+ /** Remote direct contexts. */
+ private final HadoopDirectDataOutputContext[] rmtDirectCtxs =
+ new HadoopDirectDataOutputContext[rmtMaps.length()];
+
/** */
private HadoopPartitioner partitioner;
@@ -819,16 +973,53 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx);
}
else {
- out = rmtAdders[part];
+ if (stripeMappers) {
+ int mapperIdx = HadoopMapperUtils.mapperIndex();
- if (out == null)
- rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+ assert mapperIdx >= 0;
+
+ int idx = totalReducerCnt * mapperIdx + part;
+
+ HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx];
+
+ if (rmtDirectCtx == null) {
+ rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx);
+
+ rmtDirectCtxs[idx] = rmtDirectCtx;
+ }
+
+ if (rmtDirectCtx.write(key, val))
+ sendShuffleMessage(idx, rmtDirectCtx, true);
+
+ return;
+ }
+ else {
+ out = rmtAdders[part];
+
+ if (out == null)
+ rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+ }
}
out.write(key, val);
}
/** {@inheritDoc} */
+ @Override public void onMapperFinished() throws IgniteCheckedException {
+ if (stripeMappers) {
+ int mapperIdx = HadoopMapperUtils.mapperIndex();
+
+ assert mapperIdx >= 0;
+
+ for (int i = 0; i < totalReducerCnt; i++) {
+ int idx = totalReducerCnt * mapperIdx + i;
+
+ sendShuffleMessage(idx, rmtDirectCtxs[idx], false);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
for (HadoopTaskOutput adder : locAdders) {
if (adder != null)
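
To make the striped index arithmetic above concrete: when stripeMappers is true, every (local mapper, reducer) pair gets its own slot in rmtMaps/msgs/rmtDirectCtxs, and the slot index encodes both coordinates. An illustration using the same formulas as PartitionedOutput.write() and sendShuffleMessage():

    // Illustration only: slot layout with striping enabled.
    int totalReducerCnt = 4;  // reducers in the job
    int locMappersCnt = 3;    // mappers on this node

    int slots = totalReducerCnt * locMappersCnt;  // rmtMaps/msgs size: 12

    int mapperIdx = 2;  // from HadoopMapperUtils.mapperIndex()
    int part = 1;       // target reducer (partition)

    int idx = totalReducerCnt * mapperIdx + part; // slot 9, as in PartitionedOutput.write()
    int rmtRdcIdx = idx % totalReducerCnt;        // back to reducer 1, as in sendShuffleMessage()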
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
index 5ffaa55..4331124 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
@@ -17,17 +17,14 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.lang.IgniteInClosure;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
/**
* Remote shuffle state.
*/
-class HadoopShuffleRemoteState<T> {
+class HadoopShuffleRemoteState {
/** Message count. */
private final AtomicLong msgCnt = new AtomicLong();
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
new file mode 100644
index 0000000..e3a713a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+
+/**
+ * Hadoop data input used for direct communication.
+ */
+public class HadoopDirectDataInput extends InputStream implements DataInput {
+ /** Data buffer. */
+ private final byte[] buf;
+
+ /** Position. */
+ private int pos;
+
+ /**
+ * Constructor.
+ *
+ * @param buf Buffer.
+ */
+ public HadoopDirectDataInput(byte[] buf) {
+ this.buf = buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ return readByte();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(@NotNull byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException {
+ System.arraycopy(buf, pos, b, off, len);
+
+ pos += len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int skipBytes(int n) throws IOException {
+ pos += n;
+
+ return n;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() throws IOException {
+ return readByte() == 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() throws IOException {
+ byte res = GridUnsafe.getByte(buf, BYTE_ARR_OFF + pos);
+
+ pos += 1;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readUnsignedByte() throws IOException {
+ return readByte() & 0xff;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() throws IOException {
+ short res = GridUnsafe.getShort(buf, BYTE_ARR_OFF + pos);
+
+ pos += 2;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readUnsignedShort() throws IOException {
+ return readShort() & 0xffff;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() throws IOException {
+ char res = GridUnsafe.getChar(buf, BYTE_ARR_OFF + pos);
+
+ pos += 2;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() throws IOException {
+ int res = GridUnsafe.getInt(buf, BYTE_ARR_OFF + pos);
+
+ pos += 4;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() throws IOException {
+ long res = GridUnsafe.getLong(buf, BYTE_ARR_OFF + pos);
+
+ pos += 8;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() throws IOException {
+ float res = GridUnsafe.getFloat(buf, BYTE_ARR_OFF + pos);
+
+ pos += 4;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() throws IOException {
+ double res = GridUnsafe.getDouble(buf, BYTE_ARR_OFF + pos);
+
+ pos += 8;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String readLine() throws IOException {
+ // TODO: Create ticket!
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public String readUTF() throws IOException {
+ byte[] bytes = new byte[readShort()];
+
+ if (bytes.length != 0)
+ readFully(bytes);
+
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
new file mode 100644
index 0000000..151e552
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+
+/**
+ * Hadoop data output for direct communication.
+ */
+public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
+ /** Flush size. */
+ private final int flushSize;
+
+ /** Data buffer. */
+ private byte[] buf;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Position. */
+ private int pos;
+
+ /**
+ * Constructor.
+ *
+ * @param flushSize Flush size.
+ */
+ public HadoopDirectDataOutput(int flushSize) {
+ this(flushSize, flushSize);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param flushSize Flush size.
+ * @param allocSize Allocation size.
+ */
+ public HadoopDirectDataOutput(int flushSize, int allocSize) {
+ this.flushSize = flushSize;
+
+ buf = new byte[allocSize];
+ bufSize = allocSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(@NotNull byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
+ int writePos = ensure(len);
+
+ System.arraycopy(b, off, buf, writePos, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int val) throws IOException {
+ writeByte(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBoolean(boolean val) throws IOException {
+ writeByte(val ? (byte)1 : (byte)0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByte(int val) throws IOException {
+ int writePos = ensure(1);
+
+ buf[writePos] = (byte)val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(int val) throws IOException {
+ int writePos = ensure(2);
+
+ GridUnsafe.putShort(buf, BYTE_ARR_OFF + writePos, (short)val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChar(int val) throws IOException {
+ int writePos = ensure(2);
+
+ GridUnsafe.putChar(buf, BYTE_ARR_OFF + writePos, (char)val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) throws IOException {
+ int writePos = ensure(4);
+
+ GridUnsafe.putInt(buf, BYTE_ARR_OFF + writePos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) throws IOException {
+ int writePos = ensure(8);
+
+ GridUnsafe.putLong(buf, BYTE_ARR_OFF + writePos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloat(float val) throws IOException {
+ int writePos = ensure(4);
+
+ GridUnsafe.putFloat(buf, BYTE_ARR_OFF + writePos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDouble(double val) throws IOException {
+ int writePos = ensure(8);
+
+ GridUnsafe.putDouble(buf, BYTE_ARR_OFF + writePos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBytes(@NotNull String str) throws IOException {
+ for(int i = 0; i < str.length(); ++i)
+ write((byte)str.charAt(i));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChars(@NotNull String str) throws IOException {
+ for (int i = 0; i < str.length(); ++i)
+ writeChar(str.charAt(i));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeUTF(@NotNull String str) throws IOException {
+ byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+ int len = bytes.length;
+
+ if (len > 65535)
+ throw new UTFDataFormatException("UTF8 form of string is longer than 65535 bytes: " + str);
+
+ writeShort((short)len);
+ write(bytes);
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public byte[] buffer() {
+ return buf;
+ }
+
+ /**
+ * @return Position.
+ */
+ public int position() {
+ return pos;
+ }
+
+ /**
+ * @return Whether buffer is ready for flush.
+ */
+ public boolean readyForFlush() {
+ return pos >= flushSize;
+ }
+
+ /**
+ * Ensure that the given amount of bytes is available within the stream, then shift the position.
+ *
+ * @param cnt Count.
+ * @return Position
+ */
+ private int ensure(int cnt) {
+ int pos0 = pos;
+
+ if (pos0 + cnt > bufSize)
+ grow(pos0 + cnt);
+
+ pos += cnt;
+
+ return pos0;
+ }
+
+ /**
+ * Grow array up to the given count.
+ *
+ * @param cnt Count.
+ */
+ private void grow(int cnt) {
+ int bufSize0 = (int)(bufSize * 1.1);
+
+ if (bufSize0 < cnt)
+ bufSize0 = cnt;
+
+ byte[] buf0 = new byte[bufSize0];
+
+ System.arraycopy(buf, 0, buf0, 0, pos);
+
+ buf = buf0;
+ bufSize = bufSize0;
+ }
+}
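
Together with HadoopDirectDataInput above, this forms a simple binary stream pair; a minimal round trip (illustrative only, neither side performs bounds checking) looks like:

    import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
    import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput;

    public class DirectDataRoundTrip {
        public static void main(String[] args) throws Exception {
            HadoopDirectDataOutput out = new HadoopDirectDataOutput(1024);

            out.writeInt(42);
            out.writeUTF("hello");
            out.writeDouble(3.14);

            // Only the first out.position() bytes of out.buffer() contain valid data.
            HadoopDirectDataInput in = new HadoopDirectDataInput(out.buffer());

            assert in.readInt() == 42;
            assert "hello".equals(in.readUTF());
            assert in.readDouble() == 3.14;
        }
    }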
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
new file mode 100644
index 0000000..bc70ef3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+
+/**
+ * Hadoop data output context for direct communication.
+ */
+public class HadoopDirectDataOutputContext {
+ /** Flush size. */
+ private final int flushSize;
+
+ /** Key serialization. */
+ private final HadoopSerialization keySer;
+
+ /** Value serialization. */
+ private final HadoopSerialization valSer;
+
+ /** Data output. */
+ private HadoopDirectDataOutput out;
+
+ /** Number of keys written. */
+ private int cnt;
+
+ /**
+ * Constructor.
+ *
+ * @param flushSize Flush size.
+ * @param taskCtx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx)
+ throws IgniteCheckedException {
+ this.flushSize = flushSize;
+
+ keySer = taskCtx.keySerialization();
+ valSer = taskCtx.valueSerialization();
+
+ out = new HadoopDirectDataOutput(flushSize);
+ }
+
+ /**
+ * Write key-value pair.
+ *
+ * @param key Key.
+ * @param val Value.
+ * @return Whether flush is needed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean write(Object key, Object val) throws IgniteCheckedException {
+ keySer.write(out, key);
+ valSer.write(out, val);
+
+ cnt++;
+
+ return out.readyForFlush();
+ }
+
+ /**
+ * @return Key-value pairs count.
+ */
+ public int count() {
+ return cnt;
+ }
+
+ /**
+ * @return State.
+ */
+ public HadoopDirectDataOutputState state() {
+ return new HadoopDirectDataOutputState(out.buffer(), out.position());
+ }
+
+ /**
+ * Reset buffer.
+ */
+ public void reset() {
+ int allocSize = Math.max(flushSize, out.position());
+
+ out = new HadoopDirectDataOutput(flushSize, allocSize);
+ cnt = 0;
+ }
+}
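A hedged sketch of the write/flush cycle the context above is designed for (not part of the patch). Here taskCtx, mapperOutput and sendToReducer(...) are hypothetical placeholders for the task context, the mapper's emitted pairs and the actual shuffle send path:

    HadoopDirectDataOutputContext outCtx = new HadoopDirectDataOutputContext(1024 * 1024, taskCtx);

    for (Map.Entry<Object, Object> e : mapperOutput) {
        if (outCtx.write(e.getKey(), e.getValue())) {
            // Flush threshold reached: capture the current buffer and pair count, then reset.
            HadoopDirectDataOutputState state = outCtx.state();

            sendToReducer(state.buffer(), state.bufferLength(), outCtx.count());

            outCtx.reset();
        }
    }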
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
new file mode 100644
index 0000000..a9c12e3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+/**
+ * Hadoop data output state for direct communication.
+ */
+public class HadoopDirectDataOutputState {
+ /** Buffer. */
+ private final byte[] buf;
+
+ /** Buffer length. */
+ private final int bufLen;
+
+ /**
+ * Constructor.
+ *
+ * @param buf Buffer.
+ * @param bufLen Buffer length.
+ */
+ public HadoopDirectDataOutputState(byte[] buf, int bufLen) {
+ this.buf = buf;
+ this.bufLen = bufLen;
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public byte[] buffer() {
+ return buf;
+ }
+
+ /**
+ * @return Length.
+ */
+ public int bufferLength() {
+ return bufLen;
+ }
+}
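One caveat worth illustrating (not from the patch): the backing array captured in the state may be larger than the written data, so consumers should honor bufferLength() rather than the array length. The outCtx variable below is the hypothetical context from the previous sketch:

    HadoopDirectDataOutputState state = outCtx.state();

    byte[] exact = java.util.Arrays.copyOf(state.buffer(), state.bufferLength());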
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index cb08c00..3336120 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -151,7 +151,7 @@ public class HadoopChildProcessRunner {
job.initialize(true, nodeDesc.processId());
shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
- req.totalReducerCount(), req.localReducers(), false);
+ req.totalReducerCount(), req.localReducers(), 0, false);
initializeExecutors(req);
http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index b04deeb..8897a38 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
@@ -54,12 +55,28 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
return cfg;
}
+ /**
+ * @throws Exception If fails.
+ */
+ public void testMultiReducerWholeMapReduceExecution() throws Exception {
+ checkMultiReducerWholeMapReduceExecution(false);
+ }
+
+ /**
+ * @throws Exception If fails.
+ */
+ public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
+ checkMultiReducerWholeMapReduceExecution(true);
+ }
+
/**
* Tests whole job execution with all phases in old and new versions of API with definition of custom
* Serialization, Partitioner and IO formats.
+ *
+ * @param striped Whether output should be striped or not.
* @throws Exception If fails.
*/
- public void testMultiReducerWholeMapReduceExecution() throws Exception {
+ public void checkMultiReducerWholeMapReduceExecution(boolean striped) throws Exception {
IgfsPath inDir = new IgfsPath(PATH_INPUT);
igfs.mkdirs(inDir);
@@ -81,6 +98,9 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
JobConf jobConf = new JobConf();
+ if (striped)
+ jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");
+
jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
//To split into about 6-7 items for v2