You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:10 UTC
[78/92] [abbrv] ignite git commit: Preparing big move.
Preparing big move.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79b3eff1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79b3eff1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79b3eff1
Branch: refs/heads/ignite-3949
Commit: 79b3eff1b64e96b544b0c1549eff273e17f6724a
Parents: facedf5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 17:10:14 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 17:10:14 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopContext.java | 200 +++++
.../hadoop/HadoopTaskCancelledException.java | 35 +
.../impl/HadoopTaskCancelledException.java | 35 -
.../hadoop/impl/shuffle/HadoopShuffleAck.java | 92 ---
.../hadoop/impl/shuffle/HadoopShuffleJob.java | 612 ----------------
.../impl/shuffle/HadoopShuffleMessage.java | 242 ------
.../HadoopConcurrentHashMultimap.java | 616 ----------------
.../shuffle/collections/HadoopHashMultimap.java | 176 -----
.../collections/HadoopHashMultimapBase.java | 211 ------
.../shuffle/collections/HadoopMultimap.java | 113 ---
.../shuffle/collections/HadoopMultimapBase.java | 435 -----------
.../shuffle/collections/HadoopSkipList.java | 733 -------------------
.../shuffle/streams/HadoopDataInStream.java | 171 -----
.../shuffle/streams/HadoopDataOutStream.java | 130 ----
.../shuffle/streams/HadoopOffheapBuffer.java | 122 ---
.../hadoop/impl/v1/HadoopV1MapTask.java | 2 +-
.../hadoop/impl/v1/HadoopV1ReduceTask.java | 2 +-
.../processors/hadoop/impl/v1/HadoopV1Task.java | 2 +-
.../hadoop/impl/v2/HadoopV2Context.java | 2 +-
.../hadoop/impl/v2/HadoopV2TaskContext.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 263 +++++++
.../hadoop/shuffle/HadoopShuffleAck.java | 92 +++
.../hadoop/shuffle/HadoopShuffleJob.java | 612 ++++++++++++++++
.../hadoop/shuffle/HadoopShuffleMessage.java | 242 ++++++
.../HadoopConcurrentHashMultimap.java | 616 ++++++++++++++++
.../shuffle/collections/HadoopHashMultimap.java | 176 +++++
.../collections/HadoopHashMultimapBase.java | 211 ++++++
.../shuffle/collections/HadoopMultimap.java | 113 +++
.../shuffle/collections/HadoopMultimapBase.java | 435 +++++++++++
.../shuffle/collections/HadoopSkipList.java | 733 +++++++++++++++++++
.../shuffle/streams/HadoopDataInStream.java | 171 +++++
.../shuffle/streams/HadoopDataOutStream.java | 130 ++++
.../shuffle/streams/HadoopOffheapBuffer.java | 122 +++
.../impl/HadoopTaskExecutionSelfTest.java | 1 +
.../collections/HadoopAbstractMapTest.java | 175 -----
.../HadoopConcurrentHashMultimapSelftest.java | 280 -------
.../collections/HadoopHashMapSelfTest.java | 133 ----
.../collections/HadoopSkipListSelfTest.java | 320 --------
.../streams/HadoopDataStreamSelfTest.java | 153 ----
.../collections/HadoopAbstractMapTest.java | 175 +++++
.../HadoopConcurrentHashMultimapSelftest.java | 278 +++++++
.../collections/HadoopHashMapSelfTest.java | 131 ++++
.../collections/HadoopSkipListSelfTest.java | 318 ++++++++
.../streams/HadoopDataStreamSelfTest.java | 151 ++++
.../testsuites/IgniteHadoopTestSuite.java | 8 +-
.../processors/hadoop/HadoopCommonUtils.java | 3 +
46 files changed, 5217 insertions(+), 4758 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
new file mode 100644
index 0000000..4326ad2
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Hadoop accelerator context.
+ */
+public class HadoopContext {
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Hadoop configuration. */
+ private HadoopConfiguration cfg;
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** External task executor. */
+ private HadoopTaskExecutorAdapter taskExecutor;
+
+ /** */
+ private HadoopShuffle shuffle;
+
+ /** Managers list. */
+ private List<HadoopComponent> components = new ArrayList<>();
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public HadoopContext(
+ GridKernalContext ctx,
+ HadoopConfiguration cfg,
+ HadoopJobTracker jobTracker,
+ HadoopTaskExecutorAdapter taskExecutor,
+ HadoopShuffle shuffle
+ ) {
+ this.ctx = ctx;
+ this.cfg = cfg;
+
+ this.jobTracker = add(jobTracker);
+ this.taskExecutor = add(taskExecutor);
+ this.shuffle = add(shuffle);
+ }
+
+ /**
+ * Gets list of managers.
+ *
+ * @return List of managers.
+ */
+ public List<HadoopComponent> components() {
+ return components;
+ }
+
+ /**
+ * Gets kernal context.
+ *
+ * @return Grid kernal context instance.
+ */
+ public GridKernalContext kernalContext() {
+ return ctx;
+ }
+
+ /**
+ * Gets Hadoop configuration.
+ *
+ * @return Hadoop configuration.
+ */
+ public HadoopConfiguration configuration() {
+ return cfg;
+ }
+
+ /**
+ * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
+ *
+ * @return Local node ID.
+ */
+ public UUID localNodeId() {
+ return ctx.localNodeId();
+ }
+
+ /**
+ * Gets local node order.
+ *
+ * @return Local node order.
+ */
+ public long localNodeOrder() {
+ assert ctx.discovery() != null;
+
+ return ctx.discovery().localNode().order();
+ }
+
+ /**
+ * @return Hadoop-enabled nodes.
+ */
+ public Collection<ClusterNode> nodes() {
+ return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx());
+ }
+
+ /**
+ * @return {@code True} if the local node has the minimal order among Hadoop-enabled nodes.
+ */
+ public boolean jobUpdateLeader() {
+ long minOrder = Long.MAX_VALUE;
+ ClusterNode minOrderNode = null;
+
+ for (ClusterNode node : nodes()) {
+ if (node.order() < minOrder) {
+ minOrder = node.order();
+ minOrderNode = node;
+ }
+ }
+
+ assert minOrderNode != null;
+
+ return localNodeId().equals(minOrderNode.id());
+ }
+
+ /**
+ * @param meta Job metadata.
+ * @return {@code true} if the local node is participating in job execution.
+ */
+ public boolean isParticipating(HadoopJobMetadata meta) {
+ UUID locNodeId = localNodeId();
+
+ if (locNodeId.equals(meta.submitNodeId()))
+ return true;
+
+ HadoopMapReducePlan plan = meta.mapReducePlan();
+
+ return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
+ }
+
+ /**
+ * @return Job tracker instance.
+ */
+ public HadoopJobTracker jobTracker() {
+ return jobTracker;
+ }
+
+ /**
+ * @return Task executor.
+ */
+ public HadoopTaskExecutorAdapter taskExecutor() {
+ return taskExecutor;
+ }
+
+ /**
+ * @return Shuffle.
+ */
+ public HadoopShuffle shuffle() {
+ return shuffle;
+ }
+
+ /**
+ * @return Map-reduce planner.
+ */
+ public HadoopMapReducePlanner planner() {
+ return cfg.getMapReducePlanner();
+ }
+
+ /**
+ * Adds component.
+ *
+ * @param c Component to add.
+ * @return Added manager.
+ */
+ private <C extends HadoopComponent> C add(C c) {
+ components.add(c);
+
+ return c;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
new file mode 100644
index 0000000..1dc8674
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteException;
+
+/**
+ * Exception thrown when a task is cancelled.
+ */
+public class HadoopTaskCancelledException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param msg Exception message.
+ */
+ public HadoopTaskCancelledException(String msg) {
+ super(msg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
deleted file mode 100644
index 85df551..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import org.apache.ignite.IgniteException;
-
-/**
- * Exception that throws when the task is cancelling.
- */
-public class HadoopTaskCancelledException extends IgniteException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param msg Exception message.
- */
- public HadoopTaskCancelledException(String msg) {
- super(msg);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleAck.java
deleted file mode 100644
index 3c177be..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleAck.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Acknowledgement message.
- */
-public class HadoopShuffleAck implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @GridToStringInclude
- private long msgId;
-
- /** */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /**
- *
- */
- public HadoopShuffleAck() {
- // No-op.
- }
-
- /**
- * @param msgId Message ID.
- */
- public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
- assert jobId != null;
-
- this.msgId = msgId;
- this.jobId = jobId;
- }
-
- /**
- * @return Message ID.
- */
- public long id() {
- return msgId;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
- out.writeLong(msgId);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
-
- jobId.readExternal(in);
- msgId = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopShuffleAck.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleJob.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleJob.java
deleted file mode 100644
index 6e31d36..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleJob.java
+++ /dev/null
@@ -1,612 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopConcurrentHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.thread.IgniteThread;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-
-/**
- * Shuffle job.
- */
-public class HadoopShuffleJob<T> implements AutoCloseable {
- /** */
- private static final int MSG_BUF_SIZE = 128 * 1024;
-
- /** */
- private final HadoopJob job;
-
- /** */
- private final GridUnsafeMemory mem;
-
- /** */
- private final boolean needPartitioner;
-
- /** Collection of task contexts for each reduce task. */
- private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
-
- /** Reducers addresses. */
- private T[] reduceAddrs;
-
- /** Local reducers address. */
- private final T locReduceAddr;
-
- /** */
- private final HadoopShuffleMessage[] msgs;
-
- /** */
- private final AtomicReferenceArray<HadoopMultimap> maps;
-
- /** */
- private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
-
- /** */
- protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
- new ConcurrentHashMap<>();
-
- /** */
- private volatile GridWorker snd;
-
- /** Latch for remote addresses waiting. */
- private final CountDownLatch ioInitLatch = new CountDownLatch(1);
-
- /** Finished flag. Set on flush or close. */
- private volatile boolean flushed;
-
- /** */
- private final IgniteLogger log;
-
- /**
- * @param locReduceAddr Local reducer address.
- * @param log Logger.
- * @param job Job.
- * @param mem Memory.
- * @param totalReducerCnt Amount of reducers in the Job.
- * @param locReducers Reducers will work on current node.
- * @throws IgniteCheckedException If error.
- */
- public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
- int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
- this.locReduceAddr = locReduceAddr;
- this.job = job;
- this.mem = mem;
- this.log = log.getLogger(HadoopShuffleJob.class);
-
- if (!F.isEmpty(locReducers)) {
- for (int rdc : locReducers) {
- HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
-
- reducersCtx.put(rdc, job.getTaskContext(taskInfo));
- }
- }
-
- needPartitioner = totalReducerCnt > 1;
-
- maps = new AtomicReferenceArray<>(totalReducerCnt);
- msgs = new HadoopShuffleMessage[totalReducerCnt];
- }
-
- /**
- * @param reduceAddrs Addresses of reducers.
- * @return {@code True} if addresses were initialized by this call.
- */
- public boolean initializeReduceAddresses(T[] reduceAddrs) {
- if (this.reduceAddrs == null) {
- this.reduceAddrs = reduceAddrs;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return {@code True} if reducers addresses were initialized.
- */
- public boolean reducersInitialized() {
- return reduceAddrs != null;
- }
-
- /**
- * @param gridName Grid name.
- * @param io IO Closure for sending messages.
- */
- @SuppressWarnings("BusyWait")
- public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
- assert snd == null;
- assert io != null;
-
- this.io = io;
-
- if (!flushed) {
- snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
- @Override protected void body() throws InterruptedException {
- try {
- while (!isCancelled()) {
- Thread.sleep(5);
-
- collectUpdatesAndSend(false);
- }
- }
- catch (IgniteCheckedException e) {
- throw new IllegalStateException(e);
- }
- }
- };
-
- new IgniteThread(snd).start();
- }
-
- ioInitLatch.countDown();
- }
-
- /**
- * @param maps Maps.
- * @param idx Index.
- * @return Map.
- */
- private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
- HadoopMultimap map = maps.get(idx);
-
- if (map == null) { // Create new map.
- map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
- new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
- new HadoopSkipList(job.info(), mem);
-
- if (!maps.compareAndSet(idx, null, map)) {
- map.close();
-
- return maps.get(idx);
- }
- }
-
- return map;
- }
-
- /**
- * @param msg Message.
- * @throws IgniteCheckedException Exception.
- */
- public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
- assert msg.buffer() != null;
- assert msg.offset() > 0;
-
- HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
-
- HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
-
- perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
-
- HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
-
- // Add data from message to the map.
- try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
- final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
- final UnsafeValue val = new UnsafeValue(msg.buffer());
-
- msg.visit(new HadoopShuffleMessage.Visitor() {
- /** */
- private HadoopMultimap.Key key;
-
- @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
- dataInput.bytes(buf, off, off + len);
-
- key = adder.addKey(dataInput, key);
- }
-
- @Override public void onValue(byte[] buf, int off, int len) {
- val.off = off;
- val.size = len;
-
- key.add(val);
- }
- });
- }
- }
-
- /**
- * @param ack Shuffle ack.
- */
- @SuppressWarnings("ConstantConditions")
- public void onShuffleAck(HadoopShuffleAck ack) {
- IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = sentMsgs.get(ack.id());
-
- if (tup != null)
- tup.get2().onDone();
- else
- log.warning("Received shuffle ack for not registered shuffle id: " + ack);
- }
-
- /**
- * Unsafe value.
- */
- private static class UnsafeValue implements HadoopMultimap.Value {
- /** */
- private final byte[] buf;
-
- /** */
- private int off;
-
- /** */
- private int size;
-
- /**
- * @param buf Buffer.
- */
- private UnsafeValue(byte[] buf) {
- assert buf != null;
-
- this.buf = buf;
- }
-
- /** */
- @Override public int size() {
- return size;
- }
-
- /** */
- @Override public void copyTo(long ptr) {
- GridUnsafe.copyMemory(buf, GridUnsafe.BYTE_ARR_OFF + off, null, ptr, size);
- }
- }
-
- /**
- * Sends map updates to remote reducers.
- */
- private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
- for (int i = 0; i < maps.length(); i++) {
- HadoopMultimap map = maps.get(i);
-
- if (map == null || locReduceAddr.equals(reduceAddrs[i]))
- continue; // Skip empty map and local node.
-
- if (msgs[i] == null)
- msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
-
- final int idx = i;
-
- map.visit(false, new HadoopMultimap.Visitor() {
- /** */
- private long keyPtr;
-
- /** */
- private int keySize;
-
- /** */
- private boolean keyAdded;
-
- /** {@inheritDoc} */
- @Override public void onKey(long keyPtr, int keySize) {
- this.keyPtr = keyPtr;
- this.keySize = keySize;
-
- keyAdded = false;
- }
-
- private boolean tryAdd(long valPtr, int valSize) {
- HadoopShuffleMessage msg = msgs[idx];
-
- if (!keyAdded) { // Add key and value.
- int size = keySize + valSize;
-
- if (!msg.available(size, false))
- return false;
-
- msg.addKey(keyPtr, keySize);
- msg.addValue(valPtr, valSize);
-
- keyAdded = true;
-
- return true;
- }
-
- if (!msg.available(valSize, true))
- return false;
-
- msg.addValue(valPtr, valSize);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void onValue(long valPtr, int valSize) {
- if (tryAdd(valPtr, valSize))
- return;
-
- send(idx, keySize + valSize);
-
- keyAdded = false;
-
- if (!tryAdd(valPtr, valSize))
- throw new IllegalStateException();
- }
- });
-
- if (flush && msgs[i].offset() != 0)
- send(i, 0);
- }
- }
-
- /**
- * @param idx Index of message.
- * @param newBufMinSize Min new buffer size.
- */
- private void send(final int idx, int newBufMinSize) {
- final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
-
- HadoopShuffleMessage msg = msgs[idx];
-
- final long msgId = msg.id();
-
- IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
- new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
-
- assert old == null;
-
- try {
- io.apply(reduceAddrs[idx], msg);
- }
- catch (GridClosureException e) {
- fut.onDone(U.unwrap(e));
- }
-
- fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- f.get();
-
- // Clean up the future from map only if there was no exception.
- // Otherwise flush() should fail.
- sentMsgs.remove(msgId);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to send message.", e);
- }
- }
- });
-
- msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
- Math.max(MSG_BUF_SIZE, newBufMinSize));
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- if (snd != null) {
- snd.cancel();
-
- try {
- snd.join();
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- close(maps);
- }
-
- /**
- * @param maps Maps.
- */
- private void close(AtomicReferenceArray<HadoopMultimap> maps) {
- for (int i = 0; i < maps.length(); i++) {
- HadoopMultimap map = maps.get(i);
-
- if (map != null)
- map.close();
- }
- }
-
- /**
- * @return Future.
- */
- @SuppressWarnings("unchecked")
- public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
-
- flushed = true;
-
- if (maps.length() == 0)
- return new GridFinishedFuture<>();
-
- U.await(ioInitLatch);
-
- GridWorker snd0 = snd;
-
- if (snd0 != null) {
- if (log.isDebugEnabled())
- log.debug("Cancelling sender thread.");
-
- snd0.cancel();
-
- try {
- snd0.join();
-
- if (log.isDebugEnabled())
- log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- collectUpdatesAndSend(true); // With flush.
-
- if (log.isDebugEnabled())
- log.debug("Finished sending collected updates to remote reducers: " + job.id());
-
- GridCompoundFuture fut = new GridCompoundFuture<>();
-
- for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
- fut.add(tup.get2());
-
- fut.markInitialized();
-
- if (log.isDebugEnabled())
- log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
-
- return fut;
- }
-
- /**
- * @param taskCtx Task context.
- * @return Output.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- switch (taskCtx.taskInfo().type()) {
- case MAP:
- assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined.";
-
- case COMBINE:
- return new PartitionedOutput(taskCtx);
-
- default:
- throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
- }
- }
-
- /**
- * @param taskCtx Task context.
- * @return Input.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- switch (taskCtx.taskInfo().type()) {
- case REDUCE:
- int reducer = taskCtx.taskInfo().taskNumber();
-
- HadoopMultimap m = maps.get(reducer);
-
- if (m != null)
- return m.input(taskCtx);
-
- return new HadoopTaskInput() { // Empty input.
- @Override public boolean next() {
- return false;
- }
-
- @Override public Object key() {
- throw new IllegalStateException();
- }
-
- @Override public Iterator<?> values() {
- throw new IllegalStateException();
- }
-
- @Override public void close() {
- // No-op.
- }
- };
-
- default:
- throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
- }
- }
-
- /**
- * Partitioned output.
- */
- private class PartitionedOutput implements HadoopTaskOutput {
- /** */
- private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
-
- /** */
- private HadoopPartitioner partitioner;
-
- /** */
- private final HadoopTaskContext taskCtx;
-
- /**
- * Constructor.
- * @param taskCtx Task context.
- */
- private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- this.taskCtx = taskCtx;
-
- if (needPartitioner)
- partitioner = taskCtx.partitioner();
- }
-
- /** {@inheritDoc} */
- @Override public void write(Object key, Object val) throws IgniteCheckedException {
- int part = 0;
-
- if (partitioner != null) {
- part = partitioner.partition(key, val, adders.length);
-
- if (part < 0 || part >= adders.length)
- throw new IgniteCheckedException("Invalid partition: " + part);
- }
-
- HadoopTaskOutput out = adders[part];
-
- if (out == null)
- adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
-
- out.write(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- for (HadoopTaskOutput adder : adders) {
- if (adder != null)
- adder.close();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleMessage.java
deleted file mode 100644
index 78497d0..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffleMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Shuffle message.
- */
-public class HadoopShuffleMessage implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private static final AtomicLong ids = new AtomicLong();
-
- /** */
- private static final byte MARKER_KEY = (byte)17;
-
- /** */
- private static final byte MARKER_VALUE = (byte)31;
-
- /** */
- @GridToStringInclude
- private long msgId;
-
- /** */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /** */
- @GridToStringInclude
- private int reducer;
-
- /** */
- private byte[] buf;
-
- /** */
- @GridToStringInclude
- private int off;
-
- /**
- *
- */
- public HadoopShuffleMessage() {
- // No-op.
- }
-
- /**
- * Creates a message with a freshly allocated buffer and a new message ID taken from the
- * static {@code ids} counter.
- *
- * @param jobId Job ID, must not be {@code null} (checked by assertion only).
- * @param reducer Reducer (presumably the index of the target reducer - confirm against callers).
- * @param size Size.
- */
- public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
- assert jobId != null;
-
- buf = new byte[size];
-
- this.jobId = jobId;
- this.reducer = reducer;
-
- msgId = ids.incrementAndGet();
- }
-
- /**
- * @return Message ID.
- */
- public long id() {
- return msgId;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @return Reducer.
- */
- public int reducer() {
- return reducer;
- }
-
- /**
- * @return Buffer.
- */
- public byte[] buffer() {
- return buf;
- }
-
- /**
- * @return Offset.
- */
- public int offset() {
- return off;
- }
-
- /**
- * Checks whether data of the given size still fits into the buffer. Header overhead is added to
- * the requested size first: 5 bytes for a single value, 10 bytes for a key-value pair (every entry
- * written by {@code add()} carries a 1-byte marker and a 4-byte length). If the message is still
- * empty and the buffer is too small, the buffer is replaced by a new one of exactly the required size.
- *
- * @param size Size.
- * @param valOnly Only value will be added.
- * @return {@code true} If this message can fit additional data of this size
- */
- public boolean available(int size, boolean valOnly) {
- size += valOnly ? 5 : 10;
-
- if (off + size > buf.length) {
- if (off == 0) { // Resize if requested size is too big.
- buf = new byte[size];
-
- return true;
- }
-
- return false;
- }
-
- return true;
- }
-
- /**
- * @param keyPtr Key pointer.
- * @param keySize Key size.
- */
- public void addKey(long keyPtr, int keySize) {
- add(MARKER_KEY, keyPtr, keySize);
- }
-
- /**
- * @param valPtr Value pointer.
- * @param valSize Value size.
- */
- public void addValue(long valPtr, int valSize) {
- add(MARKER_VALUE, valPtr, valSize);
- }
-
- /**
- * Appends one entry to the buffer as {@code [marker: 1 byte][size: 4 bytes][payload: size bytes]}
- * and advances the write offset past it. No capacity check is done here; callers are presumably
- * expected to call {@link #available(int, boolean)} first.
- *
- * @param marker Marker.
- * @param ptr Pointer.
- * @param size Size.
- */
- private void add(byte marker, long ptr, int size) {
- buf[off++] = marker;
-
- GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size); // Length prefix of the payload.
-
- off += 4;
-
- GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size); // Source has no base object: ptr is presumably a raw memory address.
-
- off += size;
- }
-
- /**
- * Walks all entries written so far (buffer bytes {@code [0, off)}) in order and reports each of them
- * to the visitor, passing the buffer, the index where the payload starts and the payload length.
- *
- * @param v Visitor.
- * @throws IgniteCheckedException If the visitor fails.
- * @throws IllegalStateException If an entry starts with an unknown marker byte.
- */
- public void visit(Visitor v) throws IgniteCheckedException {
- for (int i = 0; i < off;) {
- byte marker = buf[i++];
-
- int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
-
- i += 4;
-
- if (marker == MARKER_VALUE)
- v.onValue(buf, i, size);
- else if (marker == MARKER_KEY)
- v.onKey(buf, i, size);
- else
- throw new IllegalStateException();
-
- i += size;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
- out.writeLong(msgId);
- out.writeInt(reducer);
- out.writeInt(off);
- U.writeByteArray(out, buf);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
-
- jobId.readExternal(in);
- msgId = in.readLong();
- reducer = in.readInt();
- off = in.readInt();
- buf = U.readByteArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopShuffleMessage.class, this);
- }
-
- /**
- * Visitor.
- */
- public static interface Visitor {
- /**
- * @param buf Buffer.
- * @param off Offset.
- * @param len Length.
- */
- public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
-
- /**
- * @param buf Buffer.
- * @param off Offset.
- * @param len Length.
- */
- public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimap.java
deleted file mode 100644
index cc73cf9..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimap.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Multimap for map reduce intermediate results.
- */
-public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
- /** */
- private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING);
-
- /** */
- private volatile AtomicLongArray oldTbl;
-
- /** */
- private volatile AtomicLongArray newTbl;
-
- /** */
- private final AtomicInteger keys = new AtomicInteger();
-
- /** */
- private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>();
-
- /** */
- private final AtomicInteger inputs = new AtomicInteger();
-
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- * @param cap Initial capacity.
- */
- public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
- super(jobInfo, mem);
-
- assert U.isPow2(cap);
-
- newTbl = oldTbl = new AtomicLongArray(cap);
- }
-
- /**
- * Sums the keys accounted for by already closed adders with the local counters of the adders that
- * are still active. The result is a racy snapshot: an adder may be closing while the sum is taken.
- *
- * @return Number of keys.
- */
- public long keys() {
- long res = keys.get(); // Accumulate in long: the sum of several int counters may not fit into int.
-
- for (AdderImpl adder : adders)
- res += adder.locKeys.get();
-
- return res;
- }
-
- /**
- * @return Current table capacity.
- */
- @Override public int capacity() {
- return oldTbl.length();
- }
-
- /**
- * Starts a new adder. This is rejected while any input is open and after the map has been closed.
- *
- * @return Adder object.
- * @param ctx Task context.
- * @throws IgniteCheckedException If the adder could not be created.
- * @throws IllegalStateException If there are active inputs or the map is in the closing state.
- */
- @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
- if (inputs.get() != 0)
- throw new IllegalStateException("Active inputs.");
-
- if (state.get() == State.CLOSING)
- throw new IllegalStateException("Closed.");
-
- return new AdderImpl(ctx);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Expects all inputs and adders to be closed already (checked by assertions only) and moves the map
- * to the closing state.
- *
- * @throws IllegalStateException If the map is not in the reading-writing state, e.g. a visit or
- * a rehash is in progress or the map has been closed before.
- */
- @Override public void close() {
- assert inputs.get() == 0 : inputs.get();
- assert adders.isEmpty() : adders.size();
-
- state(State.READING_WRITING, State.CLOSING);
-
- if (keys() == 0)
- return; // No keys were ever added: super.close() is skipped, presumably because nothing has to be released.
-
- super.close();
- }
-
- /** {@inheritDoc} */
- @Override protected long meta(int idx) {
- return oldTbl.get(idx);
- }
-
- /**
- * Incrementally visits all the keys and values in the map.
- *
- * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
- * @param v Visitor.
- * @return {@code false} If visiting was impossible due to rehashing.
- */
- @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
- if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
- assert state.get() != State.CLOSING;
-
- return false; // Can not visit while rehashing happens.
- }
-
- AtomicLongArray tbl0 = oldTbl;
-
- for (int i = 0; i < tbl0.length(); i++) {
- long meta = tbl0.get(i);
-
- while (meta != 0) {
- long valPtr = value(meta);
-
- long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
-
- if (valPtr != lastVisited) {
- v.onKey(key(meta), keySize(meta));
-
- lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
-
- do {
- v.onValue(valPtr + 12, valueSize(valPtr));
-
- valPtr = nextValue(valPtr);
- }
- while (valPtr != lastVisited);
- }
-
- meta = collision(meta);
- }
- }
-
- state(State.VISITING, State.READING_WRITING);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- inputs.incrementAndGet();
-
- if (!adders.isEmpty())
- throw new IllegalStateException("Active adders.");
-
- State s = state.get();
-
- if (s == State.CLOSING)
- throw new IllegalStateException("Closed.");
-
- assert s != State.REHASHING;
-
- return new Input(taskCtx) {
- @Override public void close() throws IgniteCheckedException {
- if (inputs.decrementAndGet() < 0)
- throw new IllegalStateException();
-
- super.close();
- }
- };
- }
-
- /**
- * @param fromTbl Table.
- */
- private void rehashIfNeeded(AtomicLongArray fromTbl) {
- if (fromTbl.length() == Integer.MAX_VALUE)
- return;
-
- long keys0 = keys();
-
- if (keys0 < 3 * (fromTbl.length() >>> 2)) // New size has to be >= than 3/4 of capacity to rehash.
- return;
-
- if (fromTbl != newTbl) // Check if someone else have done the job.
- return;
-
- if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
- assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash.
-
- return;
- }
-
- if (fromTbl != newTbl) { // Double check.
- state(State.REHASHING, State.READING_WRITING); // Switch back.
-
- return;
- }
-
- // Calculate new table capacity.
- int newLen = fromTbl.length();
-
- do {
- newLen <<= 1;
- }
- while (newLen < keys0);
-
- if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4.
- newLen <<= 1;
-
- // This is our target table for rehashing.
- AtomicLongArray toTbl = new AtomicLongArray(newLen);
-
- // Make the new table visible before rehashing.
- newTbl = toTbl;
-
- // Rehash.
- int newMask = newLen - 1;
-
- long failedMeta = 0;
-
- GridLongList collisions = new GridLongList(16);
-
- for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
- long meta = fromTbl.get(i);
-
- assert meta != -1;
-
- if (meta == 0) { // No entry.
- failedMeta = 0;
-
- if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
- i--; // Retry.
-
- continue;
- }
-
- do { // Collect all the collisions before the last one failed to nullify or 0.
- collisions.add(meta);
-
- meta = collision(meta);
- }
- while (meta != failedMeta);
-
- do { // Go from the last to the first to avoid 'in-flight' state for meta entries.
- meta = collisions.remove();
-
- int addr = keyHash(meta) & newMask;
-
- for (;;) { // Move meta entry to the new table.
- long toCollision = toTbl.get(addr);
-
- collision(meta, toCollision);
-
- if (toTbl.compareAndSet(addr, toCollision, meta))
- break;
- }
- }
- while (!collisions.isEmpty());
-
- // Here 'meta' will be a root pointer in old table.
- if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
- failedMeta = meta;
-
- i--; // Retry the same address in table because new keys were added.
- }
- else
- failedMeta = 0;
- }
-
- // Now old and new tables will be the same again.
- oldTbl = toTbl;
-
- state(State.REHASHING, State.READING_WRITING);
- }
-
- /**
- * Switch state.
- *
- * @param oldState Expected state.
- * @param newState New state.
- */
- private void state(State oldState, State newState) {
- if (!state.compareAndSet(oldState, newState))
- throw new IllegalStateException();
- }
-
- /**
- * @param meta Meta pointer.
- * @return Value pointer.
- */
- @Override protected long value(long meta) {
- return mem.readLongVolatile(meta + 16);
- }
-
- /**
- * @param meta Meta pointer.
- * @param oldValPtr Old value.
- * @param newValPtr New value.
- * @return {@code true} If succeeded.
- */
- private boolean casValue(long meta, long oldValPtr, long newValPtr) {
- return mem.casLong(meta + 16, oldValPtr, newValPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Collision pointer.
- */
- @Override protected long collision(long meta) {
- return mem.readLongVolatile(meta + 24);
- }
-
- /**
- * @param meta Meta pointer.
- * @param collision Collision pointer.
- */
- @Override protected void collision(long meta, long collision) {
- assert meta != collision : meta;
-
- mem.writeLongVolatile(meta + 24, collision);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Last visited value pointer.
- */
- private long lastVisitedValue(long meta) {
- return mem.readLong(meta + 32);
- }
-
- /**
- * @param meta Meta pointer.
- * @param valPtr Last visited value pointer.
- */
- private void lastVisitedValue(long meta, long valPtr) {
- mem.writeLong(meta + 32, valPtr);
- }
-
- /**
- * Adder. Must not be shared between threads.
- */
- private class AdderImpl extends AdderBase {
- /** */
- private final Reader keyReader;
-
- /** */
- private final AtomicInteger locKeys = new AtomicInteger();
-
- /** */
- private final Random rnd = new GridRandom();
-
- /**
- * @param ctx Task context.
- * @throws IgniteCheckedException If failed.
- */
- private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
- super(ctx);
-
- keyReader = new Reader(keySer);
-
- rehashIfNeeded(oldTbl);
-
- adders.add(this);
- }
-
- /**
- * @param in Data input.
- * @param reuse Reusable key.
- * @return Key.
- * @throws IgniteCheckedException If failed.
- */
- @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
- KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
- k.tmpKey = keySer.read(in, k.tmpKey);
-
- k.meta = add(k.tmpKey, null);
-
- return k;
- }
-
- /** {@inheritDoc} */
- @Override public void write(Object key, Object val) throws IgniteCheckedException {
- A.notNull(val, "val");
-
- add(key, val);
- }
-
- /**
- * Counts one more key added through this adder and, from time to time, checks whether the table
- * has to be rehashed. The check is made when a random number below the table length falls under 512,
- * i.e. always for tables of up to 512 slots and increasingly rarely for larger ones.
- *
- * @param tbl Table.
- */
- private void incrementKeys(AtomicLongArray tbl) {
- locKeys.lazySet(locKeys.get() + 1); // Plain read-then-write increment: the adder must not be shared between threads.
-
- if (rnd.nextInt(tbl.length()) < 512)
- rehashIfNeeded(tbl);
- }
-
- /**
- * Allocates a 40-byte meta page and fills it in. Layout by byte offset: 0 - key hash (int),
- * 4 - key size (int), 8 - key pointer, 16 - value pointer, 24 - collision pointer,
- * 32 - last visited value pointer.
- *
- * @param keyHash Key hash.
- * @param keySize Key size.
- * @param keyPtr Key pointer.
- * @param valPtr Value page pointer.
- * @param collisionPtr Pointer to meta with hash collision.
- * @param lastVisitedVal Last visited value pointer.
- * @return Created meta page pointer.
- */
- private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
- long meta = allocate(40);
-
- mem.writeInt(meta, keyHash);
- mem.writeInt(meta + 4, keySize);
- mem.writeLong(meta + 8, keyPtr);
- mem.writeLong(meta + 16, valPtr);
- mem.writeLong(meta + 24, collisionPtr);
- mem.writeLong(meta + 32, lastVisitedVal);
-
- return meta;
- }
-
- /**
- * Adds the key and, if given, the value to the map.
- * <p>
- * The value is serialized first. Then the collision chain at the key's slot is searched: if the key
- * is already present, the value is pushed on top of that key's value stack with a CAS and the existing
- * meta pointer is returned. Otherwise a new meta page is created (once, and reused on retries) and
- * installed as the new root of the slot with a CAS; a failed CAS restarts the loop. Slots marked
- * with {@code -1} were moved by a concurrent rehash, in which case the search switches tables.
- *
- * @param key Key.
- * @param val Value.
- * @return Updated or created meta page pointer.
- * @throws IgniteCheckedException If failed.
- */
- private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
- AtomicLongArray tbl = oldTbl;
-
- int keyHash = U.hash(key.hashCode());
-
- long newMetaPtr = 0;
-
- long valPtr = 0;
-
- if (val != null) {
- valPtr = write(12, val, valSer); // 12 bytes are reserved in front of the value data (presumably its size and next-value link).
- int valSize = writtenSize() - 12;
-
- valueSize(valPtr, valSize);
- }
-
- for (AtomicLongArray old = null;;) {
- int addr = keyHash & (tbl.length() - 1);
-
- long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
-
- if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
- AtomicLongArray n = newTbl; // Need to read newTbl first here.
- AtomicLongArray o = oldTbl;
-
- tbl = tbl == o ? n : o; // Trying to get the oldest table but newer than ours.
-
- old = null;
-
- continue;
- }
-
- if (metaPtrRoot != 0) { // Not empty slot.
- long metaPtr = metaPtrRoot;
-
- do { // Scan all the collisions.
- if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
- if (newMetaPtr != 0) // Deallocate new meta if one was allocated.
- localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to it's pointer.
-
- if (valPtr != 0) { // Add value if it exists.
- long nextValPtr;
-
- // Values are linked to each other to a stack like structure.
- // Replace the last value in meta with ours and link it as next.
- do {
- nextValPtr = value(metaPtr);
-
- nextValue(valPtr, nextValPtr);
- }
- while (!casValue(metaPtr, nextValPtr, valPtr));
- }
-
- return metaPtr;
- }
-
- metaPtr = collision(metaPtr);
- }
- while (metaPtr != 0);
-
- // Here we did not find our key, need to check if it was moved by rehashing to the new table.
- if (old == null) { // If the old table already set, then we will just try to update it.
- AtomicLongArray n = newTbl;
-
- if (n != tbl) { // Rehashing happens, try to find the key in new table but preserve the old one.
- old = tbl;
- tbl = n;
-
- continue;
- }
- }
- }
-
- if (old != null) { // We just checked new table but did not find our key as well as in the old one.
- tbl = old; // Try to add new key to the old table.
-
- addr = keyHash & (tbl.length() - 1);
-
- old = null;
- }
-
- if (newMetaPtr == 0) { // Allocate new meta page.
- long keyPtr = write(0, key, keySer);
- int keySize = writtenSize();
-
- if (valPtr != 0)
- nextValue(valPtr, 0);
-
- newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
- }
- else // Update new meta with root pointer collision.
- collision(newMetaPtr, metaPtrRoot);
-
- if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one.
- incrementKeys(tbl);
-
- return newMetaPtr;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- if (!adders.remove(this))
- throw new IllegalStateException();
-
- keys.addAndGet(locKeys.get()); // Here we have race and #keys() method can return wrong result but it is ok.
-
- super.close();
- }
-
- /**
- * Key.
- */
- private class KeyImpl implements Key {
- /** */
- private long meta;
-
- /** */
- private Object tmpKey;
-
- /**
- * @return Meta pointer for the key.
- */
- public long address() {
- return meta;
- }
-
- /**
- * Copies the value into newly allocated memory (behind a 12-byte header holding at least its size)
- * and pushes it on top of this key's value stack, retrying the CAS until it succeeds.
- *
- * @param val Value.
- */
- @Override public void add(Value val) {
- int size = val.size();
-
- long valPtr = allocate(size + 12);
-
- val.copyTo(valPtr + 12);
-
- valueSize(valPtr, size);
-
- long nextVal;
-
- do {
- nextVal = value(meta);
-
- nextValue(valPtr, nextVal); // Link the current head as our successor before publishing.
- }
- while(!casValue(meta, nextVal, valPtr));
- }
- }
- }
-
- /**
- * Current map state.
- */
- private enum State {
- /** */
- REHASHING,
-
- /** */
- VISITING,
-
- /** */
- READING_WRITING,
-
- /** */
- CLOSING
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimap.java
deleted file mode 100644
index 4464324..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimap.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Hash multimap.
- */
-public class HadoopHashMultimap extends HadoopHashMultimapBase {
- /** */
- private long[] tbl;
-
- /** */
- private int keys;
-
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- * @param cap Initial capacity.
- */
- public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
- super(jobInfo, mem);
-
- assert U.isPow2(cap) : cap;
-
- tbl = new long[cap];
- }
-
- /** {@inheritDoc} */
- @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
- return new AdderImpl(ctx);
- }
-
- /**
- * Rehash. Doubles the table and relinks every meta page into the slot selected by its stored
- * key hash; each page is put at the head of the collision chain of its new slot.
- */
- private void rehash() {
- long[] newTbl = new long[tbl.length << 1]; // NOTE(review): overflows once the table holds 2^30 slots - confirm this is unreachable.
-
- int newMask = newTbl.length - 1;
-
- for (long meta : tbl) {
- while (meta != 0) {
- long collision = collision(meta); // Remember the successor: the link is overwritten below.
-
- int idx = keyHash(meta) & newMask;
-
- collision(meta, newTbl[idx]);
-
- newTbl[idx] = meta;
-
- meta = collision;
- }
- }
-
- tbl = newTbl;
- }
-
- /**
- * @return Keys count.
- */
- public int keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public int capacity() {
- return tbl.length;
- }
-
- /** {@inheritDoc} */
- @Override protected long meta(int idx) {
- return tbl[idx];
- }
-
- /**
- * Adder.
- */
- private class AdderImpl extends AdderBase {
- /** */
- private final Reader keyReader;
-
- /**
- * @param ctx Task context.
- * @throws IgniteCheckedException If failed.
- */
- protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
- super(ctx);
-
- keyReader = new Reader(keySer);
- }
-
- /**
- * @param keyHash Key hash.
- * @param keySize Key size.
- * @param keyPtr Key pointer.
- * @param valPtr Value page pointer.
- * @param collisionPtr Pointer to meta with hash collision.
- * @return Created meta page pointer.
- */
- private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
- long meta = allocate(32);
-
- mem.writeInt(meta, keyHash);
- mem.writeInt(meta + 4, keySize);
- mem.writeLong(meta + 8, keyPtr);
- mem.writeLong(meta + 16, valPtr);
- mem.writeLong(meta + 24, collisionPtr);
-
- return meta;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The value is serialized first. If the key is already present in the collision chain of its slot,
- * the value becomes the new head of that key's value list. Otherwise the key is serialized, a new
- * meta page is put at the head of the slot's chain and the table is doubled once the number of keys
- * exceeds 3/4 of its capacity.
- */
- @Override public void write(Object key, Object val) throws IgniteCheckedException {
- A.notNull(val, "val");
-
- int keyHash = U.hash(key.hashCode());
-
- // Write value.
- long valPtr = write(12, val, valSer);
- int valSize = writtenSize() - 12;
-
- valueSize(valPtr, valSize);
-
- // Find position in table.
- int idx = keyHash & (tbl.length - 1);
-
- long meta = tbl[idx];
-
- // Search for our key in collisions.
- while (meta != 0) {
- if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
- nextValue(valPtr, value(meta));
-
- value(meta, valPtr);
-
- return;
- }
-
- meta = collision(meta);
- }
-
- // Write key.
- long keyPtr = write(0, key, keySer);
- int keySize = writtenSize();
-
- nextValue(valPtr, 0);
-
- tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
-
- if (++keys > (tbl.length >>> 2) * 3)
- rehash();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimapBase.java
deleted file mode 100644
index 988f629..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMultimapBase.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.util.Iterator;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Base class for hash multimaps.
- */
-public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- */
- protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
- super(jobInfo, mem);
- }
-
- /** {@inheritDoc} */
- @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
- throw new UnsupportedOperationException("visit");
- }
-
- /** {@inheritDoc} */
- @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return new Input(taskCtx);
- }
-
- /**
- * @return Hash table capacity.
- */
- public abstract int capacity();
-
- /**
- * @param idx Index in hash table.
- * @return Meta page pointer.
- */
- protected abstract long meta(int idx);
-
- /**
- * @param meta Meta pointer.
- * @return Key hash.
- */
- protected int keyHash(long meta) {
- return mem.readInt(meta);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key size.
- */
- protected int keySize(long meta) {
- return mem.readInt(meta + 4);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key pointer.
- */
- protected long key(long meta) {
- return mem.readLong(meta + 8);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Value pointer.
- */
- protected long value(long meta) {
- return mem.readLong(meta + 16);
- }
- /**
- * @param meta Meta pointer.
- * @param val Value pointer.
- */
- protected void value(long meta, long val) {
- mem.writeLong(meta + 16, val);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Collision pointer.
- */
- protected long collision(long meta) {
- return mem.readLong(meta + 24);
- }
-
- /**
- * @param meta Meta pointer.
- * @param collision Collision pointer.
- */
- protected void collision(long meta, long collision) {
- assert meta != collision : meta;
-
- mem.writeLong(meta + 24, collision);
- }
-
- /**
- * Reader for key and value.
- */
- protected class Reader extends ReaderBase {
- /**
- * @param ser Serialization.
- */
- protected Reader(HadoopSerialization ser) {
- super(ser);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key.
- */
- public Object readKey(long meta) {
- assert meta > 0 : meta;
-
- try {
- return read(key(meta), keySize(meta));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-
- /**
- * Task input. Iterates over all keys of the map: first along the collision chain of the current
- * slot, then through the following slots of the hash table.
- */
- protected class Input implements HadoopTaskInput {
- /** Index of the table slot being iterated; {@code -1} before the first call to {@link #next()}. */
- private int idx = -1;
-
- /** Meta pointer of the current key; {@code 0} when there is no current key. */
- private long metaPtr;
-
- /** Table capacity as reported by {@code capacity()} when the input was created. */
- private final int cap;
-
- /** Reader created over the task's key serialization. */
- private final Reader keyReader;
-
- /** Reader created over the task's value serialization. */
- private final Reader valReader;
-
- /**
- * @param taskCtx Task context.
- * @throws IgniteCheckedException If failed.
- */
- public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- cap = capacity();
-
- keyReader = new Reader(taskCtx.keySerialization());
- valReader = new Reader(taskCtx.valueSerialization());
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Moves to the next entry of the current collision chain or, when the chain is exhausted,
- * to the first entry of the next non-empty slot.
- */
- @Override public boolean next() {
- if (metaPtr != 0) {
- metaPtr = collision(metaPtr);
-
- if (metaPtr != 0)
- return true;
- }
-
- while (++idx < cap) { // Scan table.
- metaPtr = meta(idx);
-
- if (metaPtr != 0)
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Object key() {
- return keyReader.readKey(metaPtr);
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<?> values() {
- return new ValueIterator(value(metaPtr), valReader);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- keyReader.close();
- valReader.close(); // NOTE(review): not reached if keyReader.close() throws - confirm this is acceptable.
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimap.java
deleted file mode 100644
index fc76936..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimap.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.io.DataInput;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Multimap for hadoop intermediate results.
- */
-@SuppressWarnings("PublicInnerClass")
-public interface HadoopMultimap extends AutoCloseable {
- /**
- * Incrementally visits all the keys and values in the map.
- *
- * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
- * @param v Visitor.
- * @return {@code false} If visiting was impossible.
- */
- public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
-
- /**
- * @param ctx Task context.
- * @return Adder.
- * @throws IgniteCheckedException If failed.
- */
- public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
-
- /**
- * @param taskCtx Task context.
- * @return Task input.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopTaskInput input(HadoopTaskContext taskCtx)
- throws IgniteCheckedException;
-
- /** {@inheritDoc} */
- @Override public void close();
-
- /**
- * Adder.
- */
- public interface Adder extends HadoopTaskOutput {
- /**
- * @param in Data input.
- * @param reuse Reusable key.
- * @return Key.
- * @throws IgniteCheckedException If failed.
- */
- public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
- }
-
- /**
- * Key add values to.
- */
- public interface Key {
- /**
- * @param val Value.
- */
- public void add(Value val);
- }
-
- /**
- * Value.
- */
- public interface Value {
- /**
- * @return Size in bytes.
- */
- public int size();
-
- /**
- * @param ptr Pointer.
- */
- public void copyTo(long ptr);
- }
-
- /**
- * Key and values visitor.
- */
- public interface Visitor {
- /**
- * @param keyPtr Key pointer.
- * @param keySize Key size.
- */
- public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
-
- /**
- * @param valPtr Value pointer.
- * @param valSize Value size.
- */
- public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
- }
-}
\ No newline at end of file