You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/08/15 11:04:10 UTC

svn commit: r686181 [1/2] - in /hadoop/core/trunk: ./ src/contrib/failmon/ src/contrib/failmon/bin/ src/contrib/failmon/conf/ src/contrib/failmon/src/ src/contrib/failmon/src/java/ src/contrib/failmon/src/java/org/ src/contrib/failmon/src/java/org/apac...

Author: dhruba
Date: Fri Aug 15 02:04:07 2008
New Revision: 686181

URL: http://svn.apache.org/viewvc?rev=686181&view=rev
Log:
HADOOP-3585. FailMon package for hardware failure monitoring and
analysis of anomalies. (Ioannis Koltsidas via dhruba)


Added:
    hadoop/core/trunk/src/contrib/failmon/
    hadoop/core/trunk/src/contrib/failmon/README
    hadoop/core/trunk/src/contrib/failmon/bin/
    hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh
    hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py
    hadoop/core/trunk/src/contrib/failmon/build.xml
    hadoop/core/trunk/src/contrib/failmon/conf/
    hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties
    hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties
    hadoop/core/trunk/src/contrib/failmon/conf/global.config
    hadoop/core/trunk/src/contrib/failmon/conf/hosts.list
    hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties
    hadoop/core/trunk/src/contrib/failmon/src/
    hadoop/core/trunk/src/contrib/failmon/src/java/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LogParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/MonitorJob.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Monitored.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/NICParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/OfflineAnonymizer.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/PersistentState.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/RunOnce.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SMARTParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SensorsParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SerializedRecord.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/ShellParser.java
    hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SystemLogParser.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/build.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686181&r1=686180&r2=686181&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 15 02:04:07 2008
@@ -86,6 +86,9 @@
     number of files/bytes copied in a particular run to support incremental
     updates and mirroring. (TszWo (Nicholas), SZE via cdouglas)
 
+    HADOOP-3585. FailMon package for hardware failure monitoring and 
+    analysis of anomalies. (Ioannis Koltsidas via dhruba)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=686181&r1=686180&r2=686181&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Fri Aug 15 02:04:07 2008
@@ -799,7 +799,8 @@
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
     	<packageset dir="src/contrib/index/src/java"/>
-
+	<packageset dir="src/contrib/failmon/src/java/"/> 
+	
         <link href="${javadoc.link.java}"/>
 
         <classpath >
@@ -816,7 +817,7 @@
        <group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
        <group title="contrib: DataJoin" packages="org.apache.hadoop.contrib.utils.join*"/>
        <group title="contrib: Index" packages="org.apache.hadoop.contrib.index*"/>
-
+       <group title="contrib: FailMon" packages="org.apache.hadoop.contrib.failmon*"/>
     </javadoc>
   </target>	
 

Added: hadoop/core/trunk/src/contrib/failmon/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/README?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/README (added)
+++ hadoop/core/trunk/src/contrib/failmon/README Fri Aug 15 02:04:07 2008
@@ -0,0 +1,97 @@
+****************** FailMon Quick Start Guide ***********************
+
+This document is a guide to quickly setting up and running FailMon.
+For more information and details please see the FailMon User Manual.
+
+***** Building FailMon *****
+
+Normally, FailMon lies under <hadoop-dir>/src/contrib/failmon, where
+<hadoop-source-dir> is the Hadoop project root folder. To compile it,
+one can either run ant for the whole Hadoop project, i.e.:
+
+$ cd <hadoop-dir>
+$ ant
+
+or run ant only for FailMon:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant
+
+The above will compile FailMon and place all class files under
+<hadoop-dir>/build/contrib/failmon/classes.
+
+By invoking:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant tar
+
+FailMon is packaged as a standalone jar application in
+<hadoop-dir>/src/contrib/failmon/failmon.tar.gz.
+
+
+***** Deploying FailMon *****
+
+There are two ways FailMon can be deployed in a cluster:
+
+a) Within Hadoop, in which case the whole Hadoop package is uploaded
+to the cluster nodes. In that case, nothing else needs to be done on
+individual nodes.
+
+b) Independently of the Hadoop deployment, i.e., by uploading
+failmon.tar.gz to all nodes and uncompressing it. In that case, the
+bin/failmon.sh script needs to be edited; environment variable
+HADOOPDIR should point to the root directory of the Hadoop
+distribution. Also the location of the Hadoop configuration files
+should be pointed by the property 'hadoop.conf.path' in file
+conf/failmon.properties. Note that these files refer to the HDFS in
+which we want to store the FailMon data (which can potentially be
+different than the one on the cluster we are monitoring).
+
+We assume that either way FailMon is placed in the same directory on
+all nodes, which is typical for most clusters. If this is not
+feasible, one should create the same symbolic link on all nodes of the
+cluster, that points to the FailMon directory of each node.
+
+One should also edit the conf/failmon.properties file on each node to
+set his own property values. However, the default values are expected
+to serve most practical cases. Refer to the FailMon User Manual about
+the various properties and configuration parameters.
+
+
+***** Running FailMon *****
+
+In order to run FailMon using a node to do the ad-hoc scheduling of
+monitoring jobs, one needs edit the hosts.list file to specify the
+list of machine hostnames on which FailMon is to be run. Also, in file
+conf/global.config the username used to connect to the machines has to
+be specified (passwordless SSH is assumed) in property 'ssh.username'.
+In property 'failmon.dir', the path to the FailMon folder has to be
+specified as well (it is assumed to be the same on all machines in the
+cluster). Then one only needs to invoke the command:
+
+$ cd <hadoop-dir>
+$ bin/scheduler.py
+
+to start the system.
+
+
+***** Merging HDFS files *****
+
+For the purpose of merging the files created on HDFS by FailMon, the
+following command can be used:
+
+$ cd <hadoop-dir>
+$ bin/failmon.sh --mergeFiles
+
+This will concatenate all files in the HDFS folder (pointed to by the
+'hdfs.upload.dir' property in conf/failmon.properties file) into a
+single file, which will be placed in the same folder. Also the
+location of the Hadoop configuration files should be pointed by the
+property 'hadoop.conf.path' in file conf/failmon.properties. Note that
+these files refer to the HDFS in which have stored the FailMon data
+(which can potentially be different than the one on the cluster we are
+monitoring). Also, the scheduler.py script can be set up to merge the
+HDFS files when their number surpasses a configurable limit (see
+'conf/global.config' file).
+
+Please refer to the FailMon User Manual for more details.

Added: hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh (added)
+++ hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh Fri Aug 15 02:04:07 2008
@@ -0,0 +1,54 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# First we need to determine whether Failmon has been distributed with
+# Hadoop, or as standalone. In the latter case failmon.jar will lie in
+# the current directory.
+
+JARNAME="failmon.jar"
+HADOOPDIR=""
+CLASSPATH=""
+
+if [ `ls -l | grep src | wc -l` == 0 ]
+then
+    # standalone binary
+    if [ -n $1 ] && [ "$1" == "--mergeFiles" ]
+    then
+	jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.HDFSMerger
+        java -jar $JARNAME
+    else
+    	jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.RunOnce
+	java -jar $JARNAME $*
+    fi
+else
+    # distributed with Hadoop
+    HADOOPDIR=`pwd`/../../../
+    CLASSPATH=$CLASSPATH:$HADOOPDIR/build/contrib/failmon/classes
+    CLASSPATH=$CLASSPATH:$HADOOPDIR/build/classes
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-api-1*.jar`
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-1*.jar`
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/log4j-*.jar`
+#    echo $CLASSPATH
+    if [ -n $1 ] && [ "$1" == "--mergeFiles" ]
+    then
+        java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.HDFSMerger
+    else
+        java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.RunOnce $*
+    fi
+fi
+

Added: hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py (added)
+++ hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py Fri Aug 15 02:04:07 2008
@@ -0,0 +1,235 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Schedule FailMon execution for nodes of file hosts.list, according to
+# the properties file conf/global.config.
+
+import time
+import ConfigParser
+import subprocess
+import threading
+import random
+
+jobs = []
+username = "user"
+connections = 10
+failmonDir = ""
+maxFiles = 100
+
+# This class represents a thread that connects to a set of cluster
+# nodes to locally execute monitoring jobs. These jobs are specified
+# as a shell command in the constructor.
+class sshThread (threading.Thread):
+
+    def __init__(self, threadname, username, command, failmonDir):
+        threading.Thread.__init__(self)
+        self.name = threadname
+        self.username = username
+        self.command = command
+        self.failmonDir = failmonDir
+        self.hosts = []
+
+    def addHost(self, host):
+        self.hosts.append(host)
+        
+    def run (self):
+        for host in self.hosts:
+            toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
+            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
+            subprocess.check_call(toRun)
+            print "Done!"
+
+# This class represents a monitoring job. The param member is a string
+# that can be passed in the '--only' list of jobs given to the Java
+# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
+# node.
+class Job:
+    def __init__(self, param, interval):
+        self.param = param
+        self.interval = interval
+        self.counter = interval
+        return
+
+    def reset(self):
+        self.counter = self.interval
+
+# This function reads the configuration file to get the values of the
+# configuration parameters.
+def getJobs(file):
+    global username
+    global connections
+    global jobs
+    global failmonDir
+    global maxFiles
+    
+    conf = ConfigParser.SafeConfigParser()
+    conf.read(file)
+
+    username = conf.get("Default", "ssh.username")
+    connections = int(conf.get("Default", "max.connections"))
+    failmonDir = conf.get("Default", "failmon.dir")
+    maxFiles = conf.get("Default", "hdfs.files.max")
+    
+    # Hadoop Log
+    interval = int(conf.get("Default", "log.hadoop.interval"))
+
+    if interval != 0:
+        jobs.append(Job("hadoopLog", interval))
+
+    # System Log
+    interval = int(conf.get("Default", "log.system.interval"))
+
+    if interval != 0:
+        jobs.append(Job("systemLog", interval))
+
+    # NICs
+    interval = int(conf.get("Default", "nics.interval"))
+
+    if interval != 0:
+        jobs.append(Job("nics", interval))
+
+    # CPU
+    interval = int(conf.get("Default", "cpu.interval"))
+
+    if interval != 0:
+        jobs.append(Job("cpu", interval))
+
+    # CPU
+    interval = int(conf.get("Default", "disks.interval"))
+
+    if interval != 0:
+        jobs.append(Job("disks", interval))
+
+    # sensors
+    interval = int(conf.get("Default", "sensors.interval"))
+
+    if interval != 0:
+        jobs.append(Job("sensors", interval))
+
+    # upload
+    interval = int(conf.get("Default", "upload.interval"))
+
+    if interval != 0:
+        jobs.append(Job("upload", interval))
+
+    return
+
+
+# Compute the gcd (Greatest Common Divisor) of two integerss
+def GCD(a, b):
+    assert isinstance(a, int)
+    assert isinstance(b, int)
+
+    while a:
+        a, b = b%a, a
+
+    return b
+
+# Compute the gcd (Greatest Common Divisor) of a list of integers
+def listGCD(joblist):
+    assert isinstance(joblist, list)
+
+    if (len(joblist) == 1):
+        return joblist[0].interval
+
+    g = GCD(joblist[0].interval, joblist[1].interval)
+
+    for i in range (2, len(joblist)):
+        g = GCD(g, joblist[i].interval)
+        
+    return g
+
+# Merge all failmon files created on the HDFS into a single file
+def mergeFiles():
+    global username
+    global failmonDir
+    hostList = []
+    hosts = open('./conf/hosts.list', 'r')
+    for host in hosts:
+        hostList.append(host.strip().rstrip())
+    randomHost = random.sample(hostList, 1)
+    mergeCommand = "bin/failmon.sh --mergeFiles"
+    toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
+    print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
+    subprocess.check_call(toRun)
+    print "Done!"
+    return
+
+# The actual scheduling is done here
+def main():
+    getJobs("./conf/global.config")
+
+    for job in jobs:
+        print "Configuration: ", job.param, "every", job.interval, "seconds"
+        
+    globalInterval = listGCD(jobs)
+        
+    while True :
+        time.sleep(globalInterval)
+        params = []
+        
+        for job in jobs:
+            job.counter -= globalInterval
+            
+            if (job.counter <= 0):
+                params.append(job.param)
+                job.reset()
+                
+        if (len(params) == 0):
+            continue;
+                    
+        onlyStr = "--only " + params[0]
+        for i in range(1, len(params)):
+            onlyStr += ',' + params[i] 
+                
+        command = "bin/failmon.sh " + onlyStr
+
+        # execute on all nodes
+        hosts = open('./conf/hosts.list', 'r')
+        threadList = []
+        # create a thread for every connection
+        for i in range(0, connections):
+            threadList.append(sshThread(i, username, command, failmonDir))
+
+        # assign some hosts/connections hosts to every thread
+        cur = 0;
+        for host in hosts:
+            threadList[cur].addHost(host.strip().rstrip())
+            cur += 1
+            if (cur == len(threadList)):
+                cur = 0    
+
+        for ready in threadList:
+            ready.start()
+
+        for ssht in threading.enumerate():
+            if ssht != threading.currentThread():
+                ssht.join()
+
+        # if an upload has been done, then maybe we need to merge the
+        # HDFS files
+        if "upload" in params:
+            mergeFiles()
+
+    return
+
+
+if __name__ == '__main__':
+    main()
+

Added: hadoop/core/trunk/src/contrib/failmon/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/build.xml?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/build.xml (added)
+++ hadoop/core/trunk/src/contrib/failmon/build.xml Fri Aug 15 02:04:07 2008
@@ -0,0 +1,133 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="failmon" default="compile">
+
+  <import file="../build-contrib.xml"/>
+
+  <property name="jarfile" value="${build.dir}/${name}.jar"/>
+
+  <target name="jar" depends="compile" unless="skip.contrib">
+    <!-- Make sure that the hadoop jar has been created -->
+<!-- This works, but causes findbugs to fail
+    <subant antfile="build.xml" target="jar">
+      <fileset dir="../../.." includes="build.xml"/>
+    </subant>
+-->
+    <!-- Copy the required files so that the jar can run independently
+	 of Hadoop source code -->
+    
+    <mkdir dir="lib"/>
+  
+    <copy todir="lib">
+      <fileset dir="${hadoop.root}/lib/"
+	       includes="commons-logging-*, log4j*"/>
+    </copy>
+  
+    <copy todir="lib">
+      <fileset dir="${hadoop.root}/build/"
+	       includes="hadoop-*"/>
+    </copy>
+  
+  <!-- create the list of files to add to the classpath -->
+  <fileset dir="${basedir}" id="class.path">
+    <include name="lib/commons-logging*.jar"/>
+    <include name="lib/log4j*.jar"/>
+    <include name="lib/hadoop-*.jar"/>
+  </fileset>
+  
+  <pathconvert pathsep=" " property="failmon-class-path" refid="class.path">
+    <map from="${basedir}/" to=""/>
+  </pathconvert>
+
+    <echo message="contrib: ${name}"/>
+    <jar jarfile="${jarfile}" basedir="${build.classes}">
+      <manifest>
+        <attribute name="Main-Class" value="org.apache.hadoop.contrib.failmon.RunOnce"/>
+	<attribute name="Class-Path" value="${failmon-class-path}"/> 
+      </manifest>
+    </jar>
+
+  </target>
+
+  
+  <!-- Override test target to copy sample data -->
+  <target name="test" depends="compile-test, compile, compile-examples" if="test.available">
+    <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
+    <delete dir="${build.test}/sample"/>
+    <mkdir dir="${build.test}/sample"/>
+    <copy todir="${build.test}/sample">
+      <fileset dir="${root}/sample"/>
+    </copy>
+    <junit
+      printsummary="yes" showoutput="${test.output}" 
+      haltonfailure="no" fork="yes" maxmemory="256m"
+      errorProperty="tests.failed" failureProperty="tests.failed"
+      timeout="${test.timeout}">
+      
+      <sysproperty key="test.build.data" value="${build.test}/data"/>
+      <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="contrib.name" value="${name}"/>
+      
+      <!-- requires fork=yes for: 
+        relative File paths to use the specified user.dir 
+        classpath to use build/contrib/*.jar
+      -->
+      <sysproperty key="user.dir" value="${build.test}/data"/>
+      
+      <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+      <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+      <classpath refid="test.classpath"/>
+      <formatter type="${test.junit.output.format}" />
+      <batchtest todir="${build.test}" unless="testcase">
+        <fileset dir="${src.test}"
+                 includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${build.test}" if="testcase">
+        <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+      </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+
+  </target>
+  
+  <target name="tar" depends="jar">
+
+    <copy todir=".">
+      <fileset dir="${hadoop.root}/build/contrib/failmon/"
+	       includes="failmon.jar"/>
+    </copy>
+    
+    <tar tarfile="${name}.tar" 
+	 basedir=".." 
+	 includes="${name}/**"
+	 excludes="${name}/${name}.tar.gz, ${name}/src/**, ${name}/logs/**, ${name}/build.xml*"/>
+    <gzip zipfile="${name}.tar.gz" src="${name}.tar"/>
+    <delete file="${name}.tar"/>
+    <delete file="${name}.jar"/>
+
+    <move file="${name}.tar.gz" todir="${build.dir}"/>
+    <echo message= "${hadoop.root}/build/contrib/failmon/${name}.jar"/>
+    
+  </target>
+  
+</project>

Added: hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#Logging Implementation
+
+#Log4J
+org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger
+
+#JDK Logger
+#org.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger

Added: hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# HDFS properties
+hdfs.upload.dir = /failmon
+hadoop.conf.path = ../../../conf
+
+# Hadoop Log file properties
+log.hadoop.enabled = true
+log.hadoop.filenames = /home/hadoop/hadoop-0.17.0/logs/
+# set to non-zero only for continous mode:
+log.hadoop.interval = 0
+log.hadoop.dateformat = \\d{4}-\\d{2}-\\d{2}
+log.hadoop.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# System Log file properties
+log.system.enabled = true
+log.system.filenames = /var/log/messages
+# set to non-zero only for continous mode:
+log.system.interval = 0
+log.system.dateformat = (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d+)
+log.system.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# Network Interfaces
+nic.enabled = true
+nic.list = eth0, eth1
+# set to non-zero only for continous mode:
+nic.interval = 0
+
+# CPUs & Motherboard
+cpu.enabled = true
+# set to non-zero only for continous mode:
+cpu.interval = 0
+
+# Disk devices. For all devices listed under disks.list, the corresponding
+# property disk./dev/xxx.source specifies where the output of 
+# "sudo smartctl --all /dev/xxx" can be read by a user. If this property is
+# missing, super-user privileges are assumed and the smartctl command will be 
+# invoked itself.
+
+disks.enabled = true
+disks.list = /dev/sda, /dev/sdb, /dev/sdc, /dev/sdd, /dev/hda, /dev/hdb, /dev/hdc, /dev/hdd
+#disks./dev/sda.source = hda.smart
+# set to non-zero only for continous mode:
+disks.interval = 0
+
+# lm-sensors polling
+sensors.enabled = true
+# set to non-zero only for continous mode:
+sensors.interval = 0
+
+# Executor thread properties	
+executor.interval.min = 1	
+
+# Anonymization properties
+anonymizer.hash.hostnames = false
+anonymizer.hash.ips = false
+anonymizer.hash.filenames = false
+anonymizer.hostname.suffix = apache.org
+
+# Local files options
+local.tmp.filename = failmon.dat
+local.tmp.compression = false
+# set to non-zero only for continous mode:
+local.upload.interval = 0

Added: hadoop/core/trunk/src/contrib/failmon/conf/global.config
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/global.config?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/global.config (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/global.config Fri Aug 15 02:04:07 2008
@@ -0,0 +1,39 @@
+[Default]
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# general settings
+
+# the username to use to connect to cluster nodes
+ssh.username = user
+# the maximum number of SSH connections to keep open at any time
+max.connections = 2
+# the directory in which FailMon lies 
+failmon.dir = /home/user/hadoop-core-trunk/src/contrib/failmon
+# the maximum number of HDFS files to allow FailMon to create. After
+# this limit is surpassed, all HDFS files will be concatenated into
+# one file.
+hdfs.files.max = 100
+
+# iteration intervals
+log.hadoop.interval = 0
+log.system.interval = 0
+nics.interval = 10
+cpu.interval = 10
+disks.interval = 0
+sensors.interval = 0
+upload.interval = 20

Added: hadoop/core/trunk/src/contrib/failmon/conf/hosts.list
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/hosts.list?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/hosts.list (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/hosts.list Fri Aug 15 02:04:07 2008
@@ -0,0 +1,10 @@
+host00
+host01
+host02
+host03
+host04
+host05
+host06
+host07
+host08
+host09

Added: hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Define some default values that can be overridden by system properties
+failmon.log.dir=logs
+failmon.log.file=failmon.log
+
+log4j.rootLogger= INFO, simpleFile, console
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+log4j.appender.simpleFile=org.apache.log4j.FileAppender
+log4j.appender.simpleFile.layout=org.apache.log4j.PatternLayout
+log4j.appender.simpleFile.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+log4j.appender.simpleFile.file= ${failmon.log.dir}/${failmon.log.file}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**********************************************************
+ * This class provides anonymization to SerializedRecord objects. It 
+ * anonymizes all hostnames, ip addresses and file names/paths
+ * that appear in EventRecords gathered from the logs
+ * and other system utilities. Such values are hashed using a
+ * cryptographically safe one-way-hash algorithm (MD5).
+ * 
+ **********************************************************/
+
+public class Anonymizer {
+
+  /**
+	 * Anonymize hostnames, ip addresses and file names/paths
+   * that appear in fields of a SerializedRecord.
+   * 
+	 * @param sr the input SerializedRecord
+	 * 
+	 * @return the anonymized SerializedRecord
+	 */  	
+  public static SerializedRecord anonymize(SerializedRecord sr)
+      throws Exception {
+
+    String hostname = sr.get("hostname");
+
+    if (hostname == null)
+      throw new Exception("Malformed SerializedRecord: no hostname found");
+
+    if ("true".equalsIgnoreCase(Environment
+        .getProperty("anonymizer.hash.hostnames"))) {
+      // hash the node's hostname
+      anonymizeField(sr, "message", hostname, "_hn_");
+      anonymizeField(sr, "hostname", hostname, "_hn_");
+      // hash all other hostnames
+      String suffix = Environment.getProperty("anonymizer.hostname.suffix");
+      if (suffix != null)
+        anonymizeField(sr, "message", "(\\S+\\.)*" + suffix, "_hn_");
+    }
+
+    if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.ips"))) {
+      // hash all ip addresses
+      String ipPattern = "(\\d{1,3}\\.){3}\\d{1,3}";
+      anonymizeField(sr, "message", ipPattern, "_ip_");
+      anonymizeField(sr, "ips", ipPattern, "_ip_");
+      // if multiple ips are present for a node:
+      int i = 0;
+      while (sr.get("ips" + "#" + i) != null)
+        anonymizeField(sr, "ips" + "#" + i++, ipPattern, "_ip_");
+
+      if ("NIC".equalsIgnoreCase(sr.get("type")))
+        anonymizeField(sr, "ipAddress", ipPattern, "_ip_");
+    }
+
+    if ("true".equalsIgnoreCase(Environment
+        .getProperty("anonymizer.hash.filenames"))) {
+      // hash every filename present in messages
+      anonymizeField(sr, "message", "\\s+/(\\S+/)*[^:\\s]*", " _fn_");
+      anonymizeField(sr, "message", "\\s+hdfs://(\\S+/)*[^:\\s]*",
+          " hdfs://_fn_");
+    }
+
+    return sr;
+  }
+
+  /**
+   * Anonymize hostnames, ip addresses and file names/paths
+   * that appear in fields of an EventRecord, after it gets
+   * serialized into a SerializedRecord.
+   * 
+   * @param er the input EventRecord
+   * 
+   * @return the anonymized SerializedRecord
+   */   
+  public static SerializedRecord anonymize(EventRecord er) throws Exception {
+    return anonymize(new SerializedRecord(er));
+  }
+
+  
+  private static String anonymizeField(SerializedRecord sr, String fieldName,
+      String pattern, String prefix) {
+    String txt = sr.get(fieldName);
+
+    if (txt == null)
+      return null;
+    else {
+      String anon = getMD5Hash(pattern);
+      sr.set(fieldName, txt.replaceAll(pattern, (prefix == null ? "" : prefix)
+          + anon));
+      return txt;
+    }
+  }
+
+  /**
+   * Create the MD5 digest of an input text.
+   * 
+   * @param text the input text
+   * 
+   * @return the hexadecimal representation of the MD5 digest
+   */   
+  public static String getMD5Hash(String text) {
+    MessageDigest md;
+    byte[] md5hash = new byte[32];
+    try {
+      md = MessageDigest.getInstance("MD5");
+      md.update(text.getBytes("iso-8859-1"), 0, text.length());
+      md5hash = md.digest();
+    } catch (NoSuchAlgorithmException e) {
+      e.printStackTrace();
+    } catch (UnsupportedEncodingException e) {
+      e.printStackTrace();
+    }
+    return convertToHex(md5hash);
+  }
+
+  private static String convertToHex(byte[] data) {
+    StringBuffer buf = new StringBuffer();
+    for (int i = 0; i < data.length; i++) {
+      int halfbyte = (data[i] >>> 4) & 0x0F;
+      int two_halfs = 0;
+      do {
+        if ((0 <= halfbyte) && (halfbyte <= 9))
+          buf.append((char) ('0' + halfbyte));
+        else
+          buf.append((char) ('a' + (halfbyte - 10)));
+        halfbyte = data[i] & 0x0F;
+      } while (two_halfs++ < 1);
+    }
+    return buf.toString();
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.Calendar;
+
+/**********************************************************
+ * Objects of this class parse the /proc/cpuinfo file to 
+ * gather information about present processors in the system.
+ *
+ **********************************************************/
+
+
+public class CPUParser extends ShellParser {
+
+ /**
+  * Constructs a CPUParser
+  */
+  public CPUParser() {
+    super();
+  }
+
+  /**
+   * Reads and parses /proc/cpuinfo and creates an appropriate 
+   * EventRecord that holds the desirable information.
+   * 
+   * @param s unused parameter
+   * 
+   * @return the EventRecord created
+   */
+  public EventRecord query(String s) throws Exception {
+    StringBuffer sb = Environment.runCommand("cat /proc/cpuinfo");
+    EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+        .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+        .getHostName()), Calendar.getInstance(), "CPU", "Unknown", "CPU", "-");
+
+    retval.set("processors", findAll("\\s*processor\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("model name", findPattern("\\s*model name\\s*:\\s*(.+)", sb
+        .toString(), 1));
+
+    retval.set("frequency", findAll("\\s*cpu\\s*MHz\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("physical id", findAll("\\s*physical\\s*id\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("core id", findAll("\\s*core\\s*id\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    return retval;
+  }
+
+  /**
+   * Invokes query() to do the parsing and handles parsing errors. 
+   * 
+   * @return an array of EventRecords that holds one element that represents
+   * the current state of /proc/cpuinfo
+   */
+  
+  public EventRecord[] monitor() {
+
+    EventRecord[] recs = new EventRecord[1];
+
+    try {
+      recs[0] = query(null);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return recs;
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("CPU Info parser");
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+
+/**********************************************************
+ * This class runs FailMon in a continuous mode on the local
+ * node.
+ * 
+ **********************************************************/
+
+public class Continuous {
+
+  public static void main(String[] args) {
+
+
+    Environment.prepare("failmon.properties");
+
+    Executor ex = new Executor(null);
+    new Thread(ex).start();
+
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.*;
+import org.apache.log4j.PropertyConfigurator;
+
+/**********************************************************
+ * This class provides various methods for interaction with
+ * the configuration and the operating system environment. Also
+ * provides some helper methods for use by other classes in
+ * the package.
+ **********************************************************/
+
+public class Environment {
+
+  public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+  public static final int DEFAULT_POLL_INTERVAL = 360;
+
+  public static int MIN_INTERVAL = 5;
+
+  public static final int MAX_OUTPUT_LENGTH = 51200;
+
+  public static Log LOG;
+  
+  static Properties fmProperties = new Properties();
+
+  static boolean superuser = false;
+
+  static boolean ready = false;
+
+  /**
+   * Initializes structures needed by other methods. Also determines
+   * whether the executing user has superuser privileges. 
+   *  
+   */
+  public static void prepare(String fname) {
+
+    if (!"Linux".equalsIgnoreCase(System.getProperty("os.name"))) {
+      System.err.println("Linux system required for FailMon. Exiting...");
+      System.exit(0);
+    }
+
+    System.setProperty("log4j.configuration", "conf/log4j.properties");
+    PropertyConfigurator.configure("conf/log4j.properties");
+    LOG = LogFactory.getLog("org.apache.hadoop.contrib.failmon");
+    logInfo("********** FailMon started ***********");
+
+    // read parseState file
+    PersistentState.readState("conf/parsing.state");
+    
+    try {
+      FileInputStream propFile = new FileInputStream(fname);
+      fmProperties.load(propFile);
+      propFile.close();
+    } catch (FileNotFoundException e1) {
+      e1.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    ready = true;
+
+    try {
+      String sudo_prompt = "passwd_needed:";
+      String echo_txt = "access_ok";
+      
+      Process p = Runtime.getRuntime().exec("sudo -S -p " + sudo_prompt + " echo " + echo_txt );
+      InputStream inps = p.getInputStream();
+      InputStream errs = p.getErrorStream();
+      
+      while (inps.available() < echo_txt.length() && errs.available() < sudo_prompt.length())
+	Thread.sleep(100);
+
+      byte [] buf;
+      String s;
+      
+      if (inps.available() >= echo_txt.length()) {
+        buf = new byte[inps.available()];
+        inps.read(buf);
+        s = new String(buf);
+        if (s.startsWith(echo_txt)) {
+          superuser = true;
+	  logInfo("Superuser privileges found!");
+	} else {
+	  // no need to read errs
+	  superuser = false;
+	  logInfo("Superuser privileges not found.");
+	}
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Fetches the value of a property from the configuration file.
+   * 
+   *  @param key the name of the property
+   *  
+   *  @return the value of the property, if it exists and
+   *  null otherwise
+   */
+  public static String getProperty(String key) {
+    if (!ready)
+      prepare("conf/failmon.properties");
+    return fmProperties.getProperty(key);
+  }
+
+  /**
+   * Sets the value of a property inthe configuration file.
+   * 
+   *  @param key the name of the property
+   *  @param value the new value for the property
+   *  
+   */
+  
+  public static void setProperty(String key, String value) {
+    fmProperties.setProperty(key, value);
+  }
+
+  /**
+   * Scans the configuration file to determine which monitoring
+   * utilities are available in the system. For each one of them, a
+   * job is created. All such jobs are scheduled and executed by
+   * Executor.
+   * 
+   * @return an ArrayList that contains jobs to be executed by theExecutor. 
+   */
+  public static ArrayList<MonitorJob> getJobs() {
+
+    ArrayList<MonitorJob> monitors = new ArrayList<MonitorJob>();
+    int timeInt = 0;
+
+    // for Hadoop Log parsing
+    String [] fnames_r = getProperty("log.hadoop.filenames").split(",\\s*");
+    String tmp = getProperty("log.hadoop.enabled");
+
+    String [] fnames = expandDirs(fnames_r, ".*(.log).*");
+
+    timeInt = setValue("log.hadoop.interval", DEFAULT_LOG_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp) && fnames[0] != null)
+      for (String fname : fnames) {
+        File f = new File(fname);
+        if (f.exists() && f.canRead()) {
+          monitors.add(new MonitorJob(new HadoopLogParser(fname), "hadoopLog", timeInt));
+	  logInfo("Created Monitor for Hadoop log file: " + f.getAbsolutePath());
+	} else if (!f.exists())
+	  logInfo("Skipping Hadoop log file " + fname + " (file not found)");
+	else
+	  logInfo("Skipping Hadoop log file " + fname + " (permission denied)");
+    }
+    
+    
+    // for System Log parsing
+    fnames_r = getProperty("log.system.filenames").split(",\\s*");
+    tmp = getProperty("log.system.enabled");
+
+    fnames = expandDirs(fnames_r, ".*(messages).*");
+
+    timeInt = setValue("log.system.interval", DEFAULT_LOG_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp))
+      for (String fname : fnames) {
+        File f = new File(fname);
+        if (f.exists() && f.canRead()) {
+          monitors.add(new MonitorJob(new SystemLogParser(fname), "systemLog", timeInt));
+	  logInfo("Created Monitor for System log file: " + f.getAbsolutePath());
+        } else if (!f.exists())
+	  logInfo("Skipping system log file " + fname + " (file not found)");
+	else
+	  logInfo("Skipping system log file " + fname + " (permission denied)");
+      }
+        
+
+    // for network interfaces
+    tmp = getProperty("nic.enabled");
+
+    timeInt = setValue("nics.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      monitors.add(new MonitorJob(new NICParser(), "nics", timeInt));
+      logInfo("Created Monitor for NICs");
+    }
+
+    // for cpu
+    tmp = getProperty("cpu.enabled");
+
+    timeInt = setValue("cpu.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      monitors.add(new MonitorJob(new CPUParser(), "cpu", timeInt));
+      logInfo("Created Monitor for CPUs");
+    }
+
+    // for disks
+    tmp = getProperty("disks.enabled");
+
+    timeInt = setValue("disks.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      // check privileges if a disk with no disks./dev/xxx/.source is found
+      boolean smart_present = checkExistence("smartctl");
+      int disks_ok = 0;
+      String devicesStr = getProperty("disks.list");
+      String[] devices = null;
+
+      if (devicesStr != null)
+        devices = devicesStr.split(",\\s*");
+      
+      for (int i = 0; i< devices.length; i++) {
+        boolean file_present = false;
+        boolean disk_present = false;
+        
+        String fileloc = getProperty("disks." + devices[i] + ".source");
+        if (fileloc != null && fileloc.equalsIgnoreCase("true"))
+          file_present = true;
+        
+        if (!file_present) 
+          if (superuser) {
+              StringBuffer sb = runCommand("sudo smartctl -i " + devices[i]);
+              String patternStr = "[(failed)(device not supported)]";
+              Pattern pattern = Pattern.compile(patternStr);
+              Matcher matcher = pattern.matcher(sb.toString());
+              if (matcher.find(0))
+                disk_present = false;
+              else
+                disk_present = true;            
+          }
+        if (file_present || (disk_present && smart_present)) {
+          disks_ok++;
+        } else
+          devices[i] = null;
+      } 
+      
+      // now remove disks that dont exist
+      StringBuffer resetSB = new StringBuffer();
+      for (int j = 0; j < devices.length; j++) {
+        resetSB.append(devices[j] == null ? "" : devices[j] + ", ");
+	if (devices[j] != null)
+	    logInfo("Found S.M.A.R.T. attributes for disk " + devices[j]);
+      }
+      // fix the property
+      if (resetSB.length() >= 2)
+        setProperty("disks.list", resetSB.substring(0, resetSB.length() - 2));
+      
+      if (disks_ok > 0) {
+        monitors.add(new MonitorJob(new SMARTParser(), "disks", timeInt));
+	logInfo("Created Monitor for S.M.A.R.T disk attributes");
+      }
+    }
+
+    // for lm-sensors
+    tmp = getProperty("sensors.enabled");
+
+    timeInt = setValue("sensors.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp) && checkExistence("sensors")) {
+      monitors.add(new MonitorJob(new SensorsParser(), "sensors", timeInt));
+      logInfo("Created Monitor for lm-sensors output");
+    }
+
+    return monitors;
+  }
+
+  /**
+   * Determines the minimum interval at which the executor thread
+   * needs to wake upto execute jobs. Essentially, this is interval 
+   * equals the GCD of intervals of all scheduled jobs. 
+   * 
+   *  @param monitors the list of scheduled jobs
+   *  
+   *  @return the minimum interval between two scheduled jobs
+   */
+  public static int getInterval(ArrayList<MonitorJob> monitors) {
+    String tmp = getProperty("executor.interval.min");
+    if (tmp != null)
+      MIN_INTERVAL = Integer.parseInt(tmp);
+
+    int[] monIntervals = new int[monitors.size()];
+
+    for (int i = 0; i < monitors.size(); i++)
+      monIntervals[i] = monitors.get(i).interval;
+
+    return Math.max(MIN_INTERVAL, gcd(monIntervals));
+  }
+
+  /**
+   * Checks whether a specific shell command is available
+   * in the system. 
+   * 
+   *  @param cmd the command to check against
+   *
+   *  @return true, if the command is availble, false otherwise
+   */
+  public static boolean checkExistence(String cmd) {
+    StringBuffer sb = runCommand("which " + cmd);
+    if (sb.length() > 1)
+      return true;
+
+    return false;
+  }
+
+  /**
+   * Runs a shell command in the system and provides a StringBuffer
+   * with the output of the command.
+   * 
+   *  @param cmd an array of string that form the command to run 
+   *  
+   *  @return a StringBuffer that contains the output of the command 
+   */
+  public static StringBuffer runCommand(String[] cmd) {
+    StringBuffer retval = new StringBuffer(MAX_OUTPUT_LENGTH);
+    Process p;
+    try {
+      p = Runtime.getRuntime().exec(cmd);
+      InputStream tmp = p.getInputStream();
+      p.waitFor();
+      int c;
+      while ((c = tmp.read()) != -1)
+        retval.append((char) c);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return retval;
+  }
+
+  /**
+   * Runs a shell command in the system and provides a StringBuffer
+   * with the output of the command.
+   * 
+   *  @param cmd the command to run 
+   *  
+   *  @return a StringBuffer that contains the output of the command 
+   */
+  public static StringBuffer runCommand(String cmd) {
+    return runCommand(cmd.split("\\s+"));
+  }
+
+  /**
+   * Determines the greatest common divisor (GCD) of two integers.
+   * 
+   *  @param m the first integer
+   *  @param n the second integer
+   *  
+   *  @return the greatest common divisor of m and n
+   */
+  public static int gcd(int m, int n) {
+    if (m == 0 && n == 0)
+      return 0;
+    if (m < n) {
+      int t = m;
+      m = n;
+      n = t;
+    }
+    int r = m % n;
+    if (r == 0) {
+      return n;
+    } else {
+      return gcd(n, r);
+    }
+  }
+
+  /**
+   * Determines the greatest common divisor (GCD) of a list
+   * of integers.
+   * 
+   *  @param numbers the list of integers to process
+   *  
+   *  @return the greatest common divisor of all numbers
+   */
+  public static int gcd(int[] numbers) {
+
+    if (numbers.length == 1)
+      return numbers[0];
+
+    int g = gcd(numbers[0], numbers[1]);
+
+    for (int i = 2; i < numbers.length; i++)
+      g = gcd(g, numbers[i]);
+
+    return g;
+  }
+
+  private static String [] expandDirs(String [] input, String patternStr) {
+
+    ArrayList<String> fnames = new ArrayList<String>();
+    Pattern pattern = Pattern.compile(patternStr);
+    Matcher matcher;
+    File f;
+    
+    for (String fname : input) {
+      f = new File(fname);
+      if (f.exists()) {
+	if (f.isDirectory()) {
+	  // add all matching files
+	  File [] fcs = f.listFiles();
+	  for (File fc : fcs) {
+	    matcher = pattern.matcher(fc.getName());
+	    if (matcher.find() && fc.isFile())
+	      fnames.add(fc.getAbsolutePath());
+	  }
+	} else {
+	  // normal file, just add to output
+	  fnames.add(f.getAbsolutePath());
+	}
+      }
+    }
+    return fnames.toArray(input);
+  }
+
+  private static int setValue(String propname, int defaultValue) {
+
+    String v = getProperty(propname);
+
+    if (v != null)
+      return Integer.parseInt(v);
+    else
+      return defaultValue;
+  }
+
+  
+  public static void logInfo(String str) {
+    LOG.info(str);
+  }
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+
+/**********************************************************
+ * Objects of this class represent metrics collected for 
+ * a specific hardware source. Each EventRecord contains a HashMap of 
+ * (key, value) pairs, each of which represents a property of
+ * the metered value. For instance, when parsing a log file, an
+ * EventRecord is created for each log entry, which contains 
+ * the hostname and the ip addresses of the node, timestamp of
+ * the log entry, the actual message etc. Each and every EventRecord
+ * contains the hostname of the machine on which it was collected,
+ * its IP address and the time of collection.
+ * 
+ * The main purpose of this class is to provide a uniform format
+ * for records collected from various system compontents (logs,
+ * ifconfig, smartmontools, lm-sensors etc). All metric values are 
+ * converted into this format after they are collected by a
+ * Monitored object.
+ *
+ **********************************************************/
+
+public class EventRecord {
+
+  HashMap<String, Object> fields;
+
+  /**
+   * Create the EventRecord given the most common properties
+   * among different metric types.
+   */
+  public EventRecord(String _hostname, Object [] _ips, Calendar _timestamp,
+      String _type, String _logLevel, String _source, String _message) {
+    fields = new HashMap<String, Object>();
+    fields.clear();
+    set("hostname", _hostname);
+    set("ips", _ips);
+    set("timestamp", _timestamp);
+    set("type", _type);
+    set("logLevel", _logLevel);
+    set("source", _source);
+    set("message", _message);
+  }
+
+  /**
+   * Create the EventRecord with no fields other than "invalid" as
+   * the hostname. This is only used as a dummy.
+   */
+  public EventRecord() {
+    // creates an invalid record
+    fields = new HashMap<String, Object>();
+    fields.clear();
+    set("hostname", "invalid");
+  }
+
+  /**
+   * Return the HashMap of properties of the EventRecord.
+   * 
+   * @return a HashMap that contains all properties of the record.
+   */
+  public final HashMap<String, Object> getMap() {
+    return fields;
+  }
+
+  /**
+   * Set the value of a property of the EventRecord.
+   * 
+   * @param fieldName the name of the property to set
+   * @param fieldValue the value of the property to set
+   * 
+   */
+  public void set(String fieldName, Object fieldValue) {
+    if (fieldValue != null)
+      fields.put(fieldName, fieldValue);
+  }
+
+  /**
+   * Get the value of a property of the EventRecord.
+   * If the property with the specific key is not found,
+   * null is returned.
+   * 
+   * @param fieldName the name of the property to get.
+   */
+  public Object get(String fieldName) {
+    return fields.get(fieldName);
+  }
+
+  /**
+   * Check if the EventRecord is a valid one, i.e., whether
+   * it represents meaningful metric values.
+   * 
+   * @return true if the EventRecord is a valid one, false otherwise.
+   */
+  public boolean isValid() {
+    return !("invalid".equalsIgnoreCase((String) fields.get("hostname")));
+  }
+
+  /**
+   * Creates and returns a string representation of the object.
+   * 
+   * @return a String representation of the object
+   */
+
+  public String toString() {
+    String retval = "";
+    ArrayList<String> keys = new ArrayList<String>(fields.keySet());
+    Collections.sort(keys);
+
+    for (int i = 0; i < keys.size(); i++) {
+      Object value = fields.get(keys.get(i));
+      if (value == null)
+        retval += keys.get(i) + ":\tnull\n";
+      else if (value instanceof String)
+        retval += keys.get(i) + ":\t" + value + "\n";
+      else if (value instanceof Calendar)
+        retval += keys.get(i) + ":\t" + ((Calendar) value).getTime() + "\n";
+      else if (value instanceof InetAddress[] || value instanceof String []) {
+        retval += "Known IPs:\t";
+        for (InetAddress ip : ((InetAddress[]) value))
+          retval += ip.getHostAddress() + " ";
+        retval += "\n";
+      } else {
+        retval += keys.get(i) + ":\t" + value.toString() + "\n";
+      }
+    }
+    return retval;
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**********************************************************
+ * This class executes monitoring jobs on all nodes of the
+ * cluster, on which we intend to gather failure metrics. 
+ * It is basically a thread that sleeps and periodically wakes
+ * up to execute monitoring jobs and ship all gathered data to 
+ * a "safe" location, which in most cases will be the HDFS 
+ * filesystem of the monitored cluster.
+ * 
+ **********************************************************/
+
+public class Executor implements Runnable {
+
+  public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+  public static final int DEFAULT_POLL_INTERVAL = 360;
+
+  public static int MIN_INTERVAL = 5;
+
+  public static int instances = 0;
+
+  LocalStore lstore;
+
+  ArrayList<MonitorJob> monitors;
+  
+  int interval;
+
+  int upload_interval;
+  int upload_counter;
+  
+  /**
+   * Create an instance of the class and read the configuration
+   * file to determine the set of jobs that will be run and the 
+   * maximum interval for which the thread can sleep before it 
+   * wakes up to execute a monitoring job on the node.
+   * 
+   */ 
+
+  public Executor(Configuration conf) {
+    
+    Environment.prepare("conf/failmon.properties");
+    
+    String localTmpDir;
+    
+    if (conf == null) {
+      // running as a stand-alone application
+      localTmpDir = System.getProperty("java.io.tmpdir");
+      Environment.setProperty("local.tmp.dir", localTmpDir);
+    } else {
+      // running from within Hadoop
+      localTmpDir = conf.get("hadoop.tmp.dir");
+      String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" + System.getProperty("hadoop.log.file");
+      Environment.setProperty("hadoop.log.file", hadoopLogPath);
+      Environment.setProperty("local.tmp.dir", localTmpDir);
+    }
+    
+    monitors = Environment.getJobs();
+    interval = Environment.getInterval(monitors);
+    upload_interval = LocalStore.UPLOAD_INTERVAL;
+    lstore = new LocalStore();
+    
+    if (Environment.getProperty("local.upload.interval") != null) 
+     upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));
+
+    instances++;
+  }
+
+  public void run() {
+    upload_counter = upload_interval;
+
+    Environment.logInfo("Failmon Executor thread started successfully.");
+    while (true) {
+      try {
+        Thread.sleep(interval * 1000);
+        for (int i = 0; i < monitors.size(); i++) {
+          monitors.get(i).counter -= interval;
+          if (monitors.get(i).counter <= 0) {
+            monitors.get(i).reset();
+            Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
+            monitors.get(i).job.monitor(lstore);
+          }
+        }
+        upload_counter -= interval;
+        if (upload_counter <= 0) {
+          lstore.upload();
+          upload_counter = upload_interval;
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public void cleanup() {
+    instances--;   
+  }
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HDFSMerger {
+
+  Configuration hadoopConf;
+  FileSystem hdfs;
+  
+  String hdfsDir;
+  
+  FileStatus [] inputFiles;
+
+  Path outputFilePath;
+  FSDataOutputStream outputFile;
+    
+  boolean compress;
+
+  FileWriter fw;
+
+  BufferedWriter writer;
+
+  public HDFSMerger() throws IOException {
+
+    String hadoopConfPath; 
+
+    if (Environment.getProperty("hadoop.conf.path") == null)
+      hadoopConfPath = "../../../conf";
+    else
+      hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+    // Read the configuration for the Hadoop environment
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+    
+    // determine the local output file name
+    if (Environment.getProperty("local.tmp.filename") == null)
+      Environment.setProperty("local.tmp.filename", "failmon.dat");
+    
+    // determine the upload location
+    hdfsDir = Environment.getProperty("hdfs.upload.dir");
+    if (hdfsDir == null)
+      hdfsDir = "/failmon";
+
+    hdfs = FileSystem.get(hadoopConf);
+    
+    Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);
+
+    try {
+      if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
+	Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
+	System.exit(0);
+      }
+    } catch (FileNotFoundException e) {
+      Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
+    }
+
+    inputFiles = hdfs.listStatus(hdfsDirPath);
+
+    outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
+			  + Calendar.getInstance().getTimeInMillis() + ".dat");
+    outputFile = hdfs.create(outputFilePath);
+    
+    for (FileStatus fstatus : inputFiles) {
+      appendFile(fstatus.getPath());
+      hdfs.delete(fstatus.getPath());
+    }
+
+    outputFile.close();
+
+    Environment.logInfo("HDFS file merging complete!");
+  }
+
+  private void appendFile (Path inputPath) throws IOException {
+    
+    FSDataInputStream anyInputFile = hdfs.open(inputPath);
+    InputStream inputFile;
+    byte buffer[] = new byte[4096];
+    
+    if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
+      // the file is compressed
+      inputFile = new ZipInputStream(anyInputFile);
+      ((ZipInputStream) inputFile).getNextEntry();
+    } else {
+      inputFile = anyInputFile;
+    }
+    
+    try {
+      int bytesRead = 0;
+      while ((bytesRead = inputFile.read(buffer)) > 0) {
+	outputFile.write(buffer, 0, bytesRead);
+      }
+    } catch (IOException e) {
+      Environment.logInfo("Error while copying file:" + inputPath.toString());
+    } finally {
+      inputFile.close();
+    }    
+  }
+
+  
+  public static void main(String [] args) {
+
+    Environment.prepare("./conf/failmon.properties");
+
+    try {
+      new HDFSMerger();
+    } catch (IOException e) {
+      e.printStackTrace();
+      }
+
+  }
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * An object of this class parses a Hadoop log file to create
+ * appropriate EventRecords. The log file can either be the log 
+ * of a NameNode or JobTracker or DataNode or TaskTracker.
+ * 
+ **********************************************************/
+
+public class HadoopLogParser extends LogParser {
+
+  /**
+   * Create a new parser object and try to find the hostname
+   * of the node that generated the log
+   */
+  public HadoopLogParser(String fname) {
+    super(fname);
+    if ((dateformat = Environment.getProperty("log.hadoop.dateformat")) == null)
+      dateformat = "\\d{4}-\\d{2}-\\d{2}";
+    if ((timeformat = Environment.getProperty("log.hadoop.timeformat")) == null)
+      timeformat = "\\d{2}:\\d{2}:\\d{2}";
+    findHostname();
+  }
+
+  /**
+   * Parses one line of the log. If the line contains a valid 
+   * log entry, then an appropriate EventRecord is returned, after all
+   * relevant fields have been parsed.
+   *
+   *  @param line the log line to be parsed
+   *
+   *  @return the EventRecord representing the log entry of the line. If 
+   *  the line does not contain a valid log entry, then the EventRecord 
+   *  returned has isValid() = false. When the end-of-file has been reached,
+   *  null is returned to the caller.
+   */
+  public EventRecord parseLine(String line) throws IOException {
+    EventRecord retval = null;
+
+    if (line != null) {
+      // process line
+      String patternStr = "(" + dateformat + ")";
+      patternStr += "\\s+";
+      patternStr += "(" + timeformat + ")";
+      patternStr += ".{4}\\s(\\w*)\\s"; // for logLevel
+      patternStr += "\\s*([\\w+\\.?]+)"; // for source
+      patternStr += ":\\s+(.+)"; // for the message
+      Pattern pattern = Pattern.compile(patternStr);
+      Matcher matcher = pattern.matcher(line);
+
+      if (matcher.find(0) && matcher.groupCount() >= 5) {
+        retval = new EventRecord(hostname, ips, parseDate(matcher.group(1),
+            matcher.group(2)),
+	    "HadoopLog",
+	    matcher.group(3), // loglevel
+            matcher.group(4), // source
+            matcher.group(5)); // message
+      } else {
+        retval = new EventRecord();
+      }
+    }
+
+    return retval;
+  }
+
+  /**
+   * Parse a date found in the Hadoop log.
+   * 
+   * @return a Calendar representing the date
+   */
+  protected Calendar parseDate(String strDate, String strTime) {
+    Calendar retval = Calendar.getInstance();
+    // set date
+    String[] fields = strDate.split("-");
+    retval.set(Calendar.YEAR, Integer.parseInt(fields[0]));
+    retval.set(Calendar.MONTH, Integer.parseInt(fields[1]));
+    retval.set(Calendar.DATE, Integer.parseInt(fields[2]));
+    // set time
+    fields = strTime.split(":");
+    retval.set(Calendar.HOUR_OF_DAY, Integer.parseInt(fields[0]));
+    retval.set(Calendar.MINUTE, Integer.parseInt(fields[1]));
+    retval.set(Calendar.SECOND, Integer.parseInt(fields[2]));
+    return retval;
+  }
+
+  /**
+   * Attempt to determine the hostname of the node that created the
+   * log file. This information can be found in the STARTUP_MSG lines 
+   * of the Hadoop log, which are emitted when the node starts.
+   * 
+   */
+  private void findHostname() {
+    String startupInfo = Environment.runCommand(
+        "grep --max-count=1 STARTUP_MSG:\\s*host " + file.getName()).toString();
+    Pattern pattern = Pattern.compile("\\s+(\\w+/.+)\\s+");
+    Matcher matcher = pattern.matcher(startupInfo);
+    if (matcher.find(0)) {
+      hostname = matcher.group(1).split("/")[0];
+      ips = new String[1];
+      ips[0] = matcher.group(1).split("/")[1];
+    }
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("Hadoop Log Parser for file: " + file.getName());
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**********************************************************
+ * This class takes care of the temporary local storage of 
+ * gathered metrics before they get uploaded into HDFS. It writes 
+ * Serialized Records as lines in a temporary file and then 
+ * compresses and uploads it into HDFS.
+ * 
+ **********************************************************/
+
+public class LocalStore {
+
+  public final static char FIELD_SEPARATOR = '|';
+
+  public final static char RECORD_SEPARATOR = '\n';
+
+  public final static String COMPRESSION_SUFFIX = ".zip";
+
+  public final static int UPLOAD_INTERVAL = 600;
+
+  String filename;
+  String hdfsDir;
+
+  boolean compress;
+
+  FileWriter fw;
+
+  BufferedWriter writer;
+
+  /**
+   * Create an instance of the class and read the configuration
+   * file to determine some output parameters. Then, initiate the 
+   * structured needed for the buffered I/O (so that smal appends
+   * can be handled efficiently).
+   * 
+   */ 
+
+  public LocalStore() {
+    // determine the local output file name
+    if (Environment.getProperty("local.tmp.filename") == null)
+      Environment.setProperty("local.tmp.filename", "failmon.dat");
+    
+    // local.tmp.dir has been set by the Executor
+    if (Environment.getProperty("local.tmp.dir") == null)
+      Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));
+    
+    filename = Environment.getProperty("local.tmp.dir") + "/" +
+      Environment.getProperty("local.tmp.filename");
+
+    // determine the upload location
+    hdfsDir = Environment.getProperty("hdfs.upload.dir");
+    if (hdfsDir == null)
+      hdfsDir = "/failmon";
+
+    // determine if compression is enabled
+    compress = true;
+    if ("false".equalsIgnoreCase(Environment
+        .getProperty("local.tmp.compression")))
+      compress = false;
+
+    try {
+      fw = new FileWriter(filename, true);
+      writer = new BufferedWriter(fw);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Insert an EventRecord to the local storage, after it
+   * gets serialized and anonymized.
+   * 
+   * @param er the EventRecord to be inserted
+   */ 
+  
+  public void insert(EventRecord er) {
+    SerializedRecord sr = new SerializedRecord(er);
+    try {
+      Anonymizer.anonymize(sr);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    append(sr);
+  }
+
+  /**
+   * Insert an array of EventRecords to the local storage, after they
+   * get serialized and anonymized.
+   * 
+   * @param ers the array of EventRecords to be inserted
+   */
+  public void insert(EventRecord[] ers) {
+    for (EventRecord er : ers)
+      insert(er);
+  }
+
+  private void append(SerializedRecord sr) {
+    try {
+      writer.write(pack(sr).toString());
+      writer.write(RECORD_SEPARATOR);
+      // writer.flush();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Pack a SerializedRecord into an array of bytes
+   * 
+   * @param sr the SerializedRecord to be packed
+   */
+  public static StringBuffer pack(SerializedRecord sr) {
+    StringBuffer sb = new StringBuffer();
+
+    ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());
+
+    if (sr.isValid())
+      SerializedRecord.arrangeKeys(keys);
+
+    for (int i = 0; i < keys.size(); i++) {
+      String value = sr.fields.get(keys.get(i));
+      sb.append(keys.get(i) + ":" + value);
+      sb.append(FIELD_SEPARATOR);
+    }
+    return sb;
+  }
+
+  /**
+   * Upload the local file store into HDFS, after it 
+   * compressing it. Then a new local file is created 
+   * as a temporary record store.
+   * 
+   */
+  public void upload() {
+    try {
+      writer.flush();
+      if (compress)
+        zipCompress(filename);
+      String remoteName = "failmon-";
+      if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
+        remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
+      else
+        remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-"; 
+      remoteName += Calendar.getInstance().getTimeInMillis();//.toString();
+      if (compress)
+	copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
+      else
+	copyToHDFS(filename, hdfsDir + "/" + remoteName);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // delete and re-open
+    try {
+      fw.close();
+      fw = new FileWriter(filename);
+      writer = new BufferedWriter(fw);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  /**
+   * Compress a text file using the ZIP compressing algorithm.
+   * 
+   * @param filename the path to the file to be compressed
+   */
+  public static void zipCompress(String filename) throws IOException {
+    FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
+    CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
+    ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
+    out.setComment("Failmon records.");
+
+    BufferedReader in = new BufferedReader(new FileReader(filename));
+    out.putNextEntry(new ZipEntry(new File(filename).getName()));
+    int c;
+    while ((c = in.read()) != -1)
+      out.write(c);
+    in.close();
+
+    out.finish();
+    out.close();
+  }
+
+  /**
+   * Copy a local file to HDFS
+   * 
+   * @param localFile the filename of the local file
+   * @param hdfsFile the HDFS filename to copy to
+   */
+  public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {
+
+    String hadoopConfPath; 
+
+    if (Environment.getProperty("hadoop.conf.path") == null)
+      hadoopConfPath = "../../../conf";
+    else
+      hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+    // Read the configuration for the Hadoop environment
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+
+    // System.out.println(hadoopConf.get("hadoop.tmp.dir"));
+    // System.out.println(hadoopConf.get("fs.default.name"));
+    FileSystem fs = FileSystem.get(hadoopConf);
+
+    // HadoopDFS deals with Path
+    Path inFile = new Path("file://" + localFile);
+    Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);
+
+     // Read from and write to new file
+    Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
+    fs.copyFromLocalFile(false, inFile, outFile);
+  }
+
+  /**
+   * Close the temporary local file
+   * 
+   */ 
+  public void close() {
+    try {
+    writer.flush();
+    writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}