Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC
svn commit: r1077079 [9/11] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/
src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/
src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/
src/contrib/gr...
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link AbstractClusterStory} provides a partial implementation of
+ * {@link ClusterStory} by parsing the topology tree.
+ */
+public abstract class AbstractClusterStory implements ClusterStory {
+ protected Set<MachineNode> machineNodes;
+ protected Set<RackNode> rackNodes;
+ protected MachineNode[] mNodesFlattened;
+ protected Map<String, MachineNode> mNodeMap;
+ protected Map<String, RackNode> rNodeMap;
+ protected int maximumDistance = 0;
+ protected Random random;
+
+ @Override
+ public Set<MachineNode> getMachines() {
+ parseTopologyTree();
+ return machineNodes;
+ }
+
+ @Override
+ public synchronized Set<RackNode> getRacks() {
+ parseTopologyTree();
+ return rackNodes;
+ }
+
+ @Override
+ public synchronized MachineNode[] getRandomMachines(int expected) {
+ if (expected == 0) {
+ return new MachineNode[0];
+ }
+
+ parseTopologyTree();
+ int total = machineNodes.size();
+ int select = Math.min(expected, total);
+
+ if (mNodesFlattened == null) {
+ mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
+ random = new Random();
+ }
+
+ MachineNode[] retval = new MachineNode[select];
+ int i = 0;
+ while ((i != select) && (total != i + select)) {
+ int index = random.nextInt(total - i);
+ MachineNode tmp = mNodesFlattened[index];
+ mNodesFlattened[index] = mNodesFlattened[total - i - 1];
+ mNodesFlattened[total - i - 1] = tmp;
+ ++i;
+ }
+ if (i == select) {
+ System.arraycopy(mNodesFlattened, total - i, retval, 0, select);
+ } else {
+ System.arraycopy(mNodesFlattened, 0, retval, 0, select);
+ }
+
+ return retval;
+ }
+
+ protected synchronized void buildMachineNodeMap() {
+ if (mNodeMap == null) {
+ mNodeMap = new HashMap<String, MachineNode>(machineNodes.size());
+ for (MachineNode mn : machineNodes) {
+ mNodeMap.put(mn.getName(), mn);
+ }
+ }
+ }
+
+ @Override
+ public MachineNode getMachineByName(String name) {
+ buildMachineNodeMap();
+ return mNodeMap.get(name);
+ }
+
+ @Override
+ public int distance(Node a, Node b) {
+ int lvl_a = a.getLevel();
+ int lvl_b = b.getLevel();
+ int retval = 0;
+ if (lvl_a > lvl_b) {
+ retval = lvl_a-lvl_b;
+ for (int i=0; i<retval; ++i) {
+ a = a.getParent();
+ }
+ } else if (lvl_a < lvl_b) {
+ retval = lvl_b-lvl_a;
+ for (int i=0; i<retval; ++i) {
+ b = b.getParent();
+ }
+ }
+
+ while (a != b) {
+ a = a.getParent();
+ b = b.getParent();
+ ++retval;
+ }
+
+ return retval;
+ }
+
+ protected synchronized void buildRackNodeMap() {
+ if (rNodeMap == null) {
+ rNodeMap = new HashMap<String, RackNode>(rackNodes.size());
+ for (RackNode rn : rackNodes) {
+ rNodeMap.put(rn.getName(), rn);
+ }
+ }
+ }
+
+ @Override
+ public RackNode getRackByName(String name) {
+ buildRackNodeMap();
+ return rNodeMap.get(name);
+ }
+
+ @Override
+ public int getMaximumDistance() {
+ parseTopologyTree();
+ return maximumDistance;
+ }
+
+ protected synchronized void parseTopologyTree() {
+ if (machineNodes == null) {
+ Node root = getClusterTopology();
+ SortedSet<MachineNode> mNodes = new TreeSet<MachineNode>();
+ SortedSet<RackNode> rNodes = new TreeSet<RackNode>();
+ // dfs search of the tree.
+ Deque<Node> unvisited = new ArrayDeque<Node>();
+ Deque<Integer> distUnvisited = new ArrayDeque<Integer>();
+ unvisited.add(root);
+ distUnvisited.add(0);
+ for (Node n = unvisited.poll(); n != null; n = unvisited.poll()) {
+ int distance = distUnvisited.poll();
+ if (n instanceof RackNode) {
+ rNodes.add((RackNode) n);
+ mNodes.addAll(((RackNode) n).getMachinesInRack());
+ if (distance + 1 > maximumDistance) {
+ maximumDistance = distance + 1;
+ }
+ } else if (n instanceof MachineNode) {
+ mNodes.add((MachineNode) n);
+ if (distance > maximumDistance) {
+ maximumDistance = distance;
+ }
+ } else {
+ for (Node child : n.getChildren()) {
+ unvisited.addFirst(child);
+ distUnvisited.addFirst(distance+1);
+ }
+ }
+ }
+
+ machineNodes = Collections.unmodifiableSortedSet(mNodes);
+ rackNodes = Collections.unmodifiableSortedSet(rNodes);
+ }
+ }
+}
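
A note on the sampling in getRandomMachines() above: it draws the sample
without replacement via a partial Fisher-Yates shuffle of the flattened
machine array, swapping a randomly chosen element into the growing tail on
each pass. The extra "total != i + select" guard simply stops early when the
untouched prefix is itself already a uniform sample, which is why there are
two System.arraycopy branches. A minimal standalone sketch of the same
technique (class and variable names here are illustrative, not part of this
patch):

    import java.util.Arrays;
    import java.util.Random;

    public class PartialShuffleSketch {
      // Draw 'select' elements without replacement, the way
      // getRandomMachines() treats mNodesFlattened: swap a randomly
      // chosen element from the unchosen prefix into the growing tail.
      static int[] sample(int[] pool, int select, Random random) {
        int total = pool.length;
        for (int i = 0; i < select; ++i) {
          int index = random.nextInt(total - i);
          int tmp = pool[index];
          pool[index] = pool[total - i - 1];
          pool[total - i - 1] = tmp;
        }
        int[] result = new int[select];
        System.arraycopy(pool, total - select, result, 0, select);
        return result;
      }

      public static void main(String[] args) {
        int[] pool = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        System.out.println(Arrays.toString(sample(pool, 4, new Random(42))));
      }
    }
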
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+public class CDFPiecewiseLinearRandomGenerator extends CDFRandomGenerator {
+
+ /**
+   * @param cdf
+   *          the {@link LoggedDiscreteCDF} around which to build a
+   *          CDFRandomValue engine, with a default-seeded RNG
+ */
+ public CDFPiecewiseLinearRandomGenerator(LoggedDiscreteCDF cdf) {
+ super(cdf);
+ }
+
+ /**
+   * @param cdf
+   *          the {@link LoggedDiscreteCDF} around which to build a
+   *          CDFRandomValue engine, with an explicitly seeded RNG
+ * @param seed
+ * the random number generator seed
+ */
+ public CDFPiecewiseLinearRandomGenerator(LoggedDiscreteCDF cdf, long seed) {
+ super(cdf, seed);
+ }
+
+ /**
+ * TODO This code assumes that the empirical minimum resp. maximum is the
+   * epistemological minimum resp. maximum. This is probably okay for the
+ * minimum, because that likely represents a task where everything went well,
+ * but for the maximum we may want to develop a way of extrapolating past the
+ * maximum.
+ */
+ @Override
+ public long valueAt(double probability) {
+ int rangeFloor = floorIndex(probability);
+
+ double segmentProbMin = getRankingAt(rangeFloor);
+ double segmentProbMax = getRankingAt(rangeFloor + 1);
+
+ long segmentMinValue = getDatumAt(rangeFloor);
+ long segmentMaxValue = getDatumAt(rangeFloor + 1);
+
+ // If this is zero, this object is based on an ill-formed cdf
+ double segmentProbRange = segmentProbMax - segmentProbMin;
+ long segmentDatumRange = segmentMaxValue - segmentMinValue;
+
+ long result = (long) ((probability - segmentProbMin) / segmentProbRange * segmentDatumRange)
+ + segmentMinValue;
+
+ return result;
+ }
+}
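
The valueAt() override above inverts one linear segment of the CDF: given
the bracketing points (segmentProbMin, segmentMinValue) and (segmentProbMax,
segmentMaxValue), it interpolates linearly between them. A small sketch of
that rule in isolation, with illustrative numbers:

    public class InterpolationSketch {
      // The linear rule valueAt() applies within one CDF segment.
      static long valueAt(double p, double pMin, double pMax,
                          long vMin, long vMax) {
        return (long) ((p - pMin) / (pMax - pMin) * (vMax - vMin)) + vMin;
      }

      public static void main(String[] args) {
        // A segment running from the 25th percentile (1000 ms) to the
        // 50th percentile (3000 ms): the 37.5th percentile lands halfway.
        System.out.println(valueAt(0.375, 0.25, 0.50, 1000L, 3000L)); // 2000
      }
    }
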
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * An instance of this class generates random values that conform to the
+ * embedded {@link LoggedDiscreteCDF}. The discrete CDF is a pointwise
+ * approximation of the "real" CDF. We therefore have a choice of interpolation
+ * rules.
+ *
+ * A concrete subclass of this abstract class will implement
+ * {@code valueAt(double)} using a class-dependent interpolation rule.
+ *
+ */
+public abstract class CDFRandomGenerator {
+ final double[] rankings;
+ final long[] values;
+
+ final Random random;
+
+ CDFRandomGenerator(LoggedDiscreteCDF cdf) {
+ this(cdf, new Random());
+ }
+
+ CDFRandomGenerator(LoggedDiscreteCDF cdf, long seed) {
+ this(cdf, new Random(seed));
+ }
+
+ private CDFRandomGenerator(LoggedDiscreteCDF cdf, Random random) {
+ this.random = random;
+ rankings = new double[cdf.getRankings().size() + 2];
+ values = new long[cdf.getRankings().size() + 2];
+ initializeTables(cdf);
+ }
+
+ protected final void initializeTables(LoggedDiscreteCDF cdf) {
+ rankings[0] = 0.0;
+ values[0] = cdf.getMinimum();
+ rankings[rankings.length - 1] = 1.0;
+ values[rankings.length - 1] = cdf.getMaximum();
+
+ List<LoggedSingleRelativeRanking> subjects = cdf.getRankings();
+
+ for (int i = 0; i < subjects.size(); ++i) {
+ rankings[i + 1] = subjects.get(i).getRelativeRanking();
+ values[i + 1] = subjects.get(i).getDatum();
+ }
+ }
+
+ protected int floorIndex(double probe) {
+ int result = Arrays.binarySearch(rankings, probe);
+
+ return Math.abs(result + 1) - 1;
+ }
+
+ protected double getRankingAt(int index) {
+ return rankings[index];
+ }
+
+ protected long getDatumAt(int index) {
+ return values[index];
+ }
+
+ public long randomValue() {
+ return valueAt(random.nextDouble());
+ }
+
+ public abstract long valueAt(double probability);
+}
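
The floorIndex() helper above leans on the two-sided contract of
Arrays.binarySearch: a hit returns the matching index, a miss returns
-(insertionPoint) - 1, and Math.abs(result + 1) - 1 maps both cases to the
greatest index whose ranking does not exceed the probe. A sketch of that
arithmetic in isolation:

    import java.util.Arrays;

    public class FloorIndexSketch {
      static int floorIndex(double[] rankings, double probe) {
        int result = Arrays.binarySearch(rankings, probe);
        // hit:  result == k          -> |k + 1| - 1 == k
        // miss: result == -(ip) - 1  -> |-(ip)| - 1 == ip - 1
        return Math.abs(result + 1) - 1;
      }

      public static void main(String[] args) {
        double[] rankings = { 0.0, 0.25, 0.5, 0.75, 1.0 };
        System.out.println(floorIndex(rankings, 0.6));  // 2 (floor entry 0.5)
        System.out.println(floorIndex(rankings, 0.25)); // 1 (exact hit)
      }
    }
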
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+
+/**
+ * {@link ClusterStory} represents the configuration of a MapReduce cluster:
+ * its nodes, network topology, and slot configuration.
+ */
+public interface ClusterStory {
+ /**
+ * Get all machines of the cluster.
+ * @return A read-only set that contains all machines of the cluster.
+ */
+ public Set<MachineNode> getMachines();
+
+ /**
+ * Get all racks of the cluster.
+ * @return A read-only set that contains all racks of the cluster.
+ */
+ public Set<RackNode> getRacks();
+
+ /**
+ * Get the cluster topology tree.
+ * @return The root node of the cluster topology tree.
+ */
+ public Node getClusterTopology();
+
+ /**
+ * Select a random set of machines.
+ * @param expected The expected sample size.
+   * @return An array of up to {@code expected} {@link MachineNode}s.
+ */
+ public MachineNode[] getRandomMachines(int expected);
+
+ /**
+ * Get {@link MachineNode} by its host name.
+   *
+   * @param name The host name of the machine.
+   * @return The {@link MachineNode} with that name, or {@code null} if not
+   *         found.
+ */
+ public MachineNode getMachineByName(String name);
+
+ /**
+ * Get {@link RackNode} by its name.
+   * @param name The name of the rack.
+   * @return The {@link RackNode} with that name, or {@code null} if not
+   *         found.
+ */
+ public RackNode getRackByName(String name);
+
+ /**
+ * Determine the distance between two {@link Node}s. Currently, the distance
+ * is loosely defined as the length of the longer path for either a or b to
+ * reach their common ancestor.
+ *
+ * @param a
+ * @param b
+ * @return The distance between {@link Node} a and {@link Node} b.
+ */
+ int distance(Node a, Node b);
+
+ /**
+ * Get the maximum distance possible between any two nodes.
+ * @return the maximum distance possible between any two nodes.
+ */
+ int getMaximumDistance();
+}
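
Under the distance() contract above (the length of the longer path to the
common ancestor), two machines in the same rack are at distance 1, and
machines in different racks under a common root are at distance 2. A
self-contained sketch with a toy parent-pointer node, mirroring the walk
AbstractClusterStory.distance() performs (the N class is illustrative):

    public class DistanceSketch {
      // Toy stand-in for rumen's Node: a parent pointer and a level.
      static class N {
        final N parent;
        final int level;
        N(N parent) {
          this.parent = parent;
          this.level = (parent == null) ? 0 : parent.level + 1;
        }
      }

      // Lift the deeper node to the shallower one's level, then climb in
      // lockstep; the step count is the longer path to the common ancestor.
      static int distance(N a, N b) {
        int retval = 0;
        while (a.level > b.level) { a = a.parent; ++retval; }
        while (b.level > a.level) { b = b.parent; ++retval; }
        while (a != b) { a = a.parent; b = b.parent; ++retval; }
        return retval;
      }

      public static void main(String[] args) {
        N root = new N(null);
        N rack1 = new N(root), rack2 = new N(root);
        N m1 = new N(rack1), m2 = new N(rack1), m3 = new N(rack2);
        System.out.println(distance(m1, m2)); // 1: same rack
        System.out.println(distance(m1, m3)); // 2: across racks
      }
    }
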
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Reads a JSON-encoded cluster topology and produces the parsed
+ * {@link LoggedNetworkTopology} object.
+ */
+public class ClusterTopologyReader {
+ private LoggedNetworkTopology topology;
+
+ private void readTopology(JsonObjectMapperParser<LoggedNetworkTopology> parser)
+ throws IOException {
+ try {
+ topology = parser.getNext();
+ if (topology == null) {
+ throw new IOException(
+ "Input file does not contain valid topology data.");
+ }
+ } finally {
+ parser.close();
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param path
+ * Path to the JSON-encoded topology file, possibly compressed.
+ * @param conf
+ * @throws IOException
+ */
+ public ClusterTopologyReader(Path path, Configuration conf)
+ throws IOException {
+ JsonObjectMapperParser<LoggedNetworkTopology> parser = new JsonObjectMapperParser<LoggedNetworkTopology>(
+ path, LoggedNetworkTopology.class, conf);
+ readTopology(parser);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param input
+ * The input stream for the JSON-encoded topology data.
+ */
+ public ClusterTopologyReader(InputStream input) throws IOException {
+ JsonObjectMapperParser<LoggedNetworkTopology> parser = new JsonObjectMapperParser<LoggedNetworkTopology>(
+ input, LoggedNetworkTopology.class);
+ readTopology(parser);
+ }
+
+ /**
+ * Get the {@link LoggedNetworkTopology} object.
+ *
+ * @return The {@link LoggedNetworkTopology} object parsed from the input.
+ */
+ public LoggedNetworkTopology get() {
+ return topology;
+ }
+}
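
A usage sketch for ClusterTopologyReader: read a (possibly compressed)
JSON-encoded topology file and fetch the parsed tree. The file path is
illustrative, and the getName() accessor is assumed from
LoggedNetworkTopology elsewhere in rumen rather than shown in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.ClusterTopologyReader;
    import org.apache.hadoop.tools.rumen.LoggedNetworkTopology;

    public class TopologySketch {
      public static void main(String[] args) throws Exception {
        // "topology.json" is an illustrative path, not part of this patch.
        ClusterTopologyReader reader = new ClusterTopologyReader(
            new Path("topology.json"), new Configuration());
        LoggedNetworkTopology topology = reader.get();
        // getName() is assumed here; see LoggedNetworkTopology.
        System.out.println("Topology root: " + topology.getName());
      }
    }
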
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepCompare.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepCompare.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepCompare.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepCompare.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Classes that implement this interface can deep-compare [for equality only,
+ * not order] with another instance. If there is any semantically significant
+ * difference, the implementer throws a {@link DeepInequalityException} with a
+ * chain of causes describing the chain of field references and indices that
+ * get you to the miscompared point.
+ *
+ */
+public interface DeepCompare {
+ /**
+ * @param other
+ * the other comparand that's being compared to me
+ * @param myLocation
+ * the path that got to me. In the root, myLocation is null. To
+   *          process the scalar {@code foo} field of the root we will make a
+   *          recursive call with a {@link TreePath} whose {@code fieldName} is
+   *          {@code "foo"} and whose {@code index} is -1 and whose {@code
+   *          parent} is {@code null}. To process the plural {@code bar} field
+   *          of the root we will make a recursive call with a {@link TreePath}
+   *          whose {@code fieldName} is {@code "bar"} and whose {@code index}
+   *          is the element's position in the field and whose {@code parent}
+   *          is also {@code null}.
+ * @throws DeepInequalityException
+ */
+ public void deepCompare(DeepCompare other, TreePath myLocation)
+ throws DeepInequalityException;
+}
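
A minimal sketch of implementing DeepCompare, assuming TreePath's
(parent, fieldName) constructor from elsewhere in rumen; the Widget class
and its size field are illustrative:

    public class Widget implements DeepCompare {
      int size;

      @Override
      public void deepCompare(DeepCompare other, TreePath myLocation)
          throws DeepInequalityException {
        if (!(other instanceof Widget)) {
          throw new DeepInequalityException("class mismatch", myLocation);
        }
        Widget that = (Widget) other;
        if (this.size != that.size) {
          // extend the TreePath so the caller sees which field miscompared
          throw new DeepInequalityException("size mismatch",
              new TreePath(myLocation, "size"));
        }
      }
    }
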
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepInequalityException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepInequalityException.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepInequalityException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepInequalityException.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * We use this exception class in the unit tests: it is thrown when a deep
+ * comparison finds a semantically significant difference between two
+ * objects.
+ *
+ */
+public class DeepInequalityException extends Exception {
+
+ static final long serialVersionUID = 1352469876;
+
+ final TreePath path;
+
+ /**
+ * @param message
+ * an exception message
+ * @param path
+ * the path that gets from the root to the inequality
+   * @param chainee
+   *          the underlying cause to chain
+   *
+   *          This is the preferred constructor for this exception.
+ */
+ public DeepInequalityException(String message, TreePath path,
+ Throwable chainee) {
+ super(message, chainee);
+
+ this.path = path;
+ }
+
+ /**
+ * @param message
+ * an exception message
+ * @param path
+ * the path that gets from the root to the inequality
+ *
+   *          Use this constructor when there is no underlying cause to
+   *          chain.
+ */
+ public DeepInequalityException(String message, TreePath path) {
+ super(message);
+
+ this.path = path;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,1814 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.LineReader;
+
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * This is the main class for rumen log mining functionality.
+ *
+ * It reads a directory of job tracker logs, and computes various statistics
+ * about them. See {@code usage()}, below.
+ *
+ */
+public class HadoopLogsAnalyzer extends Configured implements Tool {
+
+ // output streams
+ private PrintStream statusOutput = System.out;
+ private PrintStream statisticalOutput = System.out;
+
+ private static PrintStream staticDebugOutput = System.err;
+
+ /**
+   * The maximum number of splits a task can have before we ignore its
+   * preferred locations entirely.
+ */
+ private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
+
+ /**
+ * This element is to compensate for the fact that our percentiles engine
+ * rounds up for the expected sample count, so if the total number of readings
+ * is small enough we need to compensate slightly when aggregating the spread
+ * data from jobs with few reducers together with jobs with many reducers.
+ */
+ private static final long SMALL_SPREAD_COMPENSATION_THRESHOLD = 5L;
+
+ /**
+ * {@code MAXIMUM_CLOCK_SKEW} is the maximum plausible difference between the
+ * clocks of machines in the same cluster. This is important because an event
+   * that logically must follow a second event will still be considered
+   * non-anomalous if it appears to precede that second event by no more than
+   * this amount, provided the two events happen on different machines.
+ */
+ private static final long MAXIMUM_CLOCK_SKEW = 10000L;
+
+ /**
+ * The regular expression used to parse task attempt IDs in job tracker logs
+ */
+ private final static Pattern taskAttemptIDPattern = Pattern
+ .compile(".*_([0-9]+)");
+
+ private final static Pattern xmlFilePrefix = Pattern.compile("[ \t]*<");
+
+ private final static Pattern confFileHeader = Pattern.compile("_conf.xml!!");
+
+ private final Map<String, Pattern> counterPatterns = new HashMap<String, Pattern>();
+
+ /**
+ * The unpaired job config file. Currently only used to glean the {@code -Xmx}
+ * field of the JRE options
+ */
+ private ParsedConfigFile jobconf = null;
+
+ /**
+ * Set by {@code -omit-task-details}. If true, we <i>only</i> emit the job
+ * digest [statistical info], not the detailed job trace.
+ */
+ private boolean omitTaskDetails = false;
+
+ private JsonGenerator jobTraceGen = null;
+
+ private boolean prettyprintTrace = true;
+
+ private LoggedJob jobBeingTraced = null;
+
+ private Map<String, LoggedTask> tasksInCurrentJob;
+
+ private Map<String, LoggedTaskAttempt> attemptsInCurrentJob;
+
+ private Histogram[] successfulMapAttemptTimes;
+ private Histogram successfulReduceAttemptTimes;
+ private Histogram[] failedMapAttemptTimes;
+ private Histogram failedReduceAttemptTimes;
+ private Histogram successfulNthMapperAttempts;
+ private Histogram successfulNthReducerAttempts;
+ private Histogram mapperLocality;
+
+ static final private Log LOG = LogFactory.getLog(HadoopLogsAnalyzer.class);
+
+ private int[] attemptTimesPercentiles;
+
+ private JsonGenerator topologyGen = null;
+
+ private HashSet<ParsedHost> allHosts = new HashSet<ParsedHost>();
+
+  // set by -collect-prefixes: when true, we collect the prefixes of log lines
+  private boolean collecting = false;
+
+ private long lineNumber = 0;
+
+ private String rereadableLine = null;
+
+ private String inputFilename;
+
+ private boolean inputIsDirectory = false;
+
+ private Path inputDirectoryPath = null;
+ private String[] inputDirectoryFiles = null;
+
+ private int inputDirectoryCursor = -1;
+
+ private LineReader input = null;
+ private CompressionCodec inputCodec = null;
+ private Decompressor inputDecompressor = null;
+ private Text inputLineText = new Text();
+
+ private boolean debug = false;
+
+ private int version = 0;
+
+ private int numberBuckets = 99;
+
+ private int spreadMin;
+
+ private int spreadMax;
+
+ private boolean spreading = false;
+ private boolean delays = false;
+ private boolean runtimes = false;
+
+ private boolean collectTaskTimes = false;
+
+ private LogRecordType canonicalJob = LogRecordType.intern("Job");
+ private LogRecordType canonicalMapAttempt = LogRecordType
+ .intern("MapAttempt");
+ private LogRecordType canonicalReduceAttempt = LogRecordType
+ .intern("ReduceAttempt");
+ private LogRecordType canonicalTask = LogRecordType.intern("Task");
+
+ private static Pattern streamingJobnamePattern = Pattern
+ .compile("streamjob\\d+.jar");
+
+ private HashSet<String> hostNames = new HashSet<String>();
+
+ private boolean fileFirstLine = true;
+ private String currentFileName = null;
+
+ // Here are the cumulative statistics.
+ enum JobOutcome {
+ SUCCESS, FAILURE, OVERALL
+ };
+
+ /**
+   * These rectangular arrays of {@link Histogram}s are indexed by the outcome
+   * [success or failure] and then by the job type [java, streaming, pig or
+   * pipes]
+ */
+  private Histogram[][] runTimeDists;
+  private Histogram[][] delayTimeDists;
+  private Histogram[][] mapTimeSpreadDists;
+  private Histogram[][] shuffleTimeSpreadDists;
+  private Histogram[][] sortTimeSpreadDists;
+  private Histogram[][] reduceTimeSpreadDists;
+
+  private Histogram[][] mapTimeDists;
+  private Histogram[][] shuffleTimeDists;
+  private Histogram[][] sortTimeDists;
+  private Histogram[][] reduceTimeDists;
+
+ private Map<String, Long> taskAttemptStartTimes;
+ private Map<String, Long> taskReduceAttemptShuffleEndTimes;
+ private Map<String, Long> taskReduceAttemptSortEndTimes;
+ private Map<String, Long> taskMapAttemptFinishTimes;
+ private Map<String, Long> taskReduceAttemptFinishTimes;
+
+ private long submitTimeCurrentJob;
+ private long launchTimeCurrentJob;
+
+ private String currentJobID;
+
+ // TODO this is currently not being set correctly. We should fix it.
+ // That only matters for statistics extraction.
+ private LoggedJob.JobType thisJobType;
+
+ private Histogram[][] newDistributionBlock() {
+ return newDistributionBlock(null);
+ }
+
+ private Histogram[][] newDistributionBlock(String blockname) {
+ Histogram[][] result = new Histogram[JobOutcome.values().length][];
+
+ for (int i = 0; i < JobOutcome.values().length; ++i) {
+ result[i] = new Histogram[LoggedJob.JobType.values().length];
+
+ for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
+ result[i][j] = blockname == null ? new Histogram() : new Histogram(
+ blockname);
+ }
+ }
+
+ return result;
+ }
+
+ private Histogram getDistribution(Histogram[][] block, JobOutcome outcome,
+ LoggedJob.JobType type) {
+ return block[outcome.ordinal()][type.ordinal()];
+ }
+
+ private void usage() {
+ statusOutput
+ .print("Usage: \n"
+ + "administrative subcommands:\n"
+ + "-v1 specify version 1 of the jt logs\n"
+ + "-h or -help print this message\n"
+ + "-d or -debug print voluminous debug info during processing\n"
+ + "-collect-prefixes collect the prefixes of log lines\n\n"
+ + " job trace subcommands\n"
+ + "-write-job-trace takes a filename.\n"
+ + " writes job trace in JSON to that filename\n"
+ + "-single-line-job-traces omit prettyprinting of job trace\n"
+ + "-omit-task-details leave out info about each task and attempt,\n"
+ + " so only statistical info is added to each job\n"
+ + "-write-topology takes a filename.\n"
+ + " writes JSON file giving network topology\n"
+ + "-job-digest-spectra takes a list of percentile points\n"
+ + " writes CDFs with min, max, and those percentiles\n\n"
+ + "subcommands for task statistical info\n"
+ + "-spreads we have a mode where, for each job, we can\n"
+ + " develop the ratio of percentile B to percentile A\n"
+ + " of task run times. Having developed that ratio,\n"
+ + " we can consider it to be a datum and we can\n"
+ + " build a CDF of those ratios. -spreads turns\n"
+ + " this option on, and takes A and B\n"
+ + "-delays tells us to gather and print CDFs for delays\n"
+ + " from job submit to job start\n"
+ + "-runtimes prints CDFs of job wallclock times [launch\n"
+ + " to finish]\n"
+            + "-tasktimes prints CDFs of task wallclock times [start\n"
+            + " to finish]\n\n");
+ }
+
+ public HadoopLogsAnalyzer() {
+ super();
+ }
+
+ private boolean pathIsDirectory(Path p) throws IOException {
+ FileSystem fs = p.getFileSystem(getConf());
+ return fs.getFileStatus(p).isDir();
+ }
+
+ /**
+ * @param args
+ * string arguments. See {@code usage()}
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private int initializeHadoopLogsAnalyzer(String[] args)
+ throws FileNotFoundException, IOException {
+ Path jobTraceFilename = null;
+ Path topologyFilename = null;
+ if (args.length == 0 || args[args.length - 1].charAt(0) == '-') {
+ throw new IllegalArgumentException("No input specified.");
+ } else {
+ inputFilename = args[args.length - 1];
+ }
+
+ for (int i = 0; i < args.length - (inputFilename == null ? 0 : 1); ++i) {
+ if ("-h".equals(args[i].toLowerCase())
+ || "-help".equals(args[i].toLowerCase())) {
+ usage();
+ return 0;
+ }
+
+ if ("-c".equals(args[i].toLowerCase())
+ || "-collect-prefixes".equals(args[i].toLowerCase())) {
+ collecting = true;
+ continue;
+ }
+
+ // these control the job digest
+ if ("-write-job-trace".equals(args[i].toLowerCase())) {
+ ++i;
+ jobTraceFilename = new Path(args[i]);
+ continue;
+ }
+
+ if ("-single-line-job-traces".equals(args[i].toLowerCase())) {
+ prettyprintTrace = false;
+ continue;
+ }
+
+ if ("-omit-task-details".equals(args[i].toLowerCase())) {
+ omitTaskDetails = true;
+ continue;
+ }
+
+ if ("-write-topology".equals(args[i].toLowerCase())) {
+ ++i;
+ topologyFilename = new Path(args[i]);
+ continue;
+ }
+
+ if ("-job-digest-spectra".equals(args[i].toLowerCase())) {
+ ArrayList<Integer> values = new ArrayList<Integer>();
+
+ ++i;
+
+ while (i < args.length && Character.isDigit(args[i].charAt(0))) {
+ values.add(Integer.parseInt(args[i]));
+ ++i;
+ }
+
+ if (values.size() == 0) {
+ throw new IllegalArgumentException("Empty -job-digest-spectra list");
+ }
+
+ attemptTimesPercentiles = new int[values.size()];
+
+ int lastValue = 0;
+
+ for (int j = 0; j < attemptTimesPercentiles.length; ++j) {
+ if (values.get(j) <= lastValue || values.get(j) >= 100) {
+ throw new IllegalArgumentException(
+ "Bad -job-digest-spectra percentiles list");
+ }
+ attemptTimesPercentiles[j] = values.get(j);
+ }
+
+ --i;
+ continue;
+ }
+
+ if ("-d".equals(args[i].toLowerCase())
+ || "-debug".equals(args[i].toLowerCase())) {
+ debug = true;
+ continue;
+ }
+
+ if ("-spreads".equals(args[i].toLowerCase())) {
+ int min = Integer.parseInt(args[i + 1]);
+ int max = Integer.parseInt(args[i + 2]);
+
+ if (min < max && min < 1000 && max < 1000) {
+ spreadMin = min;
+ spreadMax = max;
+ spreading = true;
+ i += 2;
+ }
+ continue;
+ }
+
+ // These control log-wide CDF outputs
+ if ("-delays".equals(args[i].toLowerCase())) {
+ delays = true;
+ continue;
+ }
+
+ if ("-runtimes".equals(args[i].toLowerCase())) {
+ runtimes = true;
+ continue;
+ }
+
+ if ("-tasktimes".equals(args[i].toLowerCase())) {
+ collectTaskTimes = true;
+ continue;
+ }
+
+ if ("-v1".equals(args[i].toLowerCase())) {
+ version = 1;
+ continue;
+ }
+
+ throw new IllegalArgumentException("Unrecognized argument: " + args[i]);
+ }
+
+ runTimeDists = newDistributionBlock();
+ delayTimeDists = newDistributionBlock();
+ mapTimeSpreadDists = newDistributionBlock("map-time-spreads");
+ shuffleTimeSpreadDists = newDistributionBlock();
+ sortTimeSpreadDists = newDistributionBlock();
+ reduceTimeSpreadDists = newDistributionBlock();
+
+ mapTimeDists = newDistributionBlock();
+ shuffleTimeDists = newDistributionBlock();
+ sortTimeDists = newDistributionBlock();
+ reduceTimeDists = newDistributionBlock();
+
+ taskAttemptStartTimes = new HashMap<String, Long>();
+ taskReduceAttemptShuffleEndTimes = new HashMap<String, Long>();
+ taskReduceAttemptSortEndTimes = new HashMap<String, Long>();
+ taskMapAttemptFinishTimes = new HashMap<String, Long>();
+ taskReduceAttemptFinishTimes = new HashMap<String, Long>();
+
+ final Path inputPath = new Path(inputFilename);
+
+ inputIsDirectory = pathIsDirectory(inputPath);
+
+ if (jobTraceFilename != null && attemptTimesPercentiles == null) {
+ attemptTimesPercentiles = new int[19];
+
+ for (int i = 0; i < 19; ++i) {
+ attemptTimesPercentiles[i] = (i + 1) * 5;
+ }
+ }
+
+ if (!inputIsDirectory) {
+ input = maybeUncompressedPath(inputPath);
+ } else {
+ inputDirectoryPath = inputPath;
+ FileSystem fs = inputPath.getFileSystem(getConf());
+ FileStatus[] statuses = fs.listStatus(inputPath);
+ inputDirectoryFiles = new String[statuses.length];
+
+ for (int i = 0; i < statuses.length; ++i) {
+ inputDirectoryFiles[i] = statuses[i].getPath().getName();
+ }
+
+ // filter out the .crc files, if any
+ int dropPoint = 0;
+
+ for (int i = 0; i < inputDirectoryFiles.length; ++i) {
+ String name = inputDirectoryFiles[i];
+
+ if (!(name.length() >= 4 && ".crc".equals(name
+ .substring(name.length() - 4)))) {
+ inputDirectoryFiles[dropPoint++] = name;
+ }
+ }
+
+ LOG.info("We dropped " + (inputDirectoryFiles.length - dropPoint)
+ + " crc files.");
+
+ String[] new_inputDirectoryFiles = new String[dropPoint];
+ System.arraycopy(inputDirectoryFiles, 0, new_inputDirectoryFiles, 0,
+ dropPoint);
+ inputDirectoryFiles = new_inputDirectoryFiles;
+
+ Arrays.sort(inputDirectoryFiles);
+
+ if (!setNextDirectoryInputStream()) {
+ throw new FileNotFoundException("Empty directory specified.");
+ }
+ }
+
+ if (jobTraceFilename != null) {
+ ObjectMapper jmapper = new ObjectMapper();
+ jmapper.configure(
+ SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ JsonFactory jfactory = jmapper.getJsonFactory();
+ FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
+ jobTraceGen = jfactory.createJsonGenerator(
+ jobFS.create(jobTraceFilename), JsonEncoding.UTF8);
+ if (prettyprintTrace) {
+ jobTraceGen.useDefaultPrettyPrinter();
+ }
+
+ if (topologyFilename != null) {
+ ObjectMapper tmapper = new ObjectMapper();
+ tmapper.configure(
+ SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ JsonFactory tfactory = tmapper.getJsonFactory();
+ FileSystem topoFS = topologyFilename.getFileSystem(getConf());
+ topologyGen = tfactory.createJsonGenerator(
+ topoFS.create(topologyFilename), JsonEncoding.UTF8);
+ topologyGen.useDefaultPrettyPrinter();
+ }
+ }
+
+ return 0;
+ }
+
+ private LineReader maybeUncompressedPath(Path p)
+ throws FileNotFoundException, IOException {
+ CompressionCodecFactory codecs = new CompressionCodecFactory(getConf());
+ inputCodec = codecs.getCodec(p);
+ FileSystem fs = p.getFileSystem(getConf());
+ FSDataInputStream fileIn = fs.open(p);
+
+ if (inputCodec == null) {
+ return new LineReader(fileIn, getConf());
+ } else {
+ inputDecompressor = CodecPool.getDecompressor(inputCodec);
+ return new LineReader(inputCodec.createInputStream(fileIn,
+ inputDecompressor), getConf());
+ }
+ }
+
+ private boolean setNextDirectoryInputStream() throws FileNotFoundException,
+ IOException {
+ if (input != null) {
+ input.close();
+      LOG.info("File closed: " + currentFileName);
+ input = null;
+ }
+
+ if (inputCodec != null) {
+ CodecPool.returnDecompressor(inputDecompressor);
+ inputDecompressor = null;
+ inputCodec = null;
+ }
+
+ ++inputDirectoryCursor;
+
+ if (inputDirectoryCursor >= inputDirectoryFiles.length) {
+ return false;
+ }
+
+ fileFirstLine = true;
+
+ currentFileName = inputDirectoryFiles[inputDirectoryCursor];
+
+ LOG.info("\nOpening file " + currentFileName
+ + " *************************** .");
+    LOG.info("This file, " + (inputDirectoryCursor + 1) + "/"
+        + inputDirectoryFiles.length + ", starts with line " + lineNumber
+        + ".");
+
+ input = maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
+
+ return input != null;
+ }
+
+ private String readInputLine() throws IOException {
+ try {
+ if (input == null) {
+ return null;
+ }
+ inputLineText.clear();
+ if (input.readLine(inputLineText) == 0) {
+ return null;
+ }
+
+ return inputLineText.toString();
+ } catch (EOFException e) {
+ return null;
+ }
+
+ }
+
+ private String readCountedLine() throws IOException {
+ if (rereadableLine != null) {
+ String result = rereadableLine;
+ rereadableLine = null;
+ return result;
+ }
+
+ String result = readInputLine();
+
+ if (result != null) {
+ if (fileFirstLine && (result.equals("") || result.charAt(0) != '\f')) {
+ fileFirstLine = false;
+ rereadableLine = result;
+ return "\f!!FILE " + currentFileName + "!!\n";
+ }
+ fileFirstLine = false;
+ ++lineNumber;
+ } else if (inputIsDirectory && setNextDirectoryInputStream()) {
+ result = readCountedLine();
+ }
+
+ return result;
+ }
+
+ private void unreadCountedLine(String unreadee) {
+ if (rereadableLine == null) {
+ rereadableLine = unreadee;
+ }
+ }
+
+ private boolean apparentConfFileHeader(String header) {
+ return confFileHeader.matcher(header).find();
+ }
+
+ private boolean apparentXMLFileStart(String line) {
+ return xmlFilePrefix.matcher(line).lookingAt();
+ }
+
+  // This can return either a Pair of the !!file line and the XML conf
+  // file, or null and an ordinary line. Returns just null if there's
+  // no more input.
+ private Pair<String, String> readBalancedLine() throws IOException {
+ String line = readCountedLine();
+
+ if (line == null) {
+ return null;
+ }
+
+ while (line.indexOf('\f') > 0) {
+ line = line.substring(line.indexOf('\f'));
+ }
+
+ if (line.length() != 0 && line.charAt(0) == '\f') {
+ String subjectLine = readCountedLine();
+
+ if (subjectLine != null && subjectLine.length() != 0
+ && apparentConfFileHeader(line) && apparentXMLFileStart(subjectLine)) {
+ StringBuilder sb = new StringBuilder();
+
+ while (subjectLine != null && subjectLine.indexOf('\f') > 0) {
+ subjectLine = subjectLine.substring(subjectLine.indexOf('\f'));
+ }
+
+ while (subjectLine != null
+ && (subjectLine.length() == 0 || subjectLine.charAt(0) != '\f')) {
+ sb.append(subjectLine);
+ subjectLine = readCountedLine();
+ }
+
+ if (subjectLine != null) {
+ unreadCountedLine(subjectLine);
+ }
+
+ return new Pair<String, String>(line, sb.toString());
+ }
+
+ // here we had a file line, but it introduced a log segment, not
+ // a conf file. We want to just ignore the file line.
+
+ return readBalancedLine();
+ }
+
+ String endlineString = (version == 0 ? " " : " .");
+
+ if (line.length() < endlineString.length()) {
+ return new Pair<String, String>(null, line);
+ }
+
+ if (!endlineString.equals(line.substring(line.length()
+ - endlineString.length()))) {
+ StringBuilder sb = new StringBuilder(line);
+
+ String addedLine;
+
+ do {
+ addedLine = readCountedLine();
+
+ if (addedLine == null) {
+ return new Pair<String, String>(null, sb.toString());
+ }
+
+ while (addedLine.indexOf('\f') > 0) {
+ addedLine = addedLine.substring(addedLine.indexOf('\f'));
+ }
+
+ if (addedLine.length() > 0 && addedLine.charAt(0) == '\f') {
+ unreadCountedLine(addedLine);
+ return new Pair<String, String>(null, sb.toString());
+ }
+
+ sb.append("\n");
+ sb.append(addedLine);
+ } while (!endlineString.equals(addedLine.substring(addedLine.length()
+ - endlineString.length())));
+
+ line = sb.toString();
+ }
+
+ return new Pair<String, String>(null, line);
+ }
+
+ private void incorporateSpread(Histogram taskTimes, Histogram[][] spreadTo,
+ JobOutcome outcome, LoggedJob.JobType jtype) {
+ if (!spreading) {
+ return;
+ }
+
+ if (taskTimes.getTotalCount() <= 1) {
+ return;
+ }
+
+ // there are some literals here that probably should be options
+ int[] endpoints = new int[2];
+
+ endpoints[0] = spreadMin;
+ endpoints[1] = spreadMax;
+
+ long[] endpointKeys = taskTimes.getCDF(1000, endpoints);
+
+ int smallResultOffset = (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
+ : 0);
+
+ Histogram myTotal = spreadTo[outcome.ordinal()][jtype.ordinal()];
+
+ long dividend = endpointKeys[2 + smallResultOffset];
+ long divisor = endpointKeys[1 - smallResultOffset];
+
+ if (divisor > 0) {
+ long mytotalRatio = dividend * 1000000L / divisor;
+
+ myTotal.enter(mytotalRatio);
+ }
+ }
+
+ private void canonicalDistributionsEnter(Histogram[][] block,
+ JobOutcome outcome, LoggedJob.JobType type, long value) {
+ getDistribution(block, outcome, type).enter(value);
+ getDistribution(block, JobOutcome.OVERALL, type).enter(value);
+ getDistribution(block, outcome, LoggedJob.JobType.OVERALL).enter(value);
+ getDistribution(block, JobOutcome.OVERALL, LoggedJob.JobType.OVERALL)
+ .enter(value);
+ }
+
+ private void processJobLine(ParsedLine line) throws JsonProcessingException,
+ IOException {
+ try {
+ if (version == 0 || version == 1) {
+ // determine the job type if this is the declaration line
+ String jobID = line.get("JOBID");
+
+ String user = line.get("USER");
+
+ String jobPriority = line.get("JOB_PRIORITY");
+
+ String submitTime = line.get("SUBMIT_TIME");
+
+ String jobName = line.get("JOBNAME");
+
+ String launchTime = line.get("LAUNCH_TIME");
+
+ String finishTime = line.get("FINISH_TIME");
+
+ String status = line.get("JOB_STATUS");
+
+ String totalMaps = line.get("TOTAL_MAPS");
+
+ String totalReduces = line.get("TOTAL_REDUCES");
+
+ /*
+ * If the job appears new [the ID is different from the most recent one,
+ * if any] we make a new LoggedJob.
+ */
+ if (jobID != null
+ && jobTraceGen != null
+ && (jobBeingTraced == null || !jobID.equals(jobBeingTraced
+ .getJobID()))) {
+          // push out the old job if there is one, even though it didn't
+          // get mated with a conf.
+
+ finalizeJob();
+
+ jobBeingTraced = new LoggedJob(jobID);
+
+ tasksInCurrentJob = new HashMap<String, LoggedTask>();
+ attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
+
+ // initialize all the per-job statistics gathering places
+ successfulMapAttemptTimes = new Histogram[ParsedHost
+ .numberOfDistances() + 1];
+ for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
+ successfulMapAttemptTimes[i] = new Histogram();
+ }
+
+ successfulReduceAttemptTimes = new Histogram();
+ failedMapAttemptTimes = new Histogram[ParsedHost.numberOfDistances() + 1];
+ for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
+ failedMapAttemptTimes[i] = new Histogram();
+ }
+
+ failedReduceAttemptTimes = new Histogram();
+ successfulNthMapperAttempts = new Histogram();
+ successfulNthReducerAttempts = new Histogram();
+ mapperLocality = new Histogram();
+ }
+
+ // here we fill in all the stuff the trace might need
+ if (jobBeingTraced != null) {
+ if (user != null) {
+ jobBeingTraced.setUser(user);
+ }
+
+ if (jobPriority != null) {
+ jobBeingTraced.setPriority(LoggedJob.JobPriority
+ .valueOf(jobPriority));
+ }
+
+ if (totalMaps != null) {
+ jobBeingTraced.setTotalMaps(Integer.parseInt(totalMaps));
+ }
+
+ if (totalReduces != null) {
+ jobBeingTraced.setTotalReduces(Integer.parseInt(totalReduces));
+ }
+
+ if (submitTime != null) {
+ jobBeingTraced.setSubmitTime(Long.parseLong(submitTime));
+ }
+
+ if (launchTime != null) {
+ jobBeingTraced.setLaunchTime(Long.parseLong(launchTime));
+ }
+
+ if (finishTime != null) {
+ jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
+ if (status != null) {
+ jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
+ }
+
+ maybeMateJobAndConf();
+ }
+ }
+
+ if (jobName != null) {
+ // we'll make it java unless the name parses out
+ Matcher m = streamingJobnamePattern.matcher(jobName);
+
+ thisJobType = LoggedJob.JobType.JAVA;
+
+ if (m.matches()) {
+ thisJobType = LoggedJob.JobType.STREAMING;
+ }
+ }
+ if (submitTime != null) {
+ submitTimeCurrentJob = Long.parseLong(submitTime);
+
+ currentJobID = jobID;
+
+ taskAttemptStartTimes = new HashMap<String, Long>();
+ taskReduceAttemptShuffleEndTimes = new HashMap<String, Long>();
+ taskReduceAttemptSortEndTimes = new HashMap<String, Long>();
+ taskMapAttemptFinishTimes = new HashMap<String, Long>();
+ taskReduceAttemptFinishTimes = new HashMap<String, Long>();
+
+ launchTimeCurrentJob = 0L;
+ } else if (launchTime != null && jobID != null
+ && currentJobID.equals(jobID)) {
+ launchTimeCurrentJob = Long.parseLong(launchTime);
+ } else if (finishTime != null && jobID != null
+ && currentJobID.equals(jobID)) {
+ long endTime = Long.parseLong(finishTime);
+
+ if (launchTimeCurrentJob != 0) {
+ String jobResultText = line.get("JOB_STATUS");
+
+ JobOutcome thisOutcome = ((jobResultText != null && "SUCCESS"
+ .equals(jobResultText)) ? JobOutcome.SUCCESS
+ : JobOutcome.FAILURE);
+
+ if (submitTimeCurrentJob != 0L) {
+ canonicalDistributionsEnter(delayTimeDists, thisOutcome,
+ thisJobType, launchTimeCurrentJob - submitTimeCurrentJob);
+ }
+
+ if (launchTimeCurrentJob != 0L) {
+ canonicalDistributionsEnter(runTimeDists, thisOutcome,
+ thisJobType, endTime - launchTimeCurrentJob);
+ }
+
+ // Now we process the hash tables with successful task attempts
+
+ Histogram currentJobMapTimes = new Histogram();
+ Histogram currentJobShuffleTimes = new Histogram();
+ Histogram currentJobSortTimes = new Histogram();
+ Histogram currentJobReduceTimes = new Histogram();
+
+ Iterator<Map.Entry<String, Long>> taskIter = taskAttemptStartTimes
+ .entrySet().iterator();
+
+ while (taskIter.hasNext()) {
+ Map.Entry<String, Long> entry = taskIter.next();
+
+ long startTime = entry.getValue();
+
+ // Map processing
+ Long mapEndTime = taskMapAttemptFinishTimes.get(entry.getKey());
+
+ if (mapEndTime != null) {
+ currentJobMapTimes.enter(mapEndTime - startTime);
+
+ canonicalDistributionsEnter(mapTimeDists, thisOutcome,
+ thisJobType, mapEndTime - startTime);
+ }
+
+ // Reduce processing
+ Long shuffleEnd = taskReduceAttemptShuffleEndTimes.get(entry
+ .getKey());
+ Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
+ Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
+
+ if (shuffleEnd != null && sortEnd != null && reduceEnd != null) {
+ currentJobShuffleTimes.enter(shuffleEnd - startTime);
+ currentJobSortTimes.enter(sortEnd - shuffleEnd);
+ currentJobReduceTimes.enter(reduceEnd - sortEnd);
+
+ canonicalDistributionsEnter(shuffleTimeDists, thisOutcome,
+ thisJobType, shuffleEnd - startTime);
+ canonicalDistributionsEnter(sortTimeDists, thisOutcome,
+ thisJobType, sortEnd - shuffleEnd);
+ canonicalDistributionsEnter(reduceTimeDists, thisOutcome,
+ thisJobType, reduceEnd - sortEnd);
+ }
+ }
+
+ // Here we save out the task information
+ incorporateSpread(currentJobMapTimes, mapTimeSpreadDists,
+ thisOutcome, thisJobType);
+ incorporateSpread(currentJobShuffleTimes, shuffleTimeSpreadDists,
+ thisOutcome, thisJobType);
+ incorporateSpread(currentJobSortTimes, sortTimeSpreadDists,
+ thisOutcome, thisJobType);
+ incorporateSpread(currentJobReduceTimes, reduceTimeSpreadDists,
+ thisOutcome, thisJobType);
+ }
+ }
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "HadoopLogsAnalyzer.processJobLine: bad numerical format, at line "
+ + lineNumber + ".", e);
+ }
+ }
+
+ private void processTaskLine(ParsedLine line) {
+ if (jobBeingTraced != null) {
+ // these fields are in both the start and finish record
+ String taskID = line.get("TASKID");
+ String taskType = line.get("TASK_TYPE");
+
+ // this field is only in the start record
+ String startTime = line.get("START_TIME");
+
+ // these fields only exist or are only relevant in the finish record
+ String status = line.get("TASK_STATUS");
+ String finishTime = line.get("FINISH_TIME");
+
+ String splits = line.get("SPLITS");
+
+ LoggedTask task = tasksInCurrentJob.get(taskID);
+
+ boolean taskAlreadyLogged = task != null;
+
+ if (task == null) {
+ task = new LoggedTask();
+ }
+
+ if (splits != null) {
+ ArrayList<LoggedLocation> locations = null;
+
+ StringTokenizer tok = new StringTokenizer(splits, ",", false);
+
+ if (tok.countTokens() <= MAXIMUM_PREFERRED_LOCATIONS) {
+ locations = new ArrayList<LoggedLocation>();
+ }
+
+ while (tok.hasMoreTokens()) {
+ String nextSplit = tok.nextToken();
+
+ ParsedHost node = getAndRecordParsedHost(nextSplit);
+
+ if (locations != null && node != null) {
+ locations.add(node.makeLoggedLocation());
+ }
+ }
+
+ task.setPreferredLocations(locations);
+ }
+
+ task.setTaskID(taskID);
+
+ if (startTime != null) {
+ task.setStartTime(Long.parseLong(startTime));
+ }
+
+ if (finishTime != null) {
+ task.setFinishTime(Long.parseLong(finishTime));
+ }
+
+ Pre21JobHistoryConstants.Values typ;
+ Pre21JobHistoryConstants.Values stat;
+
+ try {
+ stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ } catch (IllegalArgumentException e) {
+ LOG.error("A task status you don't know about is \"" + status + "\".",
+ e);
+ stat = null;
+ }
+
+ task.setTaskStatus(stat);
+
+ try {
+ typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
+ } catch (IllegalArgumentException e) {
+ LOG.error("A task type you don't know about is \"" + taskType + "\".",
+ e);
+ typ = null;
+ }
+
+ if (typ == null) {
+ return;
+ }
+
+ task.setTaskType(typ);
+
+ List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+ .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
+ .getReduceTasks() : jobBeingTraced.getOtherTasks();
+
+ if (!taskAlreadyLogged) {
+ vec.add(task);
+
+ tasksInCurrentJob.put(taskID, task);
+ }
+ }
+ }
+
+ private Pattern counterPattern(String counterName) {
+ Pattern result = counterPatterns.get(counterName);
+
+ if (result == null) {
+ String namePatternRegex = "\\[\\(" + counterName
+ + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
+ result = Pattern.compile(namePatternRegex);
+ counterPatterns.put(counterName, result);
+ }
+
+ return result;
+ }
+
+ private String parseCounter(String counterString, String counterName) {
+ if (counterString == null) {
+ return null;
+ }
+
+ Matcher mat = counterPattern(counterName).matcher(counterString);
+
+ if (mat.find()) {
+ return mat.group(1);
+ }
+
+ return null;
+ }
+
+ abstract class SetField {
+ LoggedTaskAttempt attempt;
+
+ SetField(LoggedTaskAttempt attempt) {
+ this.attempt = attempt;
+ }
+
+ abstract void set(long value);
+ }
+
+ private void incorporateCounter(SetField thunk, String counterString,
+ String counterName) {
+ String valueString = parseCounter(counterString, counterName);
+
+ if (valueString != null) {
+ thunk.set(Long.parseLong(valueString));
+ }
+ }
+
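+ // Binds each job-history counter name to the attempt field it
+ // populates; a counter missing from the COUNTERS string leaves its
+ // field untouched.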
+ private void incorporateCounters(LoggedTaskAttempt attempt2,
+ String counterString) {
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.hdfsBytesRead = val;
+ }
+ }, counterString, "HDFS_BYTES_READ");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.hdfsBytesWritten = val;
+ }
+ }, counterString, "HDFS_BYTES_WRITTEN");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.fileBytesRead = val;
+ }
+ }, counterString, "FILE_BYTES_READ");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.fileBytesWritten = val;
+ }
+ }, counterString, "FILE_BYTES_WRITTEN");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.mapInputBytes = val;
+ }
+ }, counterString, "MAP_INPUT_BYTES");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.mapInputRecords = val;
+ }
+ }, counterString, "MAP_INPUT_RECORDS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.mapOutputBytes = val;
+ }
+ }, counterString, "MAP_OUTPUT_BYTES");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.mapOutputRecords = val;
+ }
+ }, counterString, "MAP_OUTPUT_RECORDS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.combineInputRecords = val;
+ }
+ }, counterString, "COMBINE_INPUT_RECORDS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.reduceInputGroups = val;
+ }
+ }, counterString, "REDUCE_INPUT_GROUPS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.reduceInputRecords = val;
+ }
+ }, counterString, "REDUCE_INPUT_RECORDS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.reduceShuffleBytes = val;
+ }
+ }, counterString, "REDUCE_SHUFFLE_BYTES");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.reduceOutputRecords = val;
+ }
+ }, counterString, "REDUCE_OUTPUT_RECORDS");
+ incorporateCounter(new SetField(attempt2) {
+ @Override
+ void set(long val) {
+ attempt.spilledRecords = val;
+ }
+ }, counterString, "SPILLED_RECORDS");
+ }
+
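+ // Parses a host name into rack/node form and remembers every distinct
+ // host in allHosts, from which the LoggedNetworkTopology is built at
+ // the end of run().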
+ private ParsedHost getAndRecordParsedHost(String hostName) {
+ ParsedHost result = ParsedHost.parse(hostName);
+
+ if (result != null && !allHosts.contains(result)) {
+ allHosts.add(result);
+ }
+
+ return result;
+ }
+
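+ // Processes a "MapAttempt" line: creates or updates the
+ // LoggedTaskAttempt (and, if necessary, a skeleton LoggedTask), folds
+ // in the counters, and feeds the locality and attempt-runtime
+ // statistics.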
+ private void processMapAttemptLine(ParsedLine line) {
+ String attemptID = line.get("TASK_ATTEMPT_ID");
+
+ String taskID = line.get("TASKID");
+
+ String status = line.get("TASK_STATUS");
+
+ String attemptStartTime = line.get("START_TIME");
+ String attemptFinishTime = line.get("FINISH_TIME");
+
+ String hostName = line.get("HOSTNAME");
+
+ String counters = line.get("COUNTERS");
+
+ if (jobBeingTraced != null && taskID != null) {
+ LoggedTask task = tasksInCurrentJob.get(taskID);
+
+ if (task == null) {
+ task = new LoggedTask();
+
+ task.setTaskID(taskID);
+
+ jobBeingTraced.getMapTasks().add(task);
+
+ tasksInCurrentJob.put(taskID, task);
+ }
+
+ LoggedTaskAttempt attempt = attemptsInCurrentJob.get(attemptID);
+
+ if (attempt == null) {
+ attempt = new LoggedTaskAttempt();
+
+ attempt.setAttemptID(attemptID);
+
+ attemptsInCurrentJob.put(attemptID, attempt);
+ task.getAttempts().add(attempt);
+ }
+
+ Pre21JobHistoryConstants.Values stat = null;
+
+ try {
+ stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ } catch (IllegalArgumentException e) {
+ LOG.error("A map attempt status you don't know about is \"" + status
+ + "\".", e);
+ stat = null;
+ }
+
+ incorporateCounters(attempt, counters);
+
+ attempt.setResult(stat);
+
+ if (attemptStartTime != null) {
+ attempt.setStartTime(Long.parseLong(attemptStartTime));
+ }
+
+ if (attemptFinishTime != null) {
+ attempt.setFinishTime(Long.parseLong(attemptFinishTime));
+ }
+
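+ // Locality: the attempt's distance is the minimum topology distance
+ // from the host it ran on to any of the task's preferred (split)
+ // locations; the same distance indexes the per-locality attempt-time
+ // histograms entered below.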
+ int distance = Integer.MAX_VALUE;
+
+ if (hostName != null) {
+ attempt.setHostName(hostName);
+
+ ParsedHost host = getAndRecordParsedHost(hostName);
+
+ if (host != null) {
+ attempt.setLocation(host.makeLoggedLocation());
+ }
+
+ ArrayList<LoggedLocation> locs = task.getPreferredLocations();
+
+ if (host != null && locs != null) {
+ for (LoggedLocation loc : locs) {
+ ParsedHost preferredLoc = new ParsedHost(loc);
+
+ distance = Math.min(distance, preferredLoc.distance(host));
+ }
+ }
+
+ mapperLocality.enter(distance);
+ }
+
+ distance = Math.min(distance, successfulMapAttemptTimes.length - 1);
+
+ if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) {
+ long runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+ if (stat == Pre21JobHistoryConstants.Values.SUCCESS) {
+ successfulMapAttemptTimes[distance].enter(runtime);
+ }
+
+ if (stat == Pre21JobHistoryConstants.Values.FAILED) {
+ failedMapAttemptTimes[distance].enter(runtime);
+ }
+ }
+
+ if (attemptID != null) {
+ Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
+
+ if (matcher.matches()) {
+ String attemptNumberString = matcher.group(1);
+
+ if (attemptNumberString != null) {
+ int attemptNumber = Integer.parseInt(attemptNumberString);
+
+ successfulNthMapperAttempts.enter(attemptNumber);
+ }
+ }
+ }
+ }
+
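+ // Spread-statistics bookkeeping: keep an attempt's start time only if
+ // it is nonzero and not more than MAXIMUM_CLOCK_SKEW before the job's
+ // launch time; record finish times of successful attempts separately.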
+ try {
+ if (attemptStartTime != null) {
+ long startTimeValue = Long.parseLong(attemptStartTime);
+
+ if (startTimeValue != 0
+ && startTimeValue + MAXIMUM_CLOCK_SKEW >= launchTimeCurrentJob) {
+ taskAttemptStartTimes.put(attemptID, startTimeValue);
+ } else {
+ taskAttemptStartTimes.remove(attemptID);
+ }
+ } else if (status != null && attemptFinishTime != null) {
+ long finishTime = Long.parseLong(attemptFinishTime);
+
+ if (status.equals("SUCCESS")) {
+ taskMapAttemptFinishTimes.put(attemptID, finishTime);
+ }
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "HadoopLogsAnalyzer.processMapAttemptLine: bad numerical format, at line"
+ + lineNumber + ".", e);
+ }
+ }
+
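+ // Processes a "ReduceAttempt" line. Reduces additionally report
+ // SHUFFLE_FINISHED and SORT_FINISHED milestones, which are recorded on
+ // the attempt and in the shuffle- and sort-phase statistics.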
+ private void processReduceAttemptLine(ParsedLine line) {
+ String attemptID = line.get("TASK_ATTEMPT_ID");
+
+ String taskID = line.get("TASKID");
+
+ String status = line.get("TASK_STATUS");
+
+ String attemptStartTime = line.get("START_TIME");
+ String attemptFinishTime = line.get("FINISH_TIME");
+ String attemptShuffleFinished = line.get("SHUFFLE_FINISHED");
+ String attemptSortFinished = line.get("SORT_FINISHED");
+
+ String counters = line.get("COUNTERS");
+
+ String hostName = line.get("HOSTNAME");
+
+ if (hostName != null && !hostNames.contains(hostName)) {
+ hostNames.add(hostName);
+ }
+
+ if (jobBeingTraced != null && taskID != null) {
+ LoggedTask task = tasksInCurrentJob.get(taskID);
+
+ if (task == null) {
+ task = new LoggedTask();
+
+ task.setTaskID(taskID);
+
+ jobBeingTraced.getReduceTasks().add(task);
+
+ tasksInCurrentJob.put(taskID, task);
+ }
+
+ LoggedTaskAttempt attempt = attemptsInCurrentJob.get(attemptID);
+
+ if (attempt == null) {
+ attempt = new LoggedTaskAttempt();
+
+ attempt.setAttemptID(attemptID);
+
+ attemptsInCurrentJob.put(attemptID, attempt);
+ task.getAttempts().add(attempt);
+ }
+
+ Pre21JobHistoryConstants.Values stat = null;
+
+ try {
+ stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("A map attempt status you don't know about is \"" + status
+ + "\".", e);
+ stat = null;
+ }
+
+ incorporateCounters(attempt, counters);
+
+ attempt.setResult(stat);
+
+ if (attemptStartTime != null) {
+ attempt.setStartTime(Long.parseLong(attemptStartTime));
+ }
+
+ if (attemptFinishTime != null) {
+ attempt.setFinishTime(Long.parseLong(attemptFinishTime));
+ }
+
+ if (attemptShuffleFinished != null) {
+ attempt.setShuffleFinished(Long.parseLong(attemptShuffleFinished));
+ }
+
+ if (attemptSortFinished != null) {
+ attempt.setSortFinished(Long.parseLong(attemptSortFinished));
+ }
+
+ if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) {
+ long runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+ if (stat == Pre21JobHistoryConstants.Values.SUCCESS) {
+ successfulReduceAttemptTimes.enter(runtime);
+ }
+
+ if (stat == Pre21JobHistoryConstants.Values.FAILED) {
+ failedReduceAttemptTimes.enter(runtime);
+ }
+ }
+ if (hostName != null) {
+ attempt.setHostName(hostName);
+ }
+
+ if (attemptID != null) {
+ Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
+
+ if (matcher.matches()) {
+ String attemptNumberString = matcher.group(1);
+
+ if (attemptNumberString != null) {
+ int attemptNumber = Integer.parseInt(attemptNumberString);
+
+ successfulNthReducerAttempts.enter(attemptNumber);
+ }
+ }
+ }
+ }
+
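+ // Same start-time sanity check as on the map side; successful reduce
+ // attempts additionally record their shuffle-end and sort-end times.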
+ try {
+ if (attemptStartTime != null) {
+ long startTimeValue = Long.parseLong(attemptStartTime);
+
+ if (startTimeValue != 0
+ && startTimeValue + MAXIMUM_CLOCK_SKEW >= launchTimeCurrentJob) {
+ taskAttemptStartTimes.put(attemptID, startTimeValue);
+ }
+ } else if (status != null && status.equals("SUCCESS")
+ && attemptFinishTime != null) {
+ long finishTime = Long.parseLong(attemptFinishTime);
+
+ taskReduceAttemptFinishTimes.put(attemptID, finishTime);
+
+ if (attemptShuffleFinished != null) {
+ taskReduceAttemptShuffleEndTimes.put(attemptID, Long
+ .parseLong(attemptShuffleFinished));
+ }
+
+ if (attemptSortFinished != null) {
+ taskReduceAttemptSortEndTimes.put(attemptID, Long
+ .parseLong(attemptSortFinished));
+ }
+ }
+ } catch (NumberFormatException e) {
+ LOG.error(
+ "HadoopLogsAnalyzer.processReduceAttemptLine: bad numerical format, at line"
+ + lineNumber + ".", e);
+ }
+ }
+
+ private void processParsedLine(ParsedLine line)
+ throws JsonProcessingException, IOException {
+ if (!collecting) {
+ // "Job", "MapAttempt", "ReduceAttempt", "Task"
+ LogRecordType myType = line.getType();
+
+ if (myType == canonicalJob) {
+ processJobLine(line);
+ } else if (myType == canonicalTask) {
+ processTaskLine(line);
+ } else if (myType == canonicalMapAttempt) {
+ processMapAttemptLine(line);
+ } else if (myType == canonicalReduceAttempt) {
+ processReduceAttemptLine(line);
+ }
+ // all other record types are ignored
+ }
+ }
+
+ private void printDistributionSet(String title, Histogram[][] distSet) {
+ statisticalOutput.print(title + "\n\n");
+
+ // print out buckets
+
+ for (int i = 0; i < JobOutcome.values().length; ++i) {
+ for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
+ JobOutcome thisOutcome = JobOutcome.values()[i];
+ LoggedJob.JobType thisType = LoggedJob.JobType.values()[j];
+
+ statisticalOutput.print("outcome = ");
+ statisticalOutput.print(thisOutcome.toString());
+ statisticalOutput.print(", and type = ");
+ statisticalOutput.print(thisType.toString());
+ statisticalOutput.print(".\n\n");
+
+ Histogram dist = distSet[i][j];
+
+ printSingleDistributionData(dist);
+ }
+ }
+ }
+
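+ // Prints the distribution as its minimum, numberBuckets interior
+ // quantile points (labelled as percentages), and its maximum; the
+ // first and last entries of the array from getCDF carry the extremes.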
+ private void printSingleDistributionData(Histogram dist) {
+ int[] percentiles = new int[numberBuckets];
+
+ for (int k = 0; k < numberBuckets; ++k) {
+ percentiles[k] = k + 1;
+ }
+
+ long[] cdf = dist.getCDF(numberBuckets + 1, percentiles);
+
+ if (cdf == null) {
+ statisticalOutput.print("(No data)\n");
+ } else {
+ statisticalOutput.print("min: ");
+ statisticalOutput.print(cdf[0]);
+ statisticalOutput.print("\n");
+
+ for (int k = 0; k < numberBuckets; ++k) {
+ statisticalOutput.print(percentiles[k]);
+ statisticalOutput.print("% ");
+ statisticalOutput.print(cdf[k + 1]);
+ statisticalOutput.print("\n");
+ }
+
+ statisticalOutput.print("max: ");
+ statisticalOutput.print(cdf[numberBuckets + 1]);
+ statisticalOutput.print("\n");
+ }
+ }
+
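+ // A traced job is emitted only after both its history lines and its
+ // config file have been seen: when the pending ParsedConfigFile's job
+ // ID matches the job being traced, its memory and queue settings are
+ // copied over and the job is finalized.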
+ private void maybeMateJobAndConf() throws IOException {
+ if (jobBeingTraced != null && jobconf != null
+ && jobBeingTraced.getJobID().equals(jobconf.jobID)) {
+ jobBeingTraced.setHeapMegabytes(jobconf.heapMegabytes);
+
+ jobBeingTraced.setQueue(jobconf.queue);
+ jobBeingTraced.setJobName(jobconf.jobName);
+
+ jobBeingTraced.setClusterMapMB(jobconf.clusterMapMB);
+ jobBeingTraced.setClusterReduceMB(jobconf.clusterReduceMB);
+ jobBeingTraced.setJobMapMB(jobconf.jobMapMB);
+ jobBeingTraced.setJobReduceMB(jobconf.jobReduceMB);
+
+ jobconf = null;
+
+ finalizeJob();
+ }
+ }
+
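+ // Converts an array of histograms (one per locality distance) into
+ // discrete CDFs at the percentiles in attemptTimesPercentiles, on a
+ // scale of 100.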
+ private ArrayList<LoggedDiscreteCDF> mapCDFArrayList(Histogram[] data) {
+ ArrayList<LoggedDiscreteCDF> result = new ArrayList<LoggedDiscreteCDF>();
+
+ for (Histogram hist : data) {
+ LoggedDiscreteCDF discCDF = new LoggedDiscreteCDF();
+ discCDF.setCDF(hist, attemptTimesPercentiles, 100);
+ result.add(discCDF);
+ }
+
+ return result;
+ }
+
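+ // Attaches digest statistics to the job (per-locality map attempt
+ // CDFs, reduce attempt CDFs, and the distribution of how many tries a
+ // successful mapper needed), then writes the job as one line of the
+ // trace output and clears it.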
+ private void finalizeJob() throws IOException {
+ if (jobBeingTraced != null) {
+ if (omitTaskDetails) {
+ jobBeingTraced.setMapTasks(null);
+ jobBeingTraced.setReduceTasks(null);
+ jobBeingTraced.setOtherTasks(null);
+ }
+
+ // add digest info to the job
+ jobBeingTraced
+ .setSuccessfulMapAttemptCDFs(mapCDFArrayList(successfulMapAttemptTimes));
+ jobBeingTraced
+ .setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes));
+
+ LoggedDiscreteCDF discCDF = new LoggedDiscreteCDF();
+ discCDF
+ .setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles, 100);
+ jobBeingTraced.setSuccessfulReduceAttemptCDF(discCDF);
+
+ discCDF = new LoggedDiscreteCDF();
+ discCDF.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100);
+ jobBeingTraced.setFailedReduceAttemptCDF(discCDF);
+
+ long totalSuccessfulAttempts = 0L;
+ long maxTriesToSucceed = 0L;
+
+ for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+ totalSuccessfulAttempts += ent.getValue();
+ maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey());
+ }
+
+ if (totalSuccessfulAttempts > 0L) {
+ double[] successAfterI = new double[(int) maxTriesToSucceed + 1];
+ for (int i = 0; i < successAfterI.length; ++i) {
+ successAfterI[i] = 0.0D;
+ }
+
+ for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+ successAfterI[ent.getKey().intValue()] = ((double) ent.getValue())
+ / totalSuccessfulAttempts;
+ }
+ jobBeingTraced.setMapperTriesToSucceed(successAfterI);
+ } else {
+ jobBeingTraced.setMapperTriesToSucceed(null);
+ }
+
+ jobTraceGen.writeObject(jobBeingTraced);
+
+ jobTraceGen.writeRaw("\n");
+
+ jobBeingTraced = null;
+ }
+ }
+
+ public int run(String[] args) throws IOException {
+
+ int result = initializeHadoopLogsAnalyzer(args);
+
+ if (result != 0) {
+ return result;
+ }
+
+ return run();
+ }
+
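+ // Driver loop. readBalancedLine() returns one logical record per call:
+ // an ordinary history line comes back with a null first element, while
+ // a job config file arrives with both elements set (apparently name
+ // plus body) and becomes a ParsedConfigFile.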
+ int run() throws IOException {
+ Pair<String, String> line = readBalancedLine();
+
+ while (line != null) {
+ if (debug
+ && (lineNumber < 1000000L && lineNumber % 1000L == 0 || lineNumber % 1000000L == 0)) {
+ LOG.debug("" + lineNumber + " " + line.second());
+ }
+
+ if (line.first() == null) {
+ try {
+ // HACK ALERT!! It's possible for a Job end line to end a
+ // job for which we have a config file
+ // image [ a ParsedConfigFile ] in jobconf.
+ //
+ // processParsedLine handles this.
+
+ processParsedLine(new ParsedLine(line.second(), version));
+ } catch (StringIndexOutOfBoundsException e) {
+ LOG.warn("anomalous line #" + lineNumber + ":" + line, e);
+ }
+ } else {
+ jobconf = new ParsedConfigFile(line.first(), line.second());
+
+ if (!jobconf.valid) {
+ jobconf = null;
+ }
+
+ maybeMateJobAndConf();
+ }
+
+ line = readBalancedLine();
+ }
+
+ finalizeJob();
+
+ if (collecting) {
+ String[] typeNames = LogRecordType.lineTypes();
+
+ for (int i = 0; i < typeNames.length; ++i) {
+ statisticalOutput.print(typeNames[i]);
+ statisticalOutput.print('\n');
+ }
+ } else {
+ if (delays) {
+ printDistributionSet("Job start delay spectrum:", delayTimeDists);
+ }
+
+ if (runtimes) {
+ printDistributionSet("Job run time spectrum:", runTimeDists);
+ }
+
+ if (spreading) {
+ String ratioDescription = "(" + spreadMax + "/1000 %ile) to ("
+ + spreadMin + "/1000 %ile) scaled by 1000000";
+
+ printDistributionSet(
+ "Map task success times " + ratioDescription + ":",
+ mapTimeSpreadDists);
+ printDistributionSet("Shuffle success times " + ratioDescription + ":",
+ shuffleTimeSpreadDists);
+ printDistributionSet("Sort success times " + ratioDescription + ":",
+ sortTimeSpreadDists);
+ printDistributionSet("Reduce success times " + ratioDescription + ":",
+ reduceTimeSpreadDists);
+ }
+
+ if (collectTaskTimes) {
+ printDistributionSet("Global map task success times:", mapTimeDists);
+ printDistributionSet("Global shuffle task success times:",
+ shuffleTimeDists);
+ printDistributionSet("Global sort task success times:", sortTimeDists);
+ printDistributionSet("Global reduce task success times:",
+ reduceTimeDists);
+ }
+ }
+
+ if (topologyGen != null) {
+ LoggedNetworkTopology topo = new LoggedNetworkTopology(allHosts,
+ "<root>", 0);
+ topologyGen.writeObject(topo);
+ topologyGen.close();
+ }
+
+ if (jobTraceGen != null) {
+ jobTraceGen.close();
+ }
+
+ if (input != null) {
+ input.close();
+ input = null;
+ }
+
+ if (inputCodec != null) {
+ CodecPool.returnDecompressor(inputDecompressor);
+ inputDecompressor = null;
+ inputCodec = null;
+ }
+
+ return 0;
+ }
+
+ /**
+ * @param args
+ *
+ * Last arg is the input file. That file can be a directory, in which
+ * case you get all the files in sorted order. We will decompress
+ * files whose names end in .gz.
+ *
+ * switches: -c collect line types: collects all the line types and
+ * prints the first example of each one.
+ *
+ * -d debug mode
+ *
+ * -delays print out the delays [interval between job submit time and
+ * launch time]
+ *
+ * -runtimes print out the job runtimes
+ *
+ * -spreads print out the ratio of 10%ile and 90%ile, of both the
+ * successful map task attempt run times and the successful reduce
+ * task attempt run times
+ *
+ * -tasktimes prints out individual task time distributions
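+ *
+ * Exit status: 1 for a missing input file, 2 for any other I/O
+ * failure, 3 for any other exception; a nonzero result from the
+ * analyzer itself is passed through to System.exit.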
+ */
+ public static void main(String[] args) {
+ try {
+ HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+
+ int result = ToolRunner.run(analyzer, args);
+
+ if (result == 0) {
+ return;
+ }
+
+ System.exit(result);
+ } catch (FileNotFoundException e) {
+ LOG.error("", e);
+ e.printStackTrace(staticDebugOutput);
+ System.exit(1);
+ } catch (IOException e) {
+ LOG.error("", e);
+ e.printStackTrace(staticDebugOutput);
+ System.exit(2);
+ } catch (Exception e) {
+ LOG.error("", e);
+ e.printStackTrace(staticDebugOutput);
+ System.exit(3);
+ }
+ }
+}