You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/09 08:38:29 UTC

[15/50] [abbrv] ignite git commit: IGNITE-4054: Hadoop: added map-reduce plan debug output.

IGNITE-4054: Hadoop: added map-reduce plan debug output.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf266e97
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf266e97
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf266e97

Branch: refs/heads/master
Commit: bf266e971579cad0f40d233090c93f3acec06d0a
Parents: 2a90fca
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Oct 17 11:26:12 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Oct 26 11:07:50 2016 +0300

----------------------------------------------------------------------
 .../hadoop/HadoopMapReducePlanner.java          |  1 +
 .../processors/hadoop/HadoopExternalSplit.java  |  8 +++
 .../processors/hadoop/HadoopSplitWrapper.java   |  9 +++
 .../hadoop/jobtracker/HadoopJobTracker.java     | 61 ++++++++++++++++++++
 4 files changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
index 185994f..0009c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
@@ -33,6 +33,7 @@ public interface HadoopMapReducePlanner {
      * @param top Topology.
      * @param oldPlan Old plan in case of partial failure.
      * @return Map reduce plan.
+     * @throws IgniteCheckedException If an error occurs.
      */
     public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
         @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
index bd767b3..a9b4532 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
+
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Arrays;
 
 /**
  * Split serialized in external file.
@@ -85,4 +88,9 @@ public class HadoopExternalSplit extends HadoopInputSplit {
     @Override public int hashCode() {
         return (int)(off ^ (off >>> 32));
     }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return S.toString(HadoopExternalSplit.class, this, "hosts", Arrays.toString(hosts));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
index 511aa5a..fb6d0f3 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.hadoop;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Arrays;
 
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -33,6 +36,7 @@ public class HadoopSplitWrapper extends HadoopInputSplit {
     private static final long serialVersionUID = 0L;
 
     /** Native hadoop input split. */
+    @GridToStringExclude
     private byte[] bytes;
 
     /** */
@@ -116,4 +120,9 @@ public class HadoopSplitWrapper extends HadoopInputSplit {
     @Override public int hashCode() {
         return id;
     }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return S.toString(HadoopSplitWrapper.class, this, "hosts", Arrays.toString(hosts));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf266e97/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index bffb82b..36782bf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -313,6 +314,8 @@ public class HadoopJobTracker extends HadoopComponent {
 
             HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
 
+            logPlan(info, mrPlan);
+
             HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
 
             meta.mapReducePlan(mrPlan);
@@ -354,6 +357,64 @@ public class HadoopJobTracker extends HadoopComponent {
     }
 
     /**
+     * Log map-reduce plan if needed.
+     *
+     * @param info Job info.
+     * @param plan Plan.
+     */
+    @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
+    private void logPlan(HadoopJobInfo info, HadoopMapReducePlan plan) {
+        if (log.isDebugEnabled()) {
+            Map<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> map = new HashMap<>();
+
+            for (UUID nodeId : plan.mapperNodeIds())
+                map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(plan.mappers(nodeId), null));
+
+            for (UUID nodeId : plan.reducerNodeIds()) {
+                int[] reducers = plan.reducers(nodeId);
+
+                IgniteBiTuple<Collection<HadoopInputSplit>, int[]> entry = map.get(nodeId);
+
+                if (entry == null)
+                    map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(null, reducers));
+                else
+                    entry.set2(reducers);
+            }
+
+            StringBuilder details = new StringBuilder("[");
+
+            boolean first = true;
+
+            for (Map.Entry<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> entry : map.entrySet()) {
+                if (first)
+                    first = false;
+                else
+                    details.append(", ");
+
+                UUID nodeId = entry.getKey();
+
+                Collection<HadoopInputSplit> mappers = entry.getValue().get1();
+
+                if (mappers == null)
+                    mappers = Collections.emptyList();
+
+                int[] reducers = entry.getValue().get2();
+
+                if (reducers == null)
+                    reducers = new int[0];
+
+                details.append("[nodeId=" + nodeId + ", mappers=" + mappers.size() + ", reducers=" + reducers.length +
+                    ", mapperDetails=" + mappers + ", reducerDetails=" + Arrays.toString(reducers) + ']');
+            }
+
+            details.append(']');
+
+            log.debug("Prepared map-reduce plan [jobName=" + info.jobName() + ", mappers=" + plan.mappers() +
+                ", reducers=" + plan.reducers() + ", details=" + details + ']');
+        }
+    }
+
+    /**
      * Convert Hadoop job metadata to job status.
      *
      * @param meta Metadata.