You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/09 08:38:29 UTC
[15/50] [abbrv] ignite git commit: IGNITE-4054: Hadoop: added
map-reduce plan debug output.
IGNITE-4054: Hadoop: added map-reduce plan debug output.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf266e97
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf266e97
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf266e97
Branch: refs/heads/master
Commit: bf266e971579cad0f40d233090c93f3acec06d0a
Parents: 2a90fca
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Oct 17 11:26:12 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Oct 26 11:07:50 2016 +0300
----------------------------------------------------------------------
.../hadoop/HadoopMapReducePlanner.java | 1 +
.../processors/hadoop/HadoopExternalSplit.java | 8 +++
.../processors/hadoop/HadoopSplitWrapper.java | 9 +++
.../hadoop/jobtracker/HadoopJobTracker.java | 61 ++++++++++++++++++++
4 files changed, 79 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
index 185994f..0009c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
@@ -33,6 +33,7 @@ public interface HadoopMapReducePlanner {
* @param top Topology.
* @param oldPlan Old plan in case of partial failure.
* @return Map reduce plan.
+ * @throws IgniteCheckedException If an error occurs.
*/
public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
@Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
index bd767b3..a9b4532 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Arrays;
/**
* Split serialized in external file.
@@ -85,4 +88,9 @@ public class HadoopExternalSplit extends HadoopInputSplit {
@Override public int hashCode() {
return (int)(off ^ (off >>> 32));
}
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(HadoopExternalSplit.class, this, "hosts", Arrays.toString(hosts));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
index 511aa5a..fb6d0f3 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.hadoop;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Arrays;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -33,6 +36,7 @@ public class HadoopSplitWrapper extends HadoopInputSplit {
private static final long serialVersionUID = 0L;
/** Native hadoop input split. */
+ @GridToStringExclude
private byte[] bytes;
/** */
@@ -116,4 +120,9 @@ public class HadoopSplitWrapper extends HadoopInputSplit {
@Override public int hashCode() {
return id;
}
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(HadoopSplitWrapper.class, this, "hosts", Arrays.toString(hosts));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index bffb82b..36782bf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -313,6 +314,8 @@ public class HadoopJobTracker extends HadoopComponent {
HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
+ logPlan(info, mrPlan);
+
HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
meta.mapReducePlan(mrPlan);
@@ -354,6 +357,64 @@ public class HadoopJobTracker extends HadoopComponent {
}
/**
+ * Log map-reduce plan if needed.
+ *
+ * @param info Job info.
+ * @param plan Plan.
+ */
+ @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
+ private void logPlan(HadoopJobInfo info, HadoopMapReducePlan plan) {
+ if (log.isDebugEnabled()) {
+ Map<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> map = new HashMap<>();
+
+ for (UUID nodeId : plan.mapperNodeIds())
+ map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(plan.mappers(nodeId), null));
+
+ for (UUID nodeId : plan.reducerNodeIds()) {
+ int[] reducers = plan.reducers(nodeId);
+
+ IgniteBiTuple<Collection<HadoopInputSplit>, int[]> entry = map.get(nodeId);
+
+ if (entry == null)
+ map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(null, reducers));
+ else
+ entry.set2(reducers);
+ }
+
+ StringBuilder details = new StringBuilder("[");
+
+ boolean first = true;
+
+ for (Map.Entry<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> entry : map.entrySet()) {
+ if (first)
+ first = false;
+ else
+ details.append(", ");
+
+ UUID nodeId = entry.getKey();
+
+ Collection<HadoopInputSplit> mappers = entry.getValue().get1();
+
+ if (mappers == null)
+ mappers = Collections.emptyList();
+
+ int[] reducers = entry.getValue().get2();
+
+ if (reducers == null)
+ reducers = new int[0];
+
+ details.append("[nodeId=" + nodeId + ", mappers=" + mappers.size() + ", reducers=" + reducers.length +
+ ", mapperDetails=" + mappers + ", reducerDetails=" + Arrays.toString(reducers) + ']');
+ }
+
+ details.append(']');
+
+ log.debug("Prepared map-reduce plan [jobName=" + info.jobName() + ", mappers=" + plan.mappers() +
+ ", reducers=" + plan.reducers() + ", details=" + details + ']');
+ }
+ }
+
+ /**
* Convert Hadoop job metadata to job status.
*
* @param meta Metadata.