You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:29 UTC
[13/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings (2).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
deleted file mode 100644
index 342cbab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.IgniteFs.*;
-
-/**
- * Default map-reduce planner implementation.
- */
-public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner {
- /** Injected grid. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** Logger. */
- @SuppressWarnings("UnusedDeclaration")
- @LoggerResource
- private IgniteLogger log;
-
- /** {@inheritDoc} */
- @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
- @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException {
- // Convert collection of topology nodes to collection of topology node IDs.
- Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
-
- for (ClusterNode topNode : top)
- topIds.add(topNode.id());
-
- Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input());
-
- int rdcCnt = job.info().reducers();
-
- if (rdcCnt < 0)
- throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
-
- Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
-
- return new GridHadoopDefaultMapReducePlan(mappers, reducers);
- }
-
- /**
- * Create plan for mappers.
- *
- * @param top Topology nodes.
- * @param topIds Topology node IDs.
- * @param splits Splits.
- * @return Mappers map.
- * @throws IgniteCheckedException If failed.
- */
- private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
- Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException {
- Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>();
-
- Map<String, Collection<UUID>> nodes = hosts(top);
-
- Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
-
- for (UUID nodeId : topIds)
- nodeLoads.put(nodeId, 0);
-
- for (GridHadoopInputSplit split : splits) {
- UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
-
- if (log.isDebugEnabled())
- log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
-
- Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId);
-
- if (nodeSplits == null) {
- nodeSplits = new ArrayList<>();
-
- mappers.put(nodeId, nodeSplits);
- }
-
- nodeSplits.add(split);
-
- // Updated node load.
- nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
- }
-
- return mappers;
- }
-
- /**
- * Groups nodes by host names.
- *
- * @param top Topology to group.
- * @return Map.
- */
- private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) {
- Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
- for (ClusterNode node : top) {
- for (String host : node.hostNames()) {
- Collection<UUID> nodeIds = grouped.get(host);
-
- if (nodeIds == null) {
- // Expecting 1-2 nodes per host.
- nodeIds = new ArrayList<>(2);
-
- grouped.put(host, nodeIds);
- }
-
- nodeIds.add(node.id());
- }
- }
-
- return grouped;
- }
-
- /**
- * Determine the best node for this split.
- *
- * @param split Split.
- * @param topIds Topology node IDs.
- * @param nodes Nodes.
- * @param nodeLoads Node load tracker.
- * @return Node ID.
- */
- @SuppressWarnings("unchecked")
- private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
- Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock split0 = (GridHadoopFileBlock)split;
-
- if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
- IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(split0.file().getAuthority());
-
- IgfsEx igfs = null;
-
- if (F.eq(ignite.name(), endpoint.grid()))
- igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
- if (igfs != null && !igfs.isProxy(split0.file())) {
- Collection<IgfsBlockLocation> blocks;
-
- try {
- blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length());
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
-
- assert blocks != null;
-
- if (blocks.size() == 1)
- // Fast-path, split consists of one IGFS block (as in most cases).
- return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
- else {
- // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
- Map<UUID, Long> nodeMap = new HashMap<>();
-
- List<UUID> bestNodeIds = null;
- long bestLen = -1L;
-
- for (IgfsBlockLocation block : blocks) {
- for (UUID blockNodeId : block.nodeIds()) {
- if (topIds.contains(blockNodeId)) {
- Long oldLen = nodeMap.get(blockNodeId);
- long newLen = oldLen == null ? block.length() : oldLen + block.length();
-
- nodeMap.put(blockNodeId, newLen);
-
- if (bestNodeIds == null || bestLen < newLen) {
- bestNodeIds = new ArrayList<>(1);
-
- bestNodeIds.add(blockNodeId);
-
- bestLen = newLen;
- }
- else if (bestLen == newLen) {
- assert !F.isEmpty(bestNodeIds);
-
- bestNodeIds.add(blockNodeId);
- }
- }
- }
- }
-
- if (bestNodeIds != null) {
- return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
- bestNode(bestNodeIds, topIds, nodeLoads, true);
- }
- }
- }
- }
- }
-
- // Cannot use local IGFS for some reason, try selecting the node by host.
- Collection<UUID> blockNodes = null;
-
- for (String host : split.hosts()) {
- Collection<UUID> hostNodes = nodes.get(host);
-
- if (!F.isEmpty(hostNodes)) {
- if (blockNodes == null)
- blockNodes = new ArrayList<>(hostNodes);
- else
- blockNodes.addAll(hostNodes);
- }
- }
-
- return bestNode(blockNodes, topIds, nodeLoads, false);
- }
-
- /**
- * Finds the best (the least loaded) node among the candidates.
- *
- * @param candidates Candidates.
- * @param topIds Topology node IDs.
- * @param nodeLoads Known node loads.
- * @param skipTopCheck Whether to skip topology check.
- * @return The best node.
- */
- private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
- boolean skipTopCheck) {
- UUID bestNode = null;
- int bestLoad = Integer.MAX_VALUE;
-
- if (candidates != null) {
- for (UUID candidate : candidates) {
- if (skipTopCheck || topIds.contains(candidate)) {
- int load = nodeLoads.get(candidate);
-
- if (bestNode == null || bestLoad > load) {
- bestNode = candidate;
- bestLoad = load;
-
- if (bestLoad == 0)
- break; // Minimum load possible, no need for further iterations.
- }
- }
- }
- }
-
- if (bestNode == null) {
- // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
- bestLoad = Integer.MAX_VALUE;
-
- for (UUID nodeId : topIds) {
- int load = nodeLoads.get(nodeId);
-
- if (bestNode == null || bestLoad > load) {
- bestNode = nodeId;
- bestLoad = load;
-
- if (bestLoad == 0)
- break; // Minimum load possible, no need for further iterations.
- }
- }
- }
-
- assert bestNode != null;
-
- return bestNode;
- }
-
- /**
- * Create plan for reducers.
- *
- * @param top Topology.
- * @param mappers Mappers map.
- * @param reducerCnt Reducers count.
- * @return Reducers map.
- */
- private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
- Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) {
- // Determine initial node weights.
- int totalWeight = 0;
-
- List<WeightedNode> nodes = new ArrayList<>(top.size());
-
- for (ClusterNode node : top) {
- Collection<GridHadoopInputSplit> split = mappers.get(node.id());
-
- int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
-
- nodes.add(new WeightedNode(node.id(), weight, weight));
-
- totalWeight += weight;
- }
-
- // Adjust weights.
- int totalAdjustedWeight = 0;
-
- for (WeightedNode node : nodes) {
- node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
-
- node.weight = Math.round(node.floatWeight);
-
- totalAdjustedWeight += node.weight;
- }
-
- // Apply redundant/lost reducers.
- Collections.sort(nodes);
-
- if (totalAdjustedWeight > reducerCnt) {
- // Too much reducers set.
- ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
-
- while (totalAdjustedWeight != reducerCnt) {
- if (!iter.hasPrevious())
- iter = nodes.listIterator(nodes.size() - 1);
-
- WeightedNode node = iter.previous();
-
- if (node.weight > 0) {
- node.weight -= 1;
-
- totalAdjustedWeight--;
- }
- }
- }
- else if (totalAdjustedWeight < reducerCnt) {
- // Not enough reducers set.
- ListIterator<WeightedNode> iter = nodes.listIterator(0);
-
- while (totalAdjustedWeight != reducerCnt) {
- if (!iter.hasNext())
- iter = nodes.listIterator(0);
-
- WeightedNode node = iter.next();
-
- if (node.floatWeight > 0.0f) {
- node.weight += 1;
-
- totalAdjustedWeight++;
- }
- }
- }
-
- int idx = 0;
-
- Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
-
- for (WeightedNode node : nodes) {
- if (node.weight > 0) {
- int[] arr = new int[node.weight];
-
- for (int i = 0; i < arr.length; i++)
- arr[i] = idx++;
-
- reducers.put(node.nodeId, arr);
- }
- }
-
- return reducers;
- }
-
- /**
- * Calculate node weight based on node metrics and data co-location.
- *
- * @param node Node.
- * @param splitCnt Splits mapped to this node.
- * @return Node weight.
- */
- @SuppressWarnings("UnusedParameters")
- protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
- return splitCnt;
- }
-
- /**
- * Weighted node.
- */
- private static class WeightedNode implements Comparable<WeightedNode> {
- /** Node ID. */
- private final UUID nodeId;
-
- /** Weight. */
- private int weight;
-
- /** Floating point weight. */
- private float floatWeight;
-
- /**
- * Constructor.
- *
- * @param nodeId Node ID.
- * @param weight Weight.
- * @param floatWeight Floating point weight.
- */
- private WeightedNode(UUID nodeId, int weight, float floatWeight) {
- this.nodeId = nodeId;
- this.weight = weight;
- this.floatWeight = floatWeight;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return nodeId.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(@NotNull WeightedNode other) {
- float res = other.floatWeight - floatWeight;
-
- return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
new file mode 100644
index 0000000..9ec2b5b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Map-reduce plan.
+ */
+public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mappers map. */
+ private Map<UUID, Collection<GridHadoopInputSplit>> mappers;
+
+ /** Reducers map. */
+ private Map<UUID, int[]> reducers;
+
+ /** Mappers count. */
+ private int mappersCnt;
+
+ /** Reducers count. */
+ private int reducersCnt;
+
+ /**
+ * @param mappers Mappers map.
+ * @param reducers Reducers map.
+ */
+ public HadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers,
+ Map<UUID, int[]> reducers) {
+ this.mappers = mappers;
+ this.reducers = reducers;
+
+ if (mappers != null) {
+ for (Collection<GridHadoopInputSplit> splits : mappers.values())
+ mappersCnt += splits.size();
+ }
+
+ if (reducers != null) {
+ for (int[] rdcrs : reducers.values())
+ reducersCnt += rdcrs.length;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int mappers() {
+ return mappersCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ return reducersCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID nodeForReducer(int reducer) {
+ assert reducer >= 0 && reducer < reducersCnt : reducer;
+
+ for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
+ for (int r : entry.getValue()) {
+ if (r == reducer)
+ return entry.getKey();
+ }
+ }
+
+ throw new IllegalStateException("Not found reducer index: " + reducer);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) {
+ return mappers.get(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public int[] reducers(UUID nodeId) {
+ return reducers.get(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<UUID> mapperNodeIds() {
+ return mappers.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<UUID> reducerNodeIds() {
+ return reducers.keySet();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
new file mode 100644
index 0000000..01a7471
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.IgniteFs.*;
+
+/**
+ * Default map-reduce planner implementation.
+ */
+public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner {
+ /** Injected grid. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Logger. */
+ @SuppressWarnings("UnusedDeclaration")
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
+ @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+ // Convert collection of topology nodes to collection of topology node IDs.
+ Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
+
+ for (ClusterNode topNode : top)
+ topIds.add(topNode.id());
+
+ Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input());
+
+ int rdcCnt = job.info().reducers();
+
+ if (rdcCnt < 0)
+ throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
+
+ Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
+
+ return new HadoopDefaultMapReducePlan(mappers, reducers);
+ }
+
+ /**
+ * Create plan for mappers.
+ *
+ * @param top Topology nodes.
+ * @param topIds Topology node IDs.
+ * @param splits Splits.
+ * @return Mappers map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
+ Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException {
+ Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>();
+
+ Map<String, Collection<UUID>> nodes = hosts(top);
+
+ Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
+
+ for (UUID nodeId : topIds)
+ nodeLoads.put(nodeId, 0);
+
+ for (GridHadoopInputSplit split : splits) {
+ UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
+
+ if (log.isDebugEnabled())
+ log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
+
+ Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId);
+
+ if (nodeSplits == null) {
+ nodeSplits = new ArrayList<>();
+
+ mappers.put(nodeId, nodeSplits);
+ }
+
+ nodeSplits.add(split);
+
+ // Updated node load.
+ nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
+ }
+
+ return mappers;
+ }
+
+ /**
+ * Groups nodes by host names.
+ *
+ * @param top Topology to group.
+ * @return Map.
+ */
+ private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) {
+ Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+ for (ClusterNode node : top) {
+ for (String host : node.hostNames()) {
+ Collection<UUID> nodeIds = grouped.get(host);
+
+ if (nodeIds == null) {
+ // Expecting 1-2 nodes per host.
+ nodeIds = new ArrayList<>(2);
+
+ grouped.put(host, nodeIds);
+ }
+
+ nodeIds.add(node.id());
+ }
+ }
+
+ return grouped;
+ }
+
+ /**
+ * Determine the best node for this split.
+ *
+ * @param split Split.
+ * @param topIds Topology node IDs.
+ * @param nodes Nodes.
+ * @param nodeLoads Node load tracker.
+ * @return Node ID.
+ */
+ @SuppressWarnings("unchecked")
+ private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
+ Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
+ if (split instanceof GridHadoopFileBlock) {
+ GridHadoopFileBlock split0 = (GridHadoopFileBlock)split;
+
+ if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+ IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(split0.file().getAuthority());
+
+ IgfsEx igfs = null;
+
+ if (F.eq(ignite.name(), endpoint.grid()))
+ igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+ if (igfs != null && !igfs.isProxy(split0.file())) {
+ Collection<IgfsBlockLocation> blocks;
+
+ try {
+ blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length());
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ assert blocks != null;
+
+ if (blocks.size() == 1)
+ // Fast-path, split consists of one IGFS block (as in most cases).
+ return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
+ else {
+ // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
+ Map<UUID, Long> nodeMap = new HashMap<>();
+
+ List<UUID> bestNodeIds = null;
+ long bestLen = -1L;
+
+ for (IgfsBlockLocation block : blocks) {
+ for (UUID blockNodeId : block.nodeIds()) {
+ if (topIds.contains(blockNodeId)) {
+ Long oldLen = nodeMap.get(blockNodeId);
+ long newLen = oldLen == null ? block.length() : oldLen + block.length();
+
+ nodeMap.put(blockNodeId, newLen);
+
+ if (bestNodeIds == null || bestLen < newLen) {
+ bestNodeIds = new ArrayList<>(1);
+
+ bestNodeIds.add(blockNodeId);
+
+ bestLen = newLen;
+ }
+ else if (bestLen == newLen) {
+ assert !F.isEmpty(bestNodeIds);
+
+ bestNodeIds.add(blockNodeId);
+ }
+ }
+ }
+ }
+
+ if (bestNodeIds != null) {
+ return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
+ bestNode(bestNodeIds, topIds, nodeLoads, true);
+ }
+ }
+ }
+ }
+ }
+
+ // Cannot use local IGFS for some reason, try selecting the node by host.
+ Collection<UUID> blockNodes = null;
+
+ for (String host : split.hosts()) {
+ Collection<UUID> hostNodes = nodes.get(host);
+
+ if (!F.isEmpty(hostNodes)) {
+ if (blockNodes == null)
+ blockNodes = new ArrayList<>(hostNodes);
+ else
+ blockNodes.addAll(hostNodes);
+ }
+ }
+
+ return bestNode(blockNodes, topIds, nodeLoads, false);
+ }
+
+ /**
+ * Finds the best (the least loaded) node among the candidates.
+ *
+ * @param candidates Candidates.
+ * @param topIds Topology node IDs.
+ * @param nodeLoads Known node loads.
+ * @param skipTopCheck Whether to skip topology check.
+ * @return The best node.
+ */
+ private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
+ boolean skipTopCheck) {
+ UUID bestNode = null;
+ int bestLoad = Integer.MAX_VALUE;
+
+ if (candidates != null) {
+ for (UUID candidate : candidates) {
+ if (skipTopCheck || topIds.contains(candidate)) {
+ int load = nodeLoads.get(candidate);
+
+ if (bestNode == null || bestLoad > load) {
+ bestNode = candidate;
+ bestLoad = load;
+
+ if (bestLoad == 0)
+ break; // Minimum load possible, no need for further iterations.
+ }
+ }
+ }
+ }
+
+ if (bestNode == null) {
+ // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
+ bestLoad = Integer.MAX_VALUE;
+
+ for (UUID nodeId : topIds) {
+ int load = nodeLoads.get(nodeId);
+
+ if (bestNode == null || bestLoad > load) {
+ bestNode = nodeId;
+ bestLoad = load;
+
+ if (bestLoad == 0)
+ break; // Minimum load possible, no need for further iterations.
+ }
+ }
+ }
+
+ assert bestNode != null;
+
+ return bestNode;
+ }
+
+ /**
+ * Create plan for reducers.
+ *
+ * @param top Topology.
+ * @param mappers Mappers map.
+ * @param reducerCnt Reducers count.
+ * @return Reducers map.
+ */
+ private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
+ Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) {
+ // Determine initial node weights.
+ int totalWeight = 0;
+
+ List<WeightedNode> nodes = new ArrayList<>(top.size());
+
+ for (ClusterNode node : top) {
+ Collection<GridHadoopInputSplit> split = mappers.get(node.id());
+
+ int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
+
+ nodes.add(new WeightedNode(node.id(), weight, weight));
+
+ totalWeight += weight;
+ }
+
+ // Adjust weights.
+ int totalAdjustedWeight = 0;
+
+ for (WeightedNode node : nodes) {
+ node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
+
+ node.weight = Math.round(node.floatWeight);
+
+ totalAdjustedWeight += node.weight;
+ }
+
+ // Apply redundant/lost reducers.
+ Collections.sort(nodes);
+
+ if (totalAdjustedWeight > reducerCnt) {
+ // Too much reducers set.
+ ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
+
+ while (totalAdjustedWeight != reducerCnt) {
+ if (!iter.hasPrevious())
+ iter = nodes.listIterator(nodes.size() - 1);
+
+ WeightedNode node = iter.previous();
+
+ if (node.weight > 0) {
+ node.weight -= 1;
+
+ totalAdjustedWeight--;
+ }
+ }
+ }
+ else if (totalAdjustedWeight < reducerCnt) {
+ // Not enough reducers set.
+ ListIterator<WeightedNode> iter = nodes.listIterator(0);
+
+ while (totalAdjustedWeight != reducerCnt) {
+ if (!iter.hasNext())
+ iter = nodes.listIterator(0);
+
+ WeightedNode node = iter.next();
+
+ if (node.floatWeight > 0.0f) {
+ node.weight += 1;
+
+ totalAdjustedWeight++;
+ }
+ }
+ }
+
+ int idx = 0;
+
+ Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
+
+ for (WeightedNode node : nodes) {
+ if (node.weight > 0) {
+ int[] arr = new int[node.weight];
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = idx++;
+
+ reducers.put(node.nodeId, arr);
+ }
+ }
+
+ return reducers;
+ }
+
+ /**
+ * Calculate node weight based on node metrics and data co-location.
+ *
+ * @param node Node.
+ * @param splitCnt Splits mapped to this node.
+ * @return Node weight.
+ */
+ @SuppressWarnings("UnusedParameters")
+ protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
+ return splitCnt;
+ }
+
+ /**
+ * Weighted node.
+ */
+ private static class WeightedNode implements Comparable<WeightedNode> {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Weight. */
+ private int weight;
+
+ /** Floating point weight. */
+ private float floatWeight;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ * @param weight Weight.
+ * @param floatWeight Floating point weight.
+ */
+ private WeightedNode(UUID nodeId, int weight, float floatWeight) {
+ this.nodeId = nodeId;
+ this.weight = weight;
+ this.floatWeight = floatWeight;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return nodeId.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull WeightedNode other) {
+ float res = other.floatWeight - floatWeight;
+
+ return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
deleted file mode 100644
index 37073d9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-/**
- * Task to get job counters.
- */
-public class GridHadoopProtocolJobCountersTask extends GridHadoopProtocolTaskAdapter<GridHadoopCounters> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop,
- GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
-
- UUID nodeId = UUID.fromString(args.<String>get(0));
- Integer id = args.get(1);
-
- assert nodeId != null;
- assert id != null;
-
- return hadoop.counters(new GridHadoopJobId(nodeId, id));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
deleted file mode 100644
index de4f89c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-
-/**
- * Job status task.
- */
-public class GridHadoopProtocolJobStatusTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Default poll delay */
- private static final long DFLT_POLL_DELAY = 100L;
-
- /** Attribute for held status. */
- private static final String ATTR_HELD = "held";
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop,
- GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
- UUID nodeId = UUID.fromString(args.<String>get(0));
- Integer id = args.get(1);
- Long pollDelay = args.get(2);
-
- assert nodeId != null;
- assert id != null;
-
- GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
- if (pollDelay == null)
- pollDelay = DFLT_POLL_DELAY;
-
- if (pollDelay > 0) {
- IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
-
- if (fut != null) {
- if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true))
- return hadoop.status(jobId);
- else {
- fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut0) {
- jobCtx.callcc();
- }
- });
-
- jobCtx.setAttribute(ATTR_HELD, true);
-
- return jobCtx.holdcc(pollDelay);
- }
- }
- else
- return null;
- }
- else
- return hadoop.status(jobId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
deleted file mode 100644
index 384bc23..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-/**
- * Kill job task.
- */
-public class GridHadoopProtocolKillJobTask extends GridHadoopProtocolTaskAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop,
- GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
- UUID nodeId = UUID.fromString(args.<String>get(0));
- Integer id = args.get(1);
-
- assert nodeId != null;
- assert id != null;
-
- GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
- return hadoop.kill(jobId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
deleted file mode 100644
index f76f3b6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Task to get the next job ID.
- */
-public class GridHadoopProtocolNextTaskIdTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobId> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop,
- GridHadoopProtocolTaskArguments args) {
- return hadoop.nextJobId();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
deleted file mode 100644
index 8fdab9d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-
-/**
- * Submit job task.
- */
-public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop,
- GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
- UUID nodeId = UUID.fromString(args.<String>get(0));
- Integer id = args.get(1);
- HadoopDefaultJobInfo info = args.get(2);
-
- assert nodeId != null;
- assert id != null;
- assert info != null;
-
- GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
- hadoop.submit(jobId, info);
-
- GridHadoopJobStatus res = hadoop.status(jobId);
-
- if (res == null) // Submission failed.
- res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
-
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
deleted file mode 100644
index 086545c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Hadoop protocol task adapter.
- */
-public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<GridHadoopProtocolTaskArguments, R> {
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable GridHadoopProtocolTaskArguments arg) {
- return Collections.singletonMap(new Job(arg), subgrid.get(0));
- }
-
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
- return ComputeJobResultPolicy.REDUCE;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public R reduce(List<ComputeJobResult> results) {
- if (!F.isEmpty(results)) {
- ComputeJobResult res = results.get(0);
-
- return res.getData();
- }
- else
- return null;
- }
-
- /**
- * Job wrapper.
- */
- private class Job implements ComputeJob {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** */
- @SuppressWarnings("UnusedDeclaration")
- @JobContextResource
- private ComputeJobContext jobCtx;
-
- /** Argument. */
- private final GridHadoopProtocolTaskArguments args;
-
- /**
- * Constructor.
- *
- * @param args Job argument.
- */
- private Job(GridHadoopProtocolTaskArguments args) {
- this.args = args;
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object execute() {
- try {
- return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
- }
-
- /**
- * Run the task.
- *
- * @param jobCtx Job context.
- * @param hadoop Hadoop facade.
- * @param args Arguments.
- * @return Job result.
- * @throws IgniteCheckedException If failed.
- */
- public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, GridHadoopProtocolTaskArguments args)
- throws IgniteCheckedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
deleted file mode 100644
index ae91a52..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Task arguments.
- */
-public class GridHadoopProtocolTaskArguments implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Arguments. */
- private Object[] args;
-
- /**
- * {@link Externalizable} support.
- */
- public GridHadoopProtocolTaskArguments() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param args Arguments.
- */
- public GridHadoopProtocolTaskArguments(Object... args) {
- this.args = args;
- }
-
- /**
- * @param idx Argument index.
- * @return Argument.
- */
- @SuppressWarnings("unchecked")
- @Nullable public <T> T get(int idx) {
- return (args != null && args.length > idx) ? (T)args[idx] : null;
- }
-
- /**
- * @return Size.
- */
- public int size() {
- return args != null ? args.length : 0;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeArray(out, args);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- args = U.readArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopProtocolTaskArguments.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
index 66fb230..3a766c3 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
@@ -82,7 +82,7 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
- GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null);
+ GridHadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
@@ -99,8 +99,8 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
- GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(),
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+ GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
if (status == null)
throw new IOException("Failed to submit job (null status obtained): " + jobId);
@@ -135,8 +135,8 @@ public class HadoopClientProtocol implements ClientProtocol {
/** {@inheritDoc} */
@Override public void killJob(JobID jobId) throws IOException, InterruptedException {
try {
- cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(),
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+ cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
}
catch (GridClientException e) {
throw new IOException("Failed to kill job: " + jobId, e);
@@ -159,11 +159,11 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
- GridHadoopProtocolTaskArguments args = delay >= 0 ?
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+ HadoopProtocolTaskArguments args = delay >= 0 ?
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
- GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args);
+ GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
if (status == null)
throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
@@ -178,8 +178,8 @@ public class HadoopClientProtocol implements ClientProtocol {
/** {@inheritDoc} */
@Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
try {
- final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(),
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+ final GridHadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
if (counters == null)
throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
new file mode 100644
index 0000000..56da194
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.util.*;
+
+/**
+ * Task to get job counters.
+ */
+public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<GridHadoopCounters> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+
+ UUID nodeId = UUID.fromString(args.<String>get(0));
+ Integer id = args.get(1);
+
+ assert nodeId != null;
+ assert id != null;
+
+ return hadoop.counters(new GridHadoopJobId(nodeId, id));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
new file mode 100644
index 0000000..ac70c44
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Job status task.
+ */
+public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Default poll delay */
+ private static final long DFLT_POLL_DELAY = 100L;
+
+ /** Attribute for held status. */
+ private static final String ATTR_HELD = "held";
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop,
+ HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+ UUID nodeId = UUID.fromString(args.<String>get(0));
+ Integer id = args.get(1);
+ Long pollDelay = args.get(2);
+
+ assert nodeId != null;
+ assert id != null;
+
+ GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
+
+ if (pollDelay == null)
+ pollDelay = DFLT_POLL_DELAY;
+
+ if (pollDelay > 0) {
+ IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+ if (fut != null) {
+ if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true))
+ return hadoop.status(jobId);
+ else {
+ fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut0) {
+ jobCtx.callcc();
+ }
+ });
+
+ jobCtx.setAttribute(ATTR_HELD, true);
+
+ return jobCtx.holdcc(pollDelay);
+ }
+ }
+ else
+ return null;
+ }
+ else
+ return hadoop.status(jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
new file mode 100644
index 0000000..8522ab0
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.util.*;
+
+/**
+ * Kill job task.
+ */
+public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+ UUID nodeId = UUID.fromString(args.<String>get(0));
+ Integer id = args.get(1);
+
+ assert nodeId != null;
+ assert id != null;
+
+ GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
+
+ return hadoop.kill(jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
new file mode 100644
index 0000000..357e12d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Task to get the next job ID.
+ */
+public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<GridHadoopJobId> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ HadoopProtocolTaskArguments args) {
+ return hadoop.nextJobId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
new file mode 100644
index 0000000..df03c79
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
+
+/**
+ * Submit job task.
+ */
+public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+ UUID nodeId = UUID.fromString(args.<String>get(0));
+ Integer id = args.get(1);
+ HadoopDefaultJobInfo info = args.get(2);
+
+ assert nodeId != null;
+ assert id != null;
+ assert info != null;
+
+ GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
+
+ hadoop.submit(jobId, info);
+
+ GridHadoopJobStatus res = hadoop.status(jobId);
+
+ if (res == null) // Submission failed.
+ res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
new file mode 100644
index 0000000..6938d1c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Hadoop protocol task adapter.
+ */
+public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable HadoopProtocolTaskArguments arg) {
+ return Collections.singletonMap(new Job(arg), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.REDUCE;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public R reduce(List<ComputeJobResult> results) {
+ if (!F.isEmpty(results)) {
+ ComputeJobResult res = results.get(0);
+
+ return res.getData();
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Job wrapper.
+ */
+ private class Job implements ComputeJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ @SuppressWarnings("UnusedDeclaration")
+ @JobContextResource
+ private ComputeJobContext jobCtx;
+
+ /** Argument. */
+ private final HadoopProtocolTaskArguments args;
+
+ /**
+ * Constructor.
+ *
+ * @param args Job argument.
+ */
+ private Job(HadoopProtocolTaskArguments args) {
+ this.args = args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ try {
+ return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
+
+ /**
+ * Run the task.
+ *
+ * @param jobCtx Job context.
+ * @param hadoop Hadoop facade.
+ * @param args Arguments.
+ * @return Job result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, HadoopProtocolTaskArguments args)
+ throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
new file mode 100644
index 0000000..5c470ba
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Task arguments.
+ */
+public class HadoopProtocolTaskArguments implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Arguments. */
+ private Object[] args;
+
+ /**
+ * {@link Externalizable} support.
+ */
+ public HadoopProtocolTaskArguments() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param args Arguments.
+ */
+ public HadoopProtocolTaskArguments(Object... args) {
+ this.args = args;
+ }
+
+ /**
+ * @param idx Argument index.
+ * @return Argument.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable public <T> T get(int idx) {
+ return (args != null && args.length > idx) ? (T)args[idx] : null;
+ }
+
+ /**
+ * @return Size.
+ */
+ public int size() {
+ return args != null ? args.length : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeArray(out, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ args = U.readArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProtocolTaskArguments.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
deleted file mode 100644
index a8a52a9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Acknowledgement message.
- */
-public class GridHadoopShuffleAck implements GridHadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @GridToStringInclude
- private long msgId;
-
- /** */
- @GridToStringInclude
- private GridHadoopJobId jobId;
-
- /**
- *
- */
- public GridHadoopShuffleAck() {
- // No-op.
- }
-
- /**
- * @param msgId Message ID.
- */
- public GridHadoopShuffleAck(long msgId, GridHadoopJobId jobId) {
- assert jobId != null;
-
- this.msgId = msgId;
- this.jobId = jobId;
- }
-
- /**
- * @return Message ID.
- */
- public long id() {
- return msgId;
- }
-
- /**
- * @return Job ID.
- */
- public GridHadoopJobId jobId() {
- return jobId;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
- out.writeLong(msgId);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new GridHadoopJobId();
-
- jobId.readExternal(in);
- msgId = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopShuffleAck.class, this);
- }
-}