Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/19 10:27:33 UTC
[43/51] [abbrv] [partial] ignite git commit: IGNITE-3916: Created separate module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
new file mode 100644
index 0000000..3eb819b
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
+
+/**
+ * Submit job task.
+ */
+public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
+ HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+ UUID nodeId = UUID.fromString(args.<String>get(0));
+ Integer id = args.get(1);
+ HadoopDefaultJobInfo info = args.get(2);
+
+ assert nodeId != null;
+ assert id != null;
+ assert info != null;
+
+ HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+ hadoop.submit(jobId, info);
+
+ HadoopJobStatus res = hadoop.status(jobId);
+
+ if (res == null) // Submission failed.
+ res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
+
+ return res;
+ }
+}
\ No newline at end of file
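For reference, the Hadoop client protocol is expected to invoke this task through Ignite's compute API, passing the submitting node ID (as a string), a per-node job counter and the job info as positional arguments. A minimal sketch, assuming an Ignite instance and those variables are in scope:

    // Sketch: submitting a job via the protocol task (nodeId, jobCnt and jobInfo are assumed).
    HadoopJobStatus status = ignite.compute().execute(
        HadoopProtocolSubmitJobTask.class,
        new HadoopProtocolTaskArguments(nodeId.toString(), jobCnt, jobInfo));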
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
new file mode 100644
index 0000000..c3227ae
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop protocol task adapter.
+ */
+public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable HadoopProtocolTaskArguments arg) {
+ return Collections.singletonMap(new Job(arg), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.REDUCE;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public R reduce(List<ComputeJobResult> results) {
+ if (!F.isEmpty(results)) {
+ ComputeJobResult res = results.get(0);
+
+ return res.getData();
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Job wrapper.
+ */
+ private class Job implements ComputeJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ @SuppressWarnings("UnusedDeclaration")
+ @JobContextResource
+ private ComputeJobContext jobCtx;
+
+ /** Arguments. */
+ private final HadoopProtocolTaskArguments args;
+
+ /**
+ * Constructor.
+ *
+ * @param args Task arguments.
+ */
+ private Job(HadoopProtocolTaskArguments args) {
+ this.args = args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ try {
+ return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
+
+ /**
+ * Run the task.
+ *
+ * @param jobCtx Job context.
+ * @param hadoop Hadoop facade.
+ * @param args Arguments.
+ * @return Job result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
+ throws IgniteCheckedException;
+}
\ No newline at end of file
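The adapter fixes the execution pattern for all protocol tasks: map() sends a single wrapper Job to the first node of the subgrid, result() always asks to reduce, and reduce() returns the first (and only) result, so a concrete operation only has to implement run(). A minimal sketch of such a subclass (the task itself is hypothetical):

    /** Hypothetical task that just reports how many arguments it received. */
    public class HadoopProtocolArgCountTask extends HadoopProtocolTaskAdapter<Integer> {
        /** */
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public Integer run(ComputeJobContext jobCtx, Hadoop hadoop,
            HadoopProtocolTaskArguments args) throws IgniteCheckedException {
            return args.size();
        }
    }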
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
new file mode 100644
index 0000000..e497454
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task arguments.
+ */
+public class HadoopProtocolTaskArguments implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Arguments. */
+ private Object[] args;
+
+ /**
+ * {@link Externalizable} support.
+ */
+ public HadoopProtocolTaskArguments() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param args Arguments.
+ */
+ public HadoopProtocolTaskArguments(Object... args) {
+ this.args = args;
+ }
+
+ /**
+ * @param idx Argument index.
+ * @return Argument.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable public <T> T get(int idx) {
+ return (args != null && args.length > idx) ? (T)args[idx] : null;
+ }
+
+ /**
+ * @return Size.
+ */
+ public int size() {
+ return args != null ? args.length : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeArray(out, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ args = U.readArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProtocolTaskArguments.class, this);
+ }
+}
\ No newline at end of file
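Arguments are purely positional and read back with an unchecked cast, so producer and consumer must agree on order and types; an out-of-range index returns null instead of throwing. A short usage sketch (the values are illustrative):

    HadoopProtocolTaskArguments args =
        new HadoopProtocolTaskArguments(nodeId.toString(), 42, jobInfo);

    String node = args.get(0);    // Typed through the unchecked <T> cast.
    Integer cnt = args.get(1);
    Object missing = args.get(5); // Beyond size(): returns null.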
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
new file mode 100644
index 0000000..769bdc4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Shuffle.
+ */
+public class HadoopShuffle extends HadoopComponent {
+ /** */
+ private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
+
+ /** */
+ protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ /** {@inheritDoc} */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ super.start(ctx);
+
+ ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
+ new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ return onMessageReceived(nodeId, (HadoopMessage)msg);
+ }
+ });
+ }
+
+ /**
+ * Stops shuffle.
+ *
+ * @param cancel Whether to cancel all ongoing activities.
+ */
+ @Override public void stop(boolean cancel) {
+ for (HadoopShuffleJob job : jobs.values()) {
+ try {
+ job.close();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close job.", e);
+ }
+ }
+
+ jobs.clear();
+ }
+
+ /**
+ * Creates new shuffle job.
+ *
+ * @param jobId Job ID.
+ * @return Created shuffle job.
+ * @throws IgniteCheckedException If job creation failed.
+ */
+ private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
+ HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+
+ HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
+ ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+ UUID[] rdcAddrs = new UUID[plan.reducers()];
+
+ for (int i = 0; i < rdcAddrs.length; i++) {
+ UUID nodeId = plan.nodeForReducer(i);
+
+ assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
+
+ rdcAddrs[i] = nodeId;
+ }
+
+ boolean init = job.initializeReduceAddresses(rdcAddrs);
+
+ assert init;
+
+ return job;
+ }
+
+ /**
+ * @param nodeId Node ID to send message to.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException If send failed.
+ */
+ private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+ ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
+
+ ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @return Shuffle job.
+ * @throws IgniteCheckedException If the shuffle job could not be created.
+ */
+ private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
+ HadoopShuffleJob<UUID> res = jobs.get(jobId);
+
+ if (res == null) {
+ res = newJob(jobId);
+
+ HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+
+ if (old != null) {
+ res.close();
+
+ res = old;
+ }
+ else if (res.reducersInitialized())
+ startSending(res);
+ }
+
+ return res;
+ }
+
+ /**
+ * Starts message sending thread.
+ *
+ * @param shuffleJob Job to start sending for.
+ */
+ private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
+ shuffleJob.startSending(ctx.kernalContext().gridName(),
+ new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
+ @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+ send0(dest, msg);
+ }
+ }
+ );
+ }
+
+ /**
+ * Message received callback.
+ *
+ * @param src Sender node ID.
+ * @param msg Received message.
+ * @return {@code True} always.
+ */
+ public boolean onMessageReceived(UUID src, HadoopMessage msg) {
+ if (msg instanceof HadoopShuffleMessage) {
+ HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+ try {
+ job(m.jobId()).onShuffleMessage(m);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Message handling failed.", e);
+ }
+
+ try {
+ // Reply with ack.
+ send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+ }
+ }
+ else if (msg instanceof HadoopShuffleAck) {
+ HadoopShuffleAck m = (HadoopShuffleAck)msg;
+
+ try {
+ job(m.jobId()).onShuffleAck(m);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Message handling failed.", e);
+ }
+ }
+ else
+ throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+ ", msg=" + msg + ']');
+
+ return true;
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Output.
+ */
+ public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return job(taskCtx.taskInfo().jobId()).output(taskCtx);
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Input.
+ */
+ public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return job(taskCtx.taskInfo().jobId()).input(taskCtx);
+ }
+
+ /**
+ * @param jobId Job ID.
+ */
+ public void jobFinished(HadoopJobId jobId) {
+ HadoopShuffleJob job = jobs.remove(jobId);
+
+ if (job != null) {
+ try {
+ job.close();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close job: " + jobId, e);
+ }
+ }
+ }
+
+ /**
+ * Flushes all the outputs for the given job to remote nodes.
+ *
+ * @param jobId Job ID.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
+ HadoopShuffleJob job = jobs.get(jobId);
+
+ if (job == null)
+ return new GridFinishedFuture<>();
+
+ try {
+ return job.flush();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * @return Memory.
+ */
+ public GridUnsafeMemory memory() {
+ return mem;
+ }
+}
\ No newline at end of file
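Taken together, the component is driven by the task executor roughly as follows over a job's lifetime (a sketch; the calling code and task contexts are assumed):

    HadoopTaskOutput out = shuffle.output(mapTaskCtx);  // Map side: partitioned writes.
    // ... the map task writes its (key, val) pairs ...
    out.close();

    shuffle.flush(jobId).get();                         // Send buffered data, await acks.

    HadoopTaskInput in = shuffle.input(reduceTaskCtx);  // Reduce side: grouped reads.
    // ... the reduce task consumes the input ...
    in.close();

    shuffle.jobFinished(jobId);                         // Close maps, free off-heap memory.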
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
new file mode 100644
index 0000000..6013ec6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Acknowledgement message.
+ */
+public class HadoopShuffleAck implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private long msgId;
+
+ /** */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /**
+ * Default constructor (required by Externalizable).
+ */
+ public HadoopShuffleAck() {
+ // No-op.
+ }
+
+ /**
+ * @param msgId Message ID.
+ * @param jobId Job ID.
+ */
+ public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
+ assert jobId != null;
+
+ this.msgId = msgId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Message ID.
+ */
+ public long id() {
+ return msgId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+ out.writeLong(msgId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+ msgId = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleAck.class, this);
+ }
+}
\ No newline at end of file
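HadoopMessage is Externalizable-based, so the ack serializes to just the job ID plus one long. A round-trip sketch in test style (not part of the module; assumes a populated HadoopJobId):

    HadoopShuffleAck ack = new HadoopShuffleAck(1L, jobId);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();

    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        ack.writeExternal(out);
    }

    HadoopShuffleAck read = new HadoopShuffleAck();

    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        read.readExternal(in);
    }

    assert read.id() == ack.id();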
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
new file mode 100644
index 0000000..b940c72
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+
+/**
+ * Shuffle job.
+ */
+public class HadoopShuffleJob<T> implements AutoCloseable {
+ /** */
+ private static final int MSG_BUF_SIZE = 128 * 1024;
+
+ /** */
+ private final HadoopJob job;
+
+ /** */
+ private final GridUnsafeMemory mem;
+
+ /** */
+ private final boolean needPartitioner;
+
+ /** Collection of task contexts for each reduce task. */
+ private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+
+ /** Reducer addresses. */
+ private T[] reduceAddrs;
+
+ /** Local reducer address. */
+ private final T locReduceAddr;
+
+ /** */
+ private final HadoopShuffleMessage[] msgs;
+
+ /** */
+ private final AtomicReferenceArray<HadoopMultimap> maps;
+
+ /** */
+ private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+
+ /** */
+ protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
+ new ConcurrentHashMap<>();
+
+ /** */
+ private volatile GridWorker snd;
+
+ /** Latch to await remote reducer address initialization. */
+ private final CountDownLatch ioInitLatch = new CountDownLatch(1);
+
+ /** Finished flag. Set on flush or close. */
+ private volatile boolean flushed;
+
+ /** */
+ private final IgniteLogger log;
+
+ /**
+ * @param locReduceAddr Local reducer address.
+ * @param log Logger.
+ * @param job Job.
+ * @param mem Memory.
+ * @param totalReducerCnt Total number of reducers in the job.
+ * @param locReducers Reducers to be run on the current node.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+ int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+ this.locReduceAddr = locReduceAddr;
+ this.job = job;
+ this.mem = mem;
+ this.log = log.getLogger(HadoopShuffleJob.class);
+
+ if (!F.isEmpty(locReducers)) {
+ for (int rdc : locReducers) {
+ HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
+
+ reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+ }
+ }
+
+ needPartitioner = totalReducerCnt > 1;
+
+ maps = new AtomicReferenceArray<>(totalReducerCnt);
+ msgs = new HadoopShuffleMessage[totalReducerCnt];
+ }
+
+ /**
+ * @param reduceAddrs Addresses of reducers.
+ * @return {@code True} if addresses were initialized by this call.
+ */
+ public boolean initializeReduceAddresses(T[] reduceAddrs) {
+ if (this.reduceAddrs == null) {
+ this.reduceAddrs = reduceAddrs;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code True} if reducer addresses were initialized.
+ */
+ public boolean reducersInitialized() {
+ return reduceAddrs != null;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @param io IO Closure for sending messages.
+ */
+ @SuppressWarnings("BusyWait")
+ public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+ assert snd == null;
+ assert io != null;
+
+ this.io = io;
+
+ if (!flushed) {
+ snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+ @Override protected void body() throws InterruptedException {
+ try {
+ while (!isCancelled()) {
+ Thread.sleep(5);
+
+ collectUpdatesAndSend(false);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+
+ new IgniteThread(snd).start();
+ }
+
+ ioInitLatch.countDown();
+ }
+
+ /**
+ * @param maps Maps.
+ * @param idx Index.
+ * @return Map.
+ */
+ private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
+ HadoopMultimap map = maps.get(idx);
+
+ if (map == null) { // Create new map.
+ map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
+ new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)) :
+ new HadoopSkipList(job.info(), mem);
+
+ if (!maps.compareAndSet(idx, null, map)) {
+ map.close();
+
+ return maps.get(idx);
+ }
+ }
+
+ return map;
+ }
+
+ /**
+ * @param msg Message.
+ * @throws IgniteCheckedException Exception.
+ */
+ public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+ assert msg.buffer() != null;
+ assert msg.offset() > 0;
+
+ HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+
+ HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+
+ perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+
+ HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+
+ // Add data from message to the map.
+ try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+ final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
+ final UnsafeValue val = new UnsafeValue(msg.buffer());
+
+ msg.visit(new HadoopShuffleMessage.Visitor() {
+ /** */
+ private HadoopMultimap.Key key;
+
+ @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
+ dataInput.bytes(buf, off, off + len);
+
+ key = adder.addKey(dataInput, key);
+ }
+
+ @Override public void onValue(byte[] buf, int off, int len) {
+ val.off = off;
+ val.size = len;
+
+ key.add(val);
+ }
+ });
+ }
+ }
+
+ /**
+ * @param ack Shuffle ack.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void onShuffleAck(HadoopShuffleAck ack) {
+ IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = sentMsgs.get(ack.id());
+
+ if (tup != null)
+ tup.get2().onDone();
+ else
+ log.warning("Received shuffle ack for unregistered shuffle message ID: " + ack);
+ }
+
+ /**
+ * Unsafe value.
+ */
+ private static class UnsafeValue implements HadoopMultimap.Value {
+ /** */
+ private final byte[] buf;
+
+ /** */
+ private int off;
+
+ /** */
+ private int size;
+
+ /**
+ * @param buf Buffer.
+ */
+ private UnsafeValue(byte[] buf) {
+ assert buf != null;
+
+ this.buf = buf;
+ }
+
+ /** */
+ @Override public int size() {
+ return size;
+ }
+
+ /** */
+ @Override public void copyTo(long ptr) {
+ GridUnsafe.copyMemory(buf, GridUnsafe.BYTE_ARR_OFF + off, null, ptr, size);
+ }
+ }
+
+ /**
+ * Sends map updates to remote reducers.
+ */
+ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
+ for (int i = 0; i < maps.length(); i++) {
+ HadoopMultimap map = maps.get(i);
+
+ if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+ continue; // Skip empty map and local node.
+
+ if (msgs[i] == null)
+ msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+
+ final int idx = i;
+
+ map.visit(false, new HadoopMultimap.Visitor() {
+ /** */
+ private long keyPtr;
+
+ /** */
+ private int keySize;
+
+ /** */
+ private boolean keyAdded;
+
+ /** {@inheritDoc} */
+ @Override public void onKey(long keyPtr, int keySize) {
+ this.keyPtr = keyPtr;
+ this.keySize = keySize;
+
+ keyAdded = false;
+ }
+
+ private boolean tryAdd(long valPtr, int valSize) {
+ HadoopShuffleMessage msg = msgs[idx];
+
+ if (!keyAdded) { // Add key and value.
+ int size = keySize + valSize;
+
+ if (!msg.available(size, false))
+ return false;
+
+ msg.addKey(keyPtr, keySize);
+ msg.addValue(valPtr, valSize);
+
+ keyAdded = true;
+
+ return true;
+ }
+
+ if (!msg.available(valSize, true))
+ return false;
+
+ msg.addValue(valPtr, valSize);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onValue(long valPtr, int valSize) {
+ if (tryAdd(valPtr, valSize))
+ return;
+
+ send(idx, keySize + valSize);
+
+ keyAdded = false;
+
+ if (!tryAdd(valPtr, valSize))
+ throw new IllegalStateException();
+ }
+ });
+
+ if (flush && msgs[i].offset() != 0)
+ send(i, 0);
+ }
+ }
+
+ /**
+ * @param idx Index of message.
+ * @param newBufMinSize Min new buffer size.
+ */
+ private void send(final int idx, int newBufMinSize) {
+ final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+ HadoopShuffleMessage msg = msgs[idx];
+
+ final long msgId = msg.id();
+
+ IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
+ new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+
+ assert old == null;
+
+ try {
+ io.apply(reduceAddrs[idx], msg);
+ }
+ catch (GridClosureException e) {
+ fut.onDone(U.unwrap(e));
+ }
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ f.get();
+
+ // Clean up the future from map only if there was no exception.
+ // Otherwise flush() should fail.
+ sentMsgs.remove(msgId);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send message.", e);
+ }
+ }
+ });
+
+ msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+ Math.max(MSG_BUF_SIZE, newBufMinSize));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ if (snd != null) {
+ snd.cancel();
+
+ try {
+ snd.join();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
+
+ close(maps);
+ }
+
+ /**
+ * @param maps Maps.
+ */
+ private void close(AtomicReferenceArray<HadoopMultimap> maps) {
+ for (int i = 0; i < maps.length(); i++) {
+ HadoopMultimap map = maps.get(i);
+
+ if (map != null)
+ map.close();
+ }
+ }
+
+ /**
+ * @return Future.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
+
+ flushed = true;
+
+ if (maps.length() == 0)
+ return new GridFinishedFuture<>();
+
+ U.await(ioInitLatch);
+
+ GridWorker snd0 = snd;
+
+ if (snd0 != null) {
+ if (log.isDebugEnabled())
+ log.debug("Cancelling sender thread.");
+
+ snd0.cancel();
+
+ try {
+ snd0.join();
+
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
+
+ collectUpdatesAndSend(true); // With flush.
+
+ if (log.isDebugEnabled())
+ log.debug("Finished sending collected updates to remote reducers: " + job.id());
+
+ GridCompoundFuture fut = new GridCompoundFuture<>();
+
+ for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+ fut.add(tup.get2());
+
+ fut.markInitialized();
+
+ if (log.isDebugEnabled())
+ log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+
+ return fut;
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Output.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ switch (taskCtx.taskInfo().type()) {
+ case MAP:
+ assert !job.info().hasCombiner() : "Output creation is only allowed when no combiner is defined.";
+
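+ // Fall-through: without a combiner the map task writes straight to the partitioned output.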
+ case COMBINE:
+ return new PartitionedOutput(taskCtx);
+
+ default:
+ throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+ }
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Input.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ switch (taskCtx.taskInfo().type()) {
+ case REDUCE:
+ int reducer = taskCtx.taskInfo().taskNumber();
+
+ HadoopMultimap m = maps.get(reducer);
+
+ if (m != null)
+ return m.input(taskCtx);
+
+ return new HadoopTaskInput() { // Empty input.
+ @Override public boolean next() {
+ return false;
+ }
+
+ @Override public Object key() {
+ throw new IllegalStateException();
+ }
+
+ @Override public Iterator<?> values() {
+ throw new IllegalStateException();
+ }
+
+ @Override public void close() {
+ // No-op.
+ }
+ };
+
+ default:
+ throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+ }
+ }
+
+ /**
+ * Partitioned output.
+ */
+ private class PartitionedOutput implements HadoopTaskOutput {
+ /** */
+ private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+
+ /** */
+ private HadoopPartitioner partitioner;
+
+ /** */
+ private final HadoopTaskContext taskCtx;
+
+ /**
+ * Constructor.
+ * @param taskCtx Task context.
+ */
+ private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ this.taskCtx = taskCtx;
+
+ if (needPartitioner)
+ partitioner = taskCtx.partitioner();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ int part = 0;
+
+ if (partitioner != null) {
+ part = partitioner.partition(key, val, adders.length);
+
+ if (part < 0 || part >= adders.length)
+ throw new IgniteCheckedException("Invalid partition: " + part);
+ }
+
+ HadoopTaskOutput out = adders[part];
+
+ if (out == null)
+ adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+
+ out.write(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ for (HadoopTaskOutput adder : adders) {
+ if (adder != null)
+ adder.close();
+ }
+ }
+ }
+}
\ No newline at end of file
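The ack bookkeeping gives flush() its semantics: send() registers each outgoing message in sentMsgs keyed by message ID, a matching HadoopShuffleAck completes the per-message future (whose listener then removes it on success), and flush() compounds whatever futures are still pending. From the caller's point of view (a sketch, assuming a running shuffle job and an io closure as in HadoopShuffle.startSending):

    shuffleJob.startSending(gridName, io); // Background sender wakes every 5 ms.
    // ... tasks write through output(...) ...
    shuffleJob.flush().get();              // Completes once every sent message is acked.
    shuffleJob.close();                    // Stops the sender and closes the multimaps.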
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
new file mode 100644
index 0000000..69dfe64
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Shuffle message.
+ */
+public class HadoopShuffleMessage implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final AtomicLong ids = new AtomicLong();
+
+ /** */
+ private static final byte MARKER_KEY = (byte)17;
+
+ /** */
+ private static final byte MARKER_VALUE = (byte)31;
+
+ /** */
+ @GridToStringInclude
+ private long msgId;
+
+ /** */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** */
+ @GridToStringInclude
+ private int reducer;
+
+ /** */
+ private byte[] buf;
+
+ /** */
+ @GridToStringInclude
+ private int off;
+
+ /**
+ * Default constructor (required by Externalizable).
+ */
+ public HadoopShuffleMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param reducer Reducer number.
+ * @param size Buffer size in bytes.
+ */
+ public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
+ assert jobId != null;
+
+ buf = new byte[size];
+
+ this.jobId = jobId;
+ this.reducer = reducer;
+
+ msgId = ids.incrementAndGet();
+ }
+
+ /**
+ * @return Message ID.
+ */
+ public long id() {
+ return msgId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Reducer.
+ */
+ public int reducer() {
+ return reducer;
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public byte[] buffer() {
+ return buf;
+ }
+
+ /**
+ * @return Offset.
+ */
+ public int offset() {
+ return off;
+ }
+
+ /**
+ * @param size Size.
+ * @param valOnly Whether only a value will be added.
+ * @return {@code True} if this message can fit additional data of the given size.
+ */
+ public boolean available(int size, boolean valOnly) {
+ size += valOnly ? 5 : 10;
+
+ if (off + size > buf.length) {
+ if (off == 0) { // Resize if requested size is too big.
+ buf = new byte[size];
+
+ return true;
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ public void addKey(long keyPtr, int keySize) {
+ add(MARKER_KEY, keyPtr, keySize);
+ }
+
+ /**
+ * @param valPtr Value pointer.
+ * @param valSize Value size.
+ */
+ public void addValue(long valPtr, int valSize) {
+ add(MARKER_VALUE, valPtr, valSize);
+ }
+
+ /**
+ * @param marker Marker.
+ * @param ptr Pointer.
+ * @param size Size.
+ */
+ private void add(byte marker, long ptr, int size) {
+ buf[off++] = marker;
+
+ GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+ off += 4;
+
+ GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+ off += size;
+ }
+
+ /**
+ * @param v Visitor.
+ * @throws IgniteCheckedException If the visitor fails.
+ */
+ public void visit(Visitor v) throws IgniteCheckedException {
+ for (int i = 0; i < off;) {
+ byte marker = buf[i++];
+
+ int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
+
+ i += 4;
+
+ if (marker == MARKER_VALUE)
+ v.onValue(buf, i, size);
+ else if (marker == MARKER_KEY)
+ v.onKey(buf, i, size);
+ else
+ throw new IllegalStateException();
+
+ i += size;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+ out.writeLong(msgId);
+ out.writeInt(reducer);
+ out.writeInt(off);
+ U.writeByteArray(out, buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+ msgId = in.readLong();
+ reducer = in.readInt();
+ off = in.readInt();
+ buf = U.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleMessage.class, this);
+ }
+
+ /**
+ * Visitor.
+ */
+ public interface Visitor {
+ /**
+ * @param buf Buffer.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
+
+ /**
+ * @param buf Buffer.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
+ }
+}
\ No newline at end of file
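On the wire the buffer is a flat stream of records: a one-byte marker (17 for a key, 31 for a value), a four-byte length written via GridUnsafe, then the raw bytes; that is why available() reserves 10 bytes of overhead when a key/value pair is added and 5 when only a value is. A received buffer is replayed through the visitor, e.g. (a counting visitor, purely illustrative):

    final int[] cnt = new int[2]; // [keys, values]

    msg.visit(new HadoopShuffleMessage.Visitor() {
        @Override public void onKey(byte[] buf, int off, int len) {
            cnt[0]++;
        }

        @Override public void onValue(byte[] buf, int off, int len) {
            cnt[1]++;
        }
    });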
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
new file mode 100644
index 0000000..ffa7871
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multimap for map reduce intermediate results.
+ */
+public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
+ /** */
+ private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING);
+
+ /** */
+ private volatile AtomicLongArray oldTbl;
+
+ /** */
+ private volatile AtomicLongArray newTbl;
+
+ /** */
+ private final AtomicInteger keys = new AtomicInteger();
+
+ /** */
+ private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>();
+
+ /** */
+ private final AtomicInteger inputs = new AtomicInteger();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ * @param cap Initial capacity.
+ */
+ public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+ super(jobInfo, mem);
+
+ assert U.isPow2(cap);
+
+ newTbl = oldTbl = new AtomicLongArray(cap);
+ }
+
+ /**
+ * @return Number of keys.
+ */
+ public long keys() {
+ int res = keys.get();
+
+ for (AdderImpl adder : adders)
+ res += adder.locKeys.get();
+
+ return res;
+ }
+
+ /**
+ * @return Current table capacity.
+ */
+ @Override public int capacity() {
+ return oldTbl.length();
+ }
+
+ /**
+ * @return Adder object.
+ * @param ctx Task context.
+ */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ if (inputs.get() != 0)
+ throw new IllegalStateException("Active inputs.");
+
+ if (state.get() == State.CLOSING)
+ throw new IllegalStateException("Closed.");
+
+ return new AdderImpl(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ assert inputs.get() == 0 : inputs.get();
+ assert adders.isEmpty() : adders.size();
+
+ state(State.READING_WRITING, State.CLOSING);
+
+ if (keys() == 0)
+ return;
+
+ super.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long meta(int idx) {
+ return oldTbl.get(idx);
+ }
+
+ /**
+ * Incrementally visits all the keys and values in the map.
+ *
+ * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+ * @param v Visitor.
+ * @return {@code false} If visiting was impossible due to rehashing.
+ */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
+ assert state.get() != State.CLOSING;
+
+ return false; // Can not visit while rehashing happens.
+ }
+
+ AtomicLongArray tbl0 = oldTbl;
+
+ for (int i = 0; i < tbl0.length(); i++) {
+ long meta = tbl0.get(i);
+
+ while (meta != 0) {
+ long valPtr = value(meta);
+
+ long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
+ if (valPtr != lastVisited) {
+ v.onKey(key(meta), keySize(meta));
+
+ lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
+
+ do {
+ v.onValue(valPtr + 12, valueSize(valPtr));
+
+ valPtr = nextValue(valPtr);
+ }
+ while (valPtr != lastVisited);
+ }
+
+ meta = collision(meta);
+ }
+ }
+
+ state(State.VISITING, State.READING_WRITING);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ inputs.incrementAndGet();
+
+ if (!adders.isEmpty())
+ throw new IllegalStateException("Active adders.");
+
+ State s = state.get();
+
+ if (s == State.CLOSING)
+ throw new IllegalStateException("Closed.");
+
+ assert s != State.REHASHING;
+
+ return new Input(taskCtx) {
+ @Override public void close() throws IgniteCheckedException {
+ if (inputs.decrementAndGet() < 0)
+ throw new IllegalStateException();
+
+ super.close();
+ }
+ };
+ }
+
+ /**
+ * @param fromTbl Table.
+ */
+ private void rehashIfNeeded(AtomicLongArray fromTbl) {
+ if (fromTbl.length() == Integer.MAX_VALUE)
+ return;
+
+ long keys0 = keys();
+
+ if (keys0 < 3 * (fromTbl.length() >>> 2)) // Rehash only when the key count reaches 3/4 of the capacity.
+ return;
+
+ if (fromTbl != newTbl) // Check if someone else has already done the job.
+ return;
+
+ if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
+ assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash.
+
+ return;
+ }
+
+ if (fromTbl != newTbl) { // Double check.
+ state(State.REHASHING, State.READING_WRITING); // Switch back.
+
+ return;
+ }
+
+ // Calculate new table capacity.
+ int newLen = fromTbl.length();
+
+ do {
+ newLen <<= 1;
+ }
+ while (newLen < keys0);
+
+ if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4.
+ newLen <<= 1;
+
+ // This is our target table for rehashing.
+ AtomicLongArray toTbl = new AtomicLongArray(newLen);
+
+ // Make the new table visible before rehashing.
+ newTbl = toTbl;
+
+ // Rehash.
+ int newMask = newLen - 1;
+
+ long failedMeta = 0;
+
+ GridLongList collisions = new GridLongList(16);
+
+ for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
+ long meta = fromTbl.get(i);
+
+ assert meta != -1;
+
+ if (meta == 0) { // No entry.
+ failedMeta = 0;
+
+ if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
+ i--; // Retry.
+
+ continue;
+ }
+
+ do { // Collect the collision chain up to the previously failed meta entry (or 0).
+ collisions.add(meta);
+
+ meta = collision(meta);
+ }
+ while (meta != failedMeta);
+
+ do { // Go from the last to the first to avoid 'in-flight' state for meta entries.
+ meta = collisions.remove();
+
+ int addr = keyHash(meta) & newMask;
+
+ for (;;) { // Move meta entry to the new table.
+ long toCollision = toTbl.get(addr);
+
+ collision(meta, toCollision);
+
+ if (toTbl.compareAndSet(addr, toCollision, meta))
+ break;
+ }
+ }
+ while (!collisions.isEmpty());
+
+ // Here 'meta' will be a root pointer in old table.
+ if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
+ failedMeta = meta;
+
+ i--; // Retry the same address in table because new keys were added.
+ }
+ else
+ failedMeta = 0;
+ }
+
+ // Now old and new tables will be the same again.
+ oldTbl = toTbl;
+
+ state(State.REHASHING, State.READING_WRITING);
+ }
+
+ /**
+ * Switch state.
+ *
+ * @param oldState Expected state.
+ * @param newState New state.
+ */
+ private void state(State oldState, State newState) {
+ if (!state.compareAndSet(oldState, newState))
+ throw new IllegalStateException();
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ @Override protected long value(long meta) {
+ return mem.readLongVolatile(meta + 16);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param oldValPtr Old value.
+ * @param newValPtr New value.
+ * @return {@code true} If succeeded.
+ */
+ private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+ return mem.casLong(meta + 16, oldValPtr, newValPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Collision pointer.
+ */
+ @Override protected long collision(long meta) {
+ return mem.readLongVolatile(meta + 24);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param collision Collision pointer.
+ */
+ @Override protected void collision(long meta, long collision) {
+ assert meta != collision : meta;
+
+ mem.writeLongVolatile(meta + 24, collision);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Last visited value pointer.
+ */
+ private long lastVisitedValue(long meta) {
+ return mem.readLong(meta + 32);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Last visited value pointer.
+ */
+ private void lastVisitedValue(long meta, long valPtr) {
+ mem.writeLong(meta + 32, valPtr);
+ }
+
+ /**
+ * Adder. Must not be shared between threads.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final AtomicInteger locKeys = new AtomicInteger();
+
+ /** */
+ private final Random rnd = new GridRandom();
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+
+ rehashIfNeeded(oldTbl);
+
+ adders.add(this);
+ }
+
+ /**
+ * @param in Data input.
+ * @param reuse Reusable key.
+ * @return Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+ k.tmpKey = keySer.read(in, k.tmpKey);
+
+ k.meta = add(k.tmpKey, null);
+
+ return k;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ add(key, val);
+ }
+
+ /**
+ * @param tbl Table.
+ */
+ private void incrementKeys(AtomicLongArray tbl) {
+ locKeys.lazySet(locKeys.get() + 1);
+
+ if (rnd.nextInt(tbl.length()) < 512)
+ rehashIfNeeded(tbl);
+ }
+
+ /**
+ * @param keyHash Key hash.
+ * @param keySize Key size.
+ * @param keyPtr Key pointer.
+ * @param valPtr Value page pointer.
+ * @param collisionPtr Pointer to meta with hash collision.
+ * @param lastVisitedVal Last visited value pointer.
+ * @return Created meta page pointer.
+ */
+ private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
+ long meta = allocate(40);
+
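+ // Meta page layout (40 bytes):
+ //   0..3   key hash
+ //   4..7   key size
+ //   8..15  key pointer
+ //   16..23 value pointer (head of the value chain)
+ //   24..31 collision pointer (next meta in the bucket)
+ //   32..39 last visited value pointer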
+ mem.writeInt(meta, keyHash);
+ mem.writeInt(meta + 4, keySize);
+ mem.writeLong(meta + 8, keyPtr);
+ mem.writeLong(meta + 16, valPtr);
+ mem.writeLong(meta + 24, collisionPtr);
+ mem.writeLong(meta + 32, lastVisitedVal);
+
+ return meta;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @return Updated or created meta page pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+ AtomicLongArray tbl = oldTbl;
+
+ int keyHash = U.hash(key.hashCode());
+
+ long newMetaPtr = 0;
+
+ long valPtr = 0;
+
+ if (val != null) {
+ valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ valueSize(valPtr, valSize);
+ }
+
+ for (AtomicLongArray old = null;;) {
+ int addr = keyHash & (tbl.length() - 1);
+
+ long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
+
+ if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
+ AtomicLongArray n = newTbl; // Need to read newTbl first here.
+ AtomicLongArray o = oldTbl;
+
+ tbl = tbl == o ? n : o; // Move to the oldest table that is still newer than ours.
+
+ old = null;
+
+ continue;
+ }
+
+ if (metaPtrRoot != 0) { // Not empty slot.
+ long metaPtr = metaPtrRoot;
+
+ do { // Scan all the collisions.
+ if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
+ if (newMetaPtr != 0) // Deallocate new meta if one was allocated.
+ localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
+
+ if (valPtr != 0) { // Add value if it exists.
+ long nextValPtr;
+
+ // Values are linked into a stack-like structure:
+ // replace the head value in meta with ours and link the old head as next.
+ do {
+ nextValPtr = value(metaPtr);
+
+ nextValue(valPtr, nextValPtr);
+ }
+ while (!casValue(metaPtr, nextValPtr, valPtr));
+ }
+
+ return metaPtr;
+ }
+
+ metaPtr = collision(metaPtr);
+ }
+ while (metaPtr != 0);
+
+ // Here we did not find our key, need to check if it was moved by rehashing to the new table.
+ if (old == null) { // If the old table is already set, we will just try to update it.
+ AtomicLongArray n = newTbl;
+
+ if (n != tbl) { // Rehashing is in progress: look for the key in the new table but remember the old one.
+ old = tbl;
+ tbl = n;
+
+ continue;
+ }
+ }
+ }
+
+ if (old != null) { // We checked both the new table and the old one but did not find our key.
+ tbl = old; // Try to add new key to the old table.
+
+ addr = keyHash & (tbl.length() - 1);
+
+ old = null;
+ }
+
+ if (newMetaPtr == 0) { // Allocate new meta page.
+ long keyPtr = write(0, key, keySer);
+ int keySize = writtenSize();
+
+ if (valPtr != 0)
+ nextValue(valPtr, 0);
+
+ newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
+ }
+ else // Link the new meta to the current root as its collision.
+ collision(newMetaPtr, metaPtrRoot);
+
+ if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one.
+ incrementKeys(tbl);
+
+ return newMetaPtr;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ if (!adders.remove(this))
+ throw new IllegalStateException();
+
+ keys.addAndGet(locKeys.get()); // There is a race here, so #keys() may return a stale result, which is acceptable.
+
+ super.close();
+ }
+
+ /**
+ * Key.
+ */
+ private class KeyImpl implements Key {
+ /** */
+ private long meta;
+
+ /** */
+ private Object tmpKey;
+
+ /**
+ * @return Meta pointer for the key.
+ */
+ public long address() {
+ return meta;
+ }
+
+ /**
+ * @param val Value.
+ */
+ @Override public void add(Value val) {
+ int size = val.size();
+
+ long valPtr = allocate(size + 12);
+
+ val.copyTo(valPtr + 12);
+
+ valueSize(valPtr, size);
+
+ long nextVal;
+
+ do {
+ nextVal = value(meta);
+
+ nextValue(valPtr, nextVal);
+ }
+ while (!casValue(meta, nextVal, valPtr));
+ }
+ }
+ }
+
+ /**
+ * Current map state.
+ */
+ private enum State {
+ /** */
+ REHASHING,
+
+ /** */
+ VISITING,
+
+ /** */
+ READING_WRITING,
+
+ /** */
+ CLOSING
+ }
+}
\ No newline at end of file
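For readability: the meta record allocated by createMeta() above is 40 bytes with fixed offsets -- key hash (int) at 0, key size (int) at 4, then key pointer, value pointer, collision pointer and last-visited-value pointer (longs) at 8, 16, 24 and 32. The value slot at offset 16 heads a per-key stack of values, and casValue() prepends to it without locks. Below is a minimal sketch of that prepend, modelling off-heap pointers with plain Java atomics; all names are illustrative, none are Ignite API.

    import java.util.concurrent.atomic.AtomicLong;

    /** Illustrative model (not Ignite API) of the CAS value prepend in add() and KeyImpl.add(). */
    public class ValuePrependSketch {
        /** Models the value-pointer slot at offset meta + 16; 0 means "no values yet". */
        private final AtomicLong head = new AtomicLong(0);

        /** next[v] models the "next value" pointer stored in value page v. */
        private final long[] next = new long[16];

        /** Links valPtr in front of the current head, mirroring the do/while loop above. */
        void prepend(int valPtr) {
            long oldHead;

            do {
                oldHead = head.get();    // nextValPtr = value(metaPtr)
                next[valPtr] = oldHead;  // nextValue(valPtr, nextValPtr)
            }
            while (!head.compareAndSet(oldHead, valPtr)); // casValue(metaPtr, nextValPtr, valPtr)
        }

        public static void main(String[] args) {
            ValuePrependSketch s = new ValuePrependSketch();

            s.prepend(1);
            s.prepend(2); // Newest value becomes the head; next[2] now points at 1.

            System.out.println("head=" + s.head.get() + " next[2]=" + s.next[2]); // head=2 next[2]=1
        }
    }
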
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
new file mode 100644
index 0000000..c32e9af
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Hash multimap.
+ */
+public class HadoopHashMultimap extends HadoopHashMultimapBase {
+ /** */
+ private long[] tbl;
+
+ /** */
+ private int keys;
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ * @param cap Initial capacity.
+ */
+ public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+ super(jobInfo, mem);
+
+ assert U.isPow2(cap) : cap;
+
+ tbl = new long[cap];
+ }
+
+ /** {@inheritDoc} */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ return new AdderImpl(ctx);
+ }
+
+ /**
+ * Rehash.
+ */
+ private void rehash() {
+ long[] newTbl = new long[tbl.length << 1];
+
+ int newMask = newTbl.length - 1;
+
+ for (long meta : tbl) {
+ while (meta != 0) {
+ long collision = collision(meta);
+
+ int idx = keyHash(meta) & newMask;
+
+ collision(meta, newTbl[idx]);
+
+ newTbl[idx] = meta;
+
+ meta = collision;
+ }
+ }
+
+ tbl = newTbl;
+ }
+
+ /**
+ * @return Keys count.
+ */
+ public int keys() {
+ return keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int capacity() {
+ return tbl.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long meta(int idx) {
+ return tbl[idx];
+ }
+
+ /**
+ * Adder.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Reader keyReader;
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+ }
+
+ /**
+ * @param keyHash Key hash.
+ * @param keySize Key size.
+ * @param keyPtr Key pointer.
+ * @param valPtr Value page pointer.
+ * @param collisionPtr Pointer to meta with hash collision.
+ * @return Created meta page pointer.
+ */
+ private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
+ long meta = allocate(32);
+
+ mem.writeInt(meta, keyHash);
+ mem.writeInt(meta + 4, keySize);
+ mem.writeLong(meta + 8, keyPtr);
+ mem.writeLong(meta + 16, valPtr);
+ mem.writeLong(meta + 24, collisionPtr);
+
+ return meta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ int keyHash = U.hash(key.hashCode());
+
+ // Write value.
+ long valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ valueSize(valPtr, valSize);
+
+ // Find position in table.
+ int idx = keyHash & (tbl.length - 1);
+
+ long meta = tbl[idx];
+
+ // Search for our key in collisions.
+ while (meta != 0) {
+ if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
+ nextValue(valPtr, value(meta));
+
+ value(meta, valPtr);
+
+ return;
+ }
+
+ meta = collision(meta);
+ }
+
+ // Write key.
+ long keyPtr = write(0, key, keySer);
+ int keySize = writtenSize();
+
+ nextValue(valPtr, 0);
+
+ tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
+
+ if (++keys > (tbl.length >>> 2) * 3)
+ rehash();
+ }
+ }
+}
\ No newline at end of file
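Two arithmetic facts the class above relies on: since the capacity is asserted to be a power of two, keyHash & (tbl.length - 1) is an exact, branch-free substitute for a non-negative modulo, and the growth check ++keys > (tbl.length >>> 2) * 3 triggers rehash() at a load factor of 0.75. A standalone check of both facts (illustrative, not part of the patch; run with -ea to enable the assertion):

    /** Verifies the power-of-two masking and 3/4 load-factor arithmetic used above. */
    public class HashPolicySketch {
        public static void main(String[] args) {
            int cap = 16; // Must be a power of two, as asserted in the constructor.

            for (int hash : new int[] {42, -7, Integer.MIN_VALUE}) {
                int idx = hash & (cap - 1); // Bucket index, never negative.

                assert idx == Math.floorMod(hash, cap) : idx;
            }

            int threshold = (cap >>> 2) * 3; // 12 == 0.75 * 16.

            System.out.println("rehash() doubles the table after " + threshold + " keys");
        }
    }
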
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
new file mode 100644
index 0000000..8d9b3c3
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+
+/**
+ * Base class for hash multimaps.
+ */
+public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ super(jobInfo, mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ throw new UnsupportedOperationException("visit");
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return new Input(taskCtx);
+ }
+
+ /**
+ * @return Hash table capacity.
+ */
+ public abstract int capacity();
+
+ /**
+ * @param idx Index in hash table.
+ * @return Meta page pointer.
+ */
+ protected abstract long meta(int idx);
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key hash.
+ */
+ protected int keyHash(long meta) {
+ return mem.readInt(meta);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key size.
+ */
+ protected int keySize(long meta) {
+ return mem.readInt(meta + 4);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key pointer.
+ */
+ protected long key(long meta) {
+ return mem.readLong(meta + 8);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ protected long value(long meta) {
+ return mem.readLong(meta + 16);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param val Value pointer.
+ */
+ protected void value(long meta, long val) {
+ mem.writeLong(meta + 16, val);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Collision pointer.
+ */
+ protected long collision(long meta) {
+ return mem.readLong(meta + 24);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param collision Collision pointer.
+ */
+ protected void collision(long meta, long collision) {
+ assert meta != collision : meta;
+
+ mem.writeLong(meta + 24, collision);
+ }
+
+ /**
+ * Reader for key and value.
+ */
+ protected class Reader extends ReaderBase {
+ /**
+ * @param ser Serialization.
+ */
+ protected Reader(HadoopSerialization ser) {
+ super(ser);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key.
+ */
+ public Object readKey(long meta) {
+ assert meta > 0 : meta;
+
+ try {
+ return read(key(meta), keySize(meta));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * Task input.
+ */
+ protected class Input implements HadoopTaskInput {
+ /** */
+ private int idx = -1;
+
+ /** */
+ private long metaPtr;
+
+ /** */
+ private final int cap;
+
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final Reader valReader;
+
+ /**
+ * @param taskCtx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ cap = capacity();
+
+ keyReader = new Reader(taskCtx.keySerialization());
+ valReader = new Reader(taskCtx.valueSerialization());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (metaPtr != 0) {
+ metaPtr = collision(metaPtr);
+
+ if (metaPtr != 0)
+ return true;
+ }
+
+ while (++idx < cap) { // Scan table.
+ metaPtr = meta(idx);
+
+ if (metaPtr != 0)
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return keyReader.readKey(metaPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return new ValueIterator(value(metaPtr), valReader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ keyReader.close();
+ valReader.close();
+ }
+ }
+}
\ No newline at end of file
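Input.next() above interleaves two loops: it first walks the collision chain of the current bucket via collision(metaPtr), and only then advances idx until it finds the next non-empty bucket. A plain-Java sketch of that traversal order, with lists standing in for off-heap collision chains (illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    /** Models the bucket-then-chain scan order of HadoopHashMultimapBase.Input. */
    public class BucketScanSketch {
        public static void main(String[] args) {
            // Each bucket holds the head of a singly linked collision chain.
            List<List<String>> table = Arrays.asList(
                Collections.<String>emptyList(),  // Empty bucket: skipped by next().
                Arrays.asList("k1", "k2"),        // Two keys colliding in one bucket.
                Arrays.asList("k3"));

            for (List<String> chain : table)      // while (++idx < cap)
                for (String key : chain)          // metaPtr = collision(metaPtr)
                    System.out.println(key);      // Prints k1, k2, k3.
        }
    }
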
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
new file mode 100644
index 0000000..5b71c47
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multimap for Hadoop intermediate results.
+ */
+@SuppressWarnings("PublicInnerClass")
+public interface HadoopMultimap extends AutoCloseable {
+ /**
+ * Incrementally visits all the keys and values in the map.
+ *
+ * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+ * @param v Visitor.
+ * @return {@code false} If visiting was impossible.
+ */
+ public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
+
+ /**
+ * @param ctx Task context.
+ * @return Adder.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+ /**
+ * @param taskCtx Task context.
+ * @return Task input.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopTaskInput input(HadoopTaskContext taskCtx)
+ throws IgniteCheckedException;
+
+ /** {@inheritDoc} */
+ @Override public void close();
+
+ /**
+ * Adder.
+ */
+ public interface Adder extends HadoopTaskOutput {
+ /**
+ * @param in Data input.
+ * @param reuse Reusable key.
+ * @return Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
+ }
+
+ /**
+ * Key to add values to.
+ */
+ public interface Key {
+ /**
+ * @param val Value.
+ */
+ public void add(Value val);
+ }
+
+ /**
+ * Value.
+ */
+ public interface Value {
+ /**
+ * @return Size in bytes.
+ */
+ public int size();
+
+ /**
+ * @param ptr Pointer.
+ */
+ public void copyTo(long ptr);
+ }
+
+ /**
+ * Key and values visitor.
+ */
+ public interface Visitor {
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
+
+ /**
+ * @param valPtr Value pointer.
+ * @param valSize Value size.
+ */
+ public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
+ }
+}
\ No newline at end of file
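Taken together, the interface describes a write-then-read lifecycle: a map task obtains an Adder, writes key/value pairs, and closes it; a reduce-side task then drains the map through input(). A hedged usage sketch, assuming the multimap and task context are supplied by the surrounding shuffle code and that this class lives in the same package (process() is a placeholder, not Ignite API):

    import java.util.Iterator;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;

    /** Sketch of the HadoopMultimap write/read lifecycle. */
    class MultimapUsageSketch {
        static void writeThenRead(HadoopMultimap map, HadoopTaskContext ctx)
            throws IgniteCheckedException {
            HadoopMultimap.Adder adder = map.startAdding(ctx);

            try {
                adder.write("word", 1); // One Adder, many write() calls.
            }
            finally {
                adder.close();
            }

            HadoopTaskInput in = map.input(ctx);

            try {
                while (in.next()) {                 // Next key.
                    Object key = in.key();

                    Iterator<?> vals = in.values(); // Its stack of values, newest first.

                    while (vals.hasNext())
                        process(key, vals.next());
                }
            }
            finally {
                in.close();
            }
        }

        /** Placeholder for reducer-side handling; not part of the patch. */
        private static void process(Object key, Object val) {
            System.out.println(key + " -> " + val);
        }
    }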