Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/08/15 11:04:10 UTC
svn commit: r686181 [1/2] - in /hadoop/core/trunk: ./ src/contrib/failmon/
src/contrib/failmon/bin/ src/contrib/failmon/conf/ src/contrib/failmon/src/
src/contrib/failmon/src/java/ src/contrib/failmon/src/java/org/
src/contrib/failmon/src/java/org/apac...
Author: dhruba
Date: Fri Aug 15 02:04:07 2008
New Revision: 686181
URL: http://svn.apache.org/viewvc?rev=686181&view=rev
Log:
HADOOP-3585. FailMon package for hardware failure monitoring and
analysis of anomalies. (Ioannis Koltsidas via dhruba)
Added:
hadoop/core/trunk/src/contrib/failmon/
hadoop/core/trunk/src/contrib/failmon/README
hadoop/core/trunk/src/contrib/failmon/bin/
hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh
hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py
hadoop/core/trunk/src/contrib/failmon/build.xml
hadoop/core/trunk/src/contrib/failmon/conf/
hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties
hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties
hadoop/core/trunk/src/contrib/failmon/conf/global.config
hadoop/core/trunk/src/contrib/failmon/conf/hosts.list
hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties
hadoop/core/trunk/src/contrib/failmon/src/
hadoop/core/trunk/src/contrib/failmon/src/java/
hadoop/core/trunk/src/contrib/failmon/src/java/org/
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LogParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/MonitorJob.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Monitored.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/NICParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/OfflineAnonymizer.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/PersistentState.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/RunOnce.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SMARTParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SensorsParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SerializedRecord.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/ShellParser.java
hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SystemLogParser.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/build.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686181&r1=686180&r2=686181&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 15 02:04:07 2008
@@ -86,6 +86,9 @@
number of files/bytes copied in a particular run to support incremental
updates and mirroring. (TszWo (Nicholas), SZE via cdouglas)
+ HADOOP-3585. FailMon package for hardware failure monitoring and
+ analysis of anomalies. (Ioannis Koltsidas via dhruba)
+
IMPROVEMENTS
HADOOP-3732. Delay initialization of datanode block verification till
Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=686181&r1=686180&r2=686181&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Fri Aug 15 02:04:07 2008
@@ -799,7 +799,8 @@
<packageset dir="src/contrib/streaming/src/java"/>
<packageset dir="src/contrib/data_join/src/java"/>
<packageset dir="src/contrib/index/src/java"/>
-
+ <packageset dir="src/contrib/failmon/src/java/"/>
+
<link href="${javadoc.link.java}"/>
<classpath >
@@ -816,7 +817,7 @@
<group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
<group title="contrib: DataJoin" packages="org.apache.hadoop.contrib.utils.join*"/>
<group title="contrib: Index" packages="org.apache.hadoop.contrib.index*"/>
-
+ <group title="contrib: FailMon" packages="org.apache.hadoop.contrib.failmon*"/>
</javadoc>
</target>
Added: hadoop/core/trunk/src/contrib/failmon/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/README?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/README (added)
+++ hadoop/core/trunk/src/contrib/failmon/README Fri Aug 15 02:04:07 2008
@@ -0,0 +1,97 @@
+****************** FailMon Quick Start Guide ***********************
+
+This document is a guide to quickly setting up and running FailMon.
+For more information and details please see the FailMon User Manual.
+
+***** Building FailMon *****
+
+Normally, FailMon lies under <hadoop-dir>/src/contrib/failmon, where
+<hadoop-dir> is the Hadoop project root folder. To compile it, one
+can either run ant for the whole Hadoop project, i.e.:
+
+$ cd <hadoop-dir>
+$ ant
+
+or run ant only for FailMon:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant
+
+The above will compile FailMon and place all class files under
+<hadoop-dir>/build/contrib/failmon/classes.
+
+By invoking:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant tar
+
+FailMon is packaged as a standalone application in
+<hadoop-dir>/build/contrib/failmon/failmon.tar.gz.
+
+
+***** Deploying FailMon *****
+
+There are two ways FailMon can be deployed in a cluster:
+
+a) Within Hadoop, in which case the whole Hadoop package is uploaded
+to the cluster nodes. In that case, nothing else needs to be done on
+individual nodes.
+
+b) Independently of the Hadoop deployment, i.e., by uploading
+failmon.tar.gz to all nodes and uncompressing it. In that case, the
+bin/failmon.sh script needs to be edited; the environment variable
+HADOOPDIR should point to the root directory of the Hadoop
+distribution. Also, the location of the Hadoop configuration files
+should be specified by the property 'hadoop.conf.path' in the file
+conf/failmon.properties. Note that these configuration files refer to
+the HDFS in which we want to store the FailMon data (which can
+potentially be different from the one on the cluster we are
+monitoring).
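+
+For example, assuming an illustrative installation under
+/opt/hadoop-0.17.0, one would set in bin/failmon.sh:
+
+HADOOPDIR=/opt/hadoop-0.17.0
+
+and in conf/failmon.properties:
+
+hadoop.conf.path = /opt/hadoop-0.17.0/conf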
+
+We assume that either way FailMon is placed in the same directory on
+all nodes, which is typical for most clusters. If this is not
+feasible, one should create on every node a symbolic link at the same
+path, pointing to that node's FailMon directory.
+
+One should also edit the conf/failmon.properties file on each node to
+set site-specific property values. However, the default values are
+expected to serve most practical cases. Refer to the FailMon User
+Manual for details about the various properties and configuration
+parameters.
+
+
+***** Running FailMon *****
+
+In order to run FailMon using a node to do the ad-hoc scheduling of
+monitoring jobs, one needs to edit the hosts.list file to specify the
+list of machine hostnames on which FailMon is to be run. Also, in the
+file conf/global.config, the username used to connect to the machines
+has to be specified (passwordless SSH is assumed) in the property
+'ssh.username'. In the property 'failmon.dir', the path to the FailMon
+folder has to be specified as well (it is assumed to be the same on
+all machines in the cluster). Then one only needs to invoke the command:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ bin/scheduler.py
+
+to start the system.
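+
+For instance, conf/global.config might contain (illustrative values):
+
+ssh.username = hadoop
+failmon.dir = /home/hadoop/failmon
+
+with conf/hosts.list listing one machine hostname per line.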
+
+
+***** Merging HDFS files *****
+
+For the purpose of merging the files created on HDFS by FailMon, the
+following command can be used:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ bin/failmon.sh --mergeFiles
+
+This will concatenate all files in the HDFS folder (pointed to by the
+'hdfs.upload.dir' property in the conf/failmon.properties file) into a
+single file, which will be placed in the same folder. Again, the
+location of the Hadoop configuration files should be specified by the
+property 'hadoop.conf.path' in the file conf/failmon.properties; these
+configuration files refer to the HDFS in which we have stored the
+FailMon data (which can potentially be different from the one on the
+cluster we are monitoring). Also, the scheduler.py script can be set
+up to merge the HDFS files when their number surpasses a configurable
+limit (see the 'conf/global.config' file).
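+
+For example, the default conf/global.config sets hdfs.files.max = 100
+as the limit beyond which the HDFS files are meant to be merged.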
+
+Please refer to the FailMon User Manual for more details.
Added: hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh (added)
+++ hadoop/core/trunk/src/contrib/failmon/bin/failmon.sh Fri Aug 15 02:04:07 2008
@@ -0,0 +1,54 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# First we need to determine whether Failmon has been distributed with
+# Hadoop, or as standalone. In the latter case failmon.jar will lie in
+# the current directory.
+
+JARNAME="failmon.jar"
+HADOOPDIR=""
+CLASSPATH=""
+
+if [ ! -d src ]
+then
+ # standalone binary
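+ # "jar -ufe" updates the jar manifest's entry point (Main-Class),
+ # so that the subsequent "java -jar" runs the intended class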
+ if [ -n "$1" ] && [ "$1" == "--mergeFiles" ]
+ then
+ jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.HDFSMerger
+ java -jar $JARNAME
+ else
+ jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.RunOnce
+ java -jar $JARNAME "$@"
+ fi
+else
+ # distributed with Hadoop
+ HADOOPDIR=`pwd`/../../../
+ CLASSPATH=$CLASSPATH:$HADOOPDIR/build/contrib/failmon/classes
+ CLASSPATH=$CLASSPATH:$HADOOPDIR/build/classes
+ CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-api-1*.jar`
+ CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-1*.jar`
+ CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/log4j-*.jar`
+# echo $CLASSPATH
+ if [ -n "$1" ] && [ "$1" == "--mergeFiles" ]
+ then
+ java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.HDFSMerger
+ else
+ java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.RunOnce "$@"
+ fi
+fi
+
Added: hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py (added)
+++ hadoop/core/trunk/src/contrib/failmon/bin/scheduler.py Fri Aug 15 02:04:07 2008
@@ -0,0 +1,235 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Schedule FailMon execution for nodes of file hosts.list, according to
+# the properties file conf/global.config.
+
+import time
+import ConfigParser
+import subprocess
+import threading
+import random
+
+jobs = []
+username = "user"
+connections = 10
+failmonDir = ""
+maxFiles = 100
+
+# This class represents a thread that connects to a set of cluster
+# nodes to locally execute monitoring jobs. These jobs are specified
+# as a shell command in the constructor.
+class sshThread (threading.Thread):
+
+ def __init__(self, threadname, username, command, failmonDir):
+ threading.Thread.__init__(self)
+ self.name = threadname
+ self.username = username
+ self.command = command
+ self.failmonDir = failmonDir
+ self.hosts = []
+
+ def addHost(self, host):
+ self.hosts.append(host)
+
+ def run (self):
+ for host in self.hosts:
+ toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
+ print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
+ subprocess.check_call(toRun)
+ print "Done!"
+
+# This class represents a monitoring job. The param member is a string
+# that can be passed in the '--only' list of jobs given to the Java
+# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
+# node.
+class Job:
+ def __init__(self, param, interval):
+ self.param = param
+ self.interval = interval
+ self.counter = interval
+ return
+
+ def reset(self):
+ self.counter = self.interval
+
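+# For example, Job("cpu", 10) causes "cpu" to be included in the
+# "--only" list passed to bin/failmon.sh once every 10 seconds.
+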
+# This function reads the configuration file to get the values of the
+# configuration parameters.
+def getJobs(file):
+ global username
+ global connections
+ global jobs
+ global failmonDir
+ global maxFiles
+
+ conf = ConfigParser.SafeConfigParser()
+ conf.read(file)
+
+ username = conf.get("Default", "ssh.username")
+ connections = int(conf.get("Default", "max.connections"))
+ failmonDir = conf.get("Default", "failmon.dir")
+ maxFiles = int(conf.get("Default", "hdfs.files.max"))
+
+ # Hadoop Log
+ interval = int(conf.get("Default", "log.hadoop.interval"))
+
+ if interval != 0:
+ jobs.append(Job("hadoopLog", interval))
+
+ # System Log
+ interval = int(conf.get("Default", "log.system.interval"))
+
+ if interval != 0:
+ jobs.append(Job("systemLog", interval))
+
+ # NICs
+ interval = int(conf.get("Default", "nics.interval"))
+
+ if interval != 0:
+ jobs.append(Job("nics", interval))
+
+ # CPU
+ interval = int(conf.get("Default", "cpu.interval"))
+
+ if interval != 0:
+ jobs.append(Job("cpu", interval))
+
+ # Disks
+ interval = int(conf.get("Default", "disks.interval"))
+
+ if interval != 0:
+ jobs.append(Job("disks", interval))
+
+ # sensors
+ interval = int(conf.get("Default", "sensors.interval"))
+
+ if interval != 0:
+ jobs.append(Job("sensors", interval))
+
+ # upload
+ interval = int(conf.get("Default", "upload.interval"))
+
+ if interval != 0:
+ jobs.append(Job("upload", interval))
+
+ return
+
+
+# Compute the gcd (Greatest Common Divisor) of two integers
+def GCD(a, b):
+ assert isinstance(a, int)
+ assert isinstance(b, int)
+
+ while a:
+ a, b = b%a, a
+
+ return b
+
+# Compute the gcd (Greatest Common Divisor) of a list of integers
+def listGCD(joblist):
+ assert isinstance(joblist, list)
+
+ if (len(joblist) == 1):
+ return joblist[0].interval
+
+ g = GCD(joblist[0].interval, joblist[1].interval)
+
+ for i in range (2, len(joblist)):
+ g = GCD(g, joblist[i].interval)
+
+ return g
+
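+# For example, job intervals of 10, 10 and 20 seconds give a GCD of
+# 10, so the main loop below wakes up every 10 seconds and runs each
+# job whose counter has run down to zero.
+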
+# Merge all failmon files created on the HDFS into a single file
+def mergeFiles():
+ global username
+ global failmonDir
+ hostList = []
+ hosts = open('./conf/hosts.list', 'r')
+ for host in hosts:
+ hostList.append(host.strip().rstrip())
+ randomHost = random.sample(hostList, 1)
+ mergeCommand = "bin/failmon.sh --mergeFiles"
+ toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
+ print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
+ subprocess.check_call(toRun)
+ print "Done!"
+ return
+
+# The actual scheduling is done here
+def main():
+ getJobs("./conf/global.config")
+
+ for job in jobs:
+ print "Configuration: ", job.param, "every", job.interval, "seconds"
+
+ globalInterval = listGCD(jobs)
+
+ while True :
+ time.sleep(globalInterval)
+ params = []
+
+ for job in jobs:
+ job.counter -= globalInterval
+
+ if (job.counter <= 0):
+ params.append(job.param)
+ job.reset()
+
+ if (len(params) == 0):
+ continue
+
+ onlyStr = "--only " + params[0]
+ for i in range(1, len(params)):
+ onlyStr += ',' + params[i]
+
+ command = "bin/failmon.sh " + onlyStr
+
+ # execute on all nodes
+ hosts = open('./conf/hosts.list', 'r')
+ threadList = []
+ # create a thread for every connection
+ for i in range(0, connections):
+ threadList.append(sshThread(i, username, command, failmonDir))
+
+ # assign hosts to the threads/connections in round-robin fashion
+ cur = 0
+ for host in hosts:
+ threadList[cur].addHost(host.strip().rstrip())
+ cur += 1
+ if (cur == len(threadList)):
+ cur = 0
+
+ for ready in threadList:
+ ready.start()
+
+ for ssht in threading.enumerate():
+ if ssht != threading.currentThread():
+ ssht.join()
+
+ # if an upload has been done, then maybe we need to merge the
+ # HDFS files
+ if "upload" in params:
+ mergeFiles()
+
+ return
+
+
+if __name__ == '__main__':
+ main()
+
Added: hadoop/core/trunk/src/contrib/failmon/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/build.xml?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/build.xml (added)
+++ hadoop/core/trunk/src/contrib/failmon/build.xml Fri Aug 15 02:04:07 2008
@@ -0,0 +1,133 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="failmon" default="compile">
+
+ <import file="../build-contrib.xml"/>
+
+ <property name="jarfile" value="${build.dir}/${name}.jar"/>
+
+ <target name="jar" depends="compile" unless="skip.contrib">
+ <!-- Make sure that the hadoop jar has been created -->
+<!-- This works, but causes findbugs to fail
+ <subant antfile="build.xml" target="jar">
+ <fileset dir="../../.." includes="build.xml"/>
+ </subant>
+-->
+ <!-- Copy the required files so that the jar can run independently
+ of Hadoop source code -->
+
+ <mkdir dir="lib"/>
+
+ <copy todir="lib">
+ <fileset dir="${hadoop.root}/lib/"
+ includes="commons-logging-*, log4j*"/>
+ </copy>
+
+ <copy todir="lib">
+ <fileset dir="${hadoop.root}/build/"
+ includes="hadoop-*"/>
+ </copy>
+
+ <!-- create the list of files to add to the classpath -->
+ <fileset dir="${basedir}" id="class.path">
+ <include name="lib/commons-logging*.jar"/>
+ <include name="lib/log4j*.jar"/>
+ <include name="lib/hadoop-*.jar"/>
+ </fileset>
+
+ <pathconvert pathsep=" " property="failmon-class-path" refid="class.path">
+ <map from="${basedir}/" to=""/>
+ </pathconvert>
+
+ <echo message="contrib: ${name}"/>
+ <jar jarfile="${jarfile}" basedir="${build.classes}">
+ <manifest>
+ <attribute name="Main-Class" value="org.apache.hadoop.contrib.failmon.RunOnce"/>
+ <attribute name="Class-Path" value="${failmon-class-path}"/>
+ </manifest>
+ </jar>
+
+ </target>
+
+
+ <!-- Override test target to copy sample data -->
+ <target name="test" depends="compile-test, compile, compile-examples" if="test.available">
+ <echo message="contrib: ${name}"/>
+ <delete dir="${hadoop.log.dir}"/>
+ <mkdir dir="${hadoop.log.dir}"/>
+ <delete dir="${build.test}/sample"/>
+ <mkdir dir="${build.test}/sample"/>
+ <copy todir="${build.test}/sample">
+ <fileset dir="${root}/sample"/>
+ </copy>
+ <junit
+ printsummary="yes" showoutput="${test.output}"
+ haltonfailure="no" fork="yes" maxmemory="256m"
+ errorProperty="tests.failed" failureProperty="tests.failed"
+ timeout="${test.timeout}">
+
+ <sysproperty key="test.build.data" value="${build.test}/data"/>
+ <sysproperty key="build.test" value="${build.test}"/>
+ <sysproperty key="contrib.name" value="${name}"/>
+
+ <!-- requires fork=yes for:
+ relative File paths to use the specified user.dir
+ classpath to use build/contrib/*.jar
+ -->
+ <sysproperty key="user.dir" value="${build.test}/data"/>
+
+ <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+ <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+ <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+ <classpath refid="test.classpath"/>
+ <formatter type="${test.junit.output.format}" />
+ <batchtest todir="${build.test}" unless="testcase">
+ <fileset dir="${src.test}"
+ includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <batchtest todir="${build.test}" if="testcase">
+ <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+ </batchtest>
+ </junit>
+ <fail if="tests.failed">Tests failed!</fail>
+
+ </target>
+
+ <target name="tar" depends="jar">
+
+ <copy todir=".">
+ <fileset dir="${hadoop.root}/build/contrib/failmon/"
+ includes="failmon.jar"/>
+ </copy>
+
+ <tar tarfile="${name}.tar"
+ basedir=".."
+ includes="${name}/**"
+ excludes="${name}/${name}.tar.gz, ${name}/src/**, ${name}/logs/**, ${name}/build.xml*"/>
+ <gzip zipfile="${name}.tar.gz" src="${name}.tar"/>
+ <delete file="${name}.tar"/>
+ <delete file="${name}.jar"/>
+
+ <move file="${name}.tar.gz" todir="${build.dir}"/>
+ <echo message= "${hadoop.root}/build/contrib/failmon/${name}.jar"/>
+
+ </target>
+
+</project>
Added: hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/commons-logging.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#Logging Implementation
+
+#Log4J
+org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger
+
+#JDK Logger
+#org.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger
Added: hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/failmon.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# HDFS properties
+hdfs.upload.dir = /failmon
+hadoop.conf.path = ../../../conf
+
+# Hadoop Log file properties
+log.hadoop.enabled = true
+log.hadoop.filenames = /home/hadoop/hadoop-0.17.0/logs/
+# set to non-zero only for continuous mode:
+log.hadoop.interval = 0
+log.hadoop.dateformat = \\d{4}-\\d{2}-\\d{2}
+log.hadoop.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# System Log file properties
+log.system.enabled = true
+log.system.filenames = /var/log/messages
+# set to non-zero only for continuous mode:
+log.system.interval = 0
+log.system.dateformat = (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d+)
+log.system.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# Network Interfaces
+nic.enabled = true
+nic.list = eth0, eth1
+# set to non-zero only for continuous mode:
+nic.interval = 0
+
+# CPUs & Motherboard
+cpu.enabled = true
+# set to non-zero only for continuous mode:
+cpu.interval = 0
+
+# Disk devices. For all devices listed under disks.list, the corresponding
+# property disks./dev/xxx.source specifies where the output of
+# "sudo smartctl --all /dev/xxx" can be read by a user. If this property is
+# missing, super-user privileges are assumed and the smartctl command will
+# be invoked directly.
+
+disks.enabled = true
+disks.list = /dev/sda, /dev/sdb, /dev/sdc, /dev/sdd, /dev/hda, /dev/hdb, /dev/hdc, /dev/hdd
+#disks./dev/sda.source = hda.smart
+# set to non-zero only for continuous mode:
+disks.interval = 0
+
+# lm-sensors polling
+sensors.enabled = true
+# set to non-zero only for continuous mode:
+sensors.interval = 0
+
+# Executor thread properties
+executor.interval.min = 1
+
+# Anonymization properties
+anonymizer.hash.hostnames = false
+anonymizer.hash.ips = false
+anonymizer.hash.filenames = false
+anonymizer.hostname.suffix = apache.org
+
+# Local files options
+local.tmp.filename = failmon.dat
+local.tmp.compression = false
+# set to non-zero only for continuous mode:
+local.upload.interval = 0
Added: hadoop/core/trunk/src/contrib/failmon/conf/global.config
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/global.config?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/global.config (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/global.config Fri Aug 15 02:04:07 2008
@@ -0,0 +1,39 @@
+[Default]
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# general settings
+
+# the username to use to connect to cluster nodes
+ssh.username = user
+# the maximum number of SSH connections to keep open at any time
+max.connections = 2
+# the directory in which FailMon lies
+failmon.dir = /home/user/hadoop-core-trunk/src/contrib/failmon
+# the maximum number of HDFS files to allow FailMon to create. After
+# this limit is surpassed, all HDFS files will be concatenated into
+# one file.
+hdfs.files.max = 100
+
+# iteration intervals
+log.hadoop.interval = 0
+log.system.interval = 0
+nics.interval = 10
+cpu.interval = 10
+disks.interval = 0
+sensors.interval = 0
+upload.interval = 20
Added: hadoop/core/trunk/src/contrib/failmon/conf/hosts.list
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/hosts.list?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/hosts.list (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/hosts.list Fri Aug 15 02:04:07 2008
@@ -0,0 +1,10 @@
+host00
+host01
+host02
+host03
+host04
+host05
+host06
+host07
+host08
+host09
Added: hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties (added)
+++ hadoop/core/trunk/src/contrib/failmon/conf/log4j.properties Fri Aug 15 02:04:07 2008
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Define some default values that can be overridden by system properties
+failmon.log.dir=logs
+failmon.log.file=failmon.log
+
+log4j.rootLogger= INFO, simpleFile, console
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+log4j.appender.simpleFile=org.apache.log4j.FileAppender
+log4j.appender.simpleFile.layout=org.apache.log4j.PatternLayout
+log4j.appender.simpleFile.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+log4j.appender.simpleFile.file= ${failmon.log.dir}/${failmon.log.file}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * This class provides anonymization to SerializedRecord objects. It
+ * anonymizes all hostnames, ip addresses and file names/paths
+ * that appear in EventRecords gathered from the logs
+ * and other system utilities. Such values are hashed using the
+ * MD5 one-way hash function.
+ *
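+ * A typical (illustrative) use, given an EventRecord er, is:
+ *
+ *   SerializedRecord sr = Anonymizer.anonymize(er);
+ *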
+ **********************************************************/
+
+public class Anonymizer {
+
+ /**
+ * Anonymize hostnames, ip addresses and file names/paths
+ * that appear in fields of a SerializedRecord.
+ *
+ * @param sr the input SerializedRecord
+ *
+ * @return the anonymized SerializedRecord
+ */
+ public static SerializedRecord anonymize(SerializedRecord sr)
+ throws Exception {
+
+ String hostname = sr.get("hostname");
+
+ if (hostname == null)
+ throw new Exception("Malformed SerializedRecord: no hostname found");
+
+ if ("true".equalsIgnoreCase(Environment
+ .getProperty("anonymizer.hash.hostnames"))) {
+ // hash the node's hostname
+ anonymizeField(sr, "message", hostname, "_hn_");
+ anonymizeField(sr, "hostname", hostname, "_hn_");
+ // hash all other hostnames
+ String suffix = Environment.getProperty("anonymizer.hostname.suffix");
+ if (suffix != null)
+ anonymizeField(sr, "message", "(\\S+\\.)*" + suffix, "_hn_");
+ }
+
+ if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.ips"))) {
+ // hash all ip addresses
+ String ipPattern = "(\\d{1,3}\\.){3}\\d{1,3}";
+ anonymizeField(sr, "message", ipPattern, "_ip_");
+ anonymizeField(sr, "ips", ipPattern, "_ip_");
+ // if multiple ips are present for a node:
+ int i = 0;
+ while (sr.get("ips" + "#" + i) != null)
+ anonymizeField(sr, "ips" + "#" + i++, ipPattern, "_ip_");
+
+ if ("NIC".equalsIgnoreCase(sr.get("type")))
+ anonymizeField(sr, "ipAddress", ipPattern, "_ip_");
+ }
+
+ if ("true".equalsIgnoreCase(Environment
+ .getProperty("anonymizer.hash.filenames"))) {
+ // hash every filename present in messages
+ anonymizeField(sr, "message", "\\s+/(\\S+/)*[^:\\s]*", " _fn_");
+ anonymizeField(sr, "message", "\\s+hdfs://(\\S+/)*[^:\\s]*",
+ " hdfs://_fn_");
+ }
+
+ return sr;
+ }
+
+ /**
+ * Anonymize hostnames, ip addresses and file names/paths
+ * that appear in fields of an EventRecord, after it gets
+ * serialized into a SerializedRecord.
+ *
+ * @param er the input EventRecord
+ *
+ * @return the anonymized SerializedRecord
+ */
+ public static SerializedRecord anonymize(EventRecord er) throws Exception {
+ return anonymize(new SerializedRecord(er));
+ }
+
+
+  private static String anonymizeField(SerializedRecord sr, String fieldName,
+      String pattern, String prefix) {
+    String txt = sr.get(fieldName);
+
+    if (txt == null)
+      return null;
+    else {
+      // hash each matched value (rather than the pattern itself), so
+      // that distinct hostnames/ips/filenames yield distinct digests
+      Matcher matcher = Pattern.compile(pattern).matcher(txt);
+      StringBuffer anonymized = new StringBuffer();
+      while (matcher.find())
+        matcher.appendReplacement(anonymized,
+            (prefix == null ? "" : prefix) + getMD5Hash(matcher.group()));
+      matcher.appendTail(anonymized);
+      sr.set(fieldName, anonymized.toString());
+      return txt;
+    }
+  }
+
+ /**
+ * Create the MD5 digest of an input text.
+ *
+ * @param text the input text
+ *
+ * @return the hexadecimal representation of the MD5 digest
+ */
+ public static String getMD5Hash(String text) {
+ MessageDigest md;
+    byte[] md5hash = new byte[16]; // an MD5 digest is 16 bytes long
+ try {
+ md = MessageDigest.getInstance("MD5");
+ md.update(text.getBytes("iso-8859-1"), 0, text.length());
+ md5hash = md.digest();
+ } catch (NoSuchAlgorithmException e) {
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ return convertToHex(md5hash);
+ }
+
+ private static String convertToHex(byte[] data) {
+ StringBuffer buf = new StringBuffer();
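+    // emit two hex characters per byte: high nibble first, then low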
+ for (int i = 0; i < data.length; i++) {
+ int halfbyte = (data[i] >>> 4) & 0x0F;
+ int two_halfs = 0;
+ do {
+ if ((0 <= halfbyte) && (halfbyte <= 9))
+ buf.append((char) ('0' + halfbyte));
+ else
+ buf.append((char) ('a' + (halfbyte - 10)));
+ halfbyte = data[i] & 0x0F;
+ } while (two_halfs++ < 1);
+ }
+ return buf.toString();
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.Calendar;
+
+/**********************************************************
+ * Objects of this class parse the /proc/cpuinfo file to
+ * gather information about present processors in the system.
+ *
+ **********************************************************/
+
+
+public class CPUParser extends ShellParser {
+
+ /**
+ * Constructs a CPUParser
+ */
+ public CPUParser() {
+ super();
+ }
+
+ /**
+ * Reads and parses /proc/cpuinfo and creates an appropriate
+ * EventRecord that holds the desirable information.
+ *
+ * @param s unused parameter
+ *
+ * @return the EventRecord created
+ */
+ public EventRecord query(String s) throws Exception {
+ StringBuffer sb = Environment.runCommand("cat /proc/cpuinfo");
+ EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+ .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+ .getHostName()), Calendar.getInstance(), "CPU", "Unknown", "CPU", "-");
+
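+    // e.g. a /proc/cpuinfo line "processor : 0" matches the pattern
+    // below and contributes "0" to the comma-separated "processors" list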
+ retval.set("processors", findAll("\\s*processor\\s*:\\s*(\\d+)", sb
+ .toString(), 1, ", "));
+
+ retval.set("model name", findPattern("\\s*model name\\s*:\\s*(.+)", sb
+ .toString(), 1));
+
+ retval.set("frequency", findAll("\\s*cpu\\s*MHz\\s*:\\s*(\\d+)", sb
+ .toString(), 1, ", "));
+
+ retval.set("physical id", findAll("\\s*physical\\s*id\\s*:\\s*(\\d+)", sb
+ .toString(), 1, ", "));
+
+ retval.set("core id", findAll("\\s*core\\s*id\\s*:\\s*(\\d+)", sb
+ .toString(), 1, ", "));
+
+ return retval;
+ }
+
+ /**
+ * Invokes query() to do the parsing and handles parsing errors.
+ *
+ * @return an array of EventRecords that holds one element that represents
+ * the current state of /proc/cpuinfo
+ */
+
+ public EventRecord[] monitor() {
+
+ EventRecord[] recs = new EventRecord[1];
+
+ try {
+ recs[0] = query(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return recs;
+ }
+
+ /**
+ * Return a String with information about this class
+ *
+ * @return A String describing this class
+ */
+ public String getInfo() {
+ return ("CPU Info parser");
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+/**********************************************************
+ * This class runs FailMon in a continuous mode on the local
+ * node.
+ *
+ **********************************************************/
+
+public class Continuous {
+
+ public static void main(String[] args) {
+
+ Environment.prepare("failmon.properties");
+
+ Executor ex = new Executor(null);
+ new Thread(ex).start();
+
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.*;
+import org.apache.log4j.PropertyConfigurator;
+
+/**********************************************************
+ * This class provides various methods for interaction with
+ * the configuration and the operating system environment. Also
+ * provides some helper methods for use by other classes in
+ * the package.
+ **********************************************************/
+
+public class Environment {
+
+ public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+ public static final int DEFAULT_POLL_INTERVAL = 360;
+
+ public static int MIN_INTERVAL = 5;
+
+ public static final int MAX_OUTPUT_LENGTH = 51200;
+
+ public static Log LOG;
+
+ static Properties fmProperties = new Properties();
+
+ static boolean superuser = false;
+
+ static boolean ready = false;
+
+ /**
+ * Initializes structures needed by other methods. Also determines
+ * whether the executing user has superuser privileges.
+ *
+ */
+ public static void prepare(String fname) {
+
+ if (!"Linux".equalsIgnoreCase(System.getProperty("os.name"))) {
+ System.err.println("Linux system required for FailMon. Exiting...");
+      System.exit(1);
+ }
+
+ System.setProperty("log4j.configuration", "conf/log4j.properties");
+ PropertyConfigurator.configure("conf/log4j.properties");
+ LOG = LogFactory.getLog("org.apache.hadoop.contrib.failmon");
+ logInfo("********** FailMon started ***********");
+
+ // read parseState file
+ PersistentState.readState("conf/parsing.state");
+
+ try {
+ FileInputStream propFile = new FileInputStream(fname);
+ fmProperties.load(propFile);
+ propFile.close();
+ } catch (FileNotFoundException e1) {
+ e1.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ ready = true;
+
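+    // probe for passwordless sudo: if the echoed marker appears on
+    // stdout without a password prompt, superuser privileges exist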
+ try {
+ String sudo_prompt = "passwd_needed:";
+ String echo_txt = "access_ok";
+
+ Process p = Runtime.getRuntime().exec("sudo -S -p " + sudo_prompt + " echo " + echo_txt );
+ InputStream inps = p.getInputStream();
+ InputStream errs = p.getErrorStream();
+
+ while (inps.available() < echo_txt.length() && errs.available() < sudo_prompt.length())
+ Thread.sleep(100);
+
+ byte [] buf;
+ String s;
+
+ if (inps.available() >= echo_txt.length()) {
+ buf = new byte[inps.available()];
+ inps.read(buf);
+ s = new String(buf);
+ if (s.startsWith(echo_txt)) {
+ superuser = true;
+ logInfo("Superuser privileges found!");
+ } else {
+ // no need to read errs
+ superuser = false;
+ logInfo("Superuser privileges not found.");
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Fetches the value of a property from the configuration file.
+ *
+ * @param key the name of the property
+ *
+ * @return the value of the property, if it exists and
+ * null otherwise
+ */
+ public static String getProperty(String key) {
+ if (!ready)
+ prepare("conf/failmon.properties");
+ return fmProperties.getProperty(key);
+ }
+
+ /**
+   * Sets the value of a property in the configuration file.
+ *
+ * @param key the name of the property
+ * @param value the new value for the property
+ *
+ */
+
+ public static void setProperty(String key, String value) {
+ fmProperties.setProperty(key, value);
+ }
+
+ /**
+ * Scans the configuration file to determine which monitoring
+ * utilities are available in the system. For each one of them, a
+ * job is created. All such jobs are scheduled and executed by
+ * Executor.
+ *
+   * @return an ArrayList that contains the jobs to be executed by the Executor.
+ */
+ public static ArrayList<MonitorJob> getJobs() {
+
+ ArrayList<MonitorJob> monitors = new ArrayList<MonitorJob>();
+ int timeInt = 0;
+
+ // for Hadoop Log parsing
+ String [] fnames_r = getProperty("log.hadoop.filenames").split(",\\s*");
+ String tmp = getProperty("log.hadoop.enabled");
+
+ String [] fnames = expandDirs(fnames_r, ".*(.log).*");
+
+ timeInt = setValue("log.hadoop.interval", DEFAULT_LOG_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp) && fnames[0] != null)
+ for (String fname : fnames) {
+ File f = new File(fname);
+ if (f.exists() && f.canRead()) {
+ monitors.add(new MonitorJob(new HadoopLogParser(fname), "hadoopLog", timeInt));
+ logInfo("Created Monitor for Hadoop log file: " + f.getAbsolutePath());
+ } else if (!f.exists())
+ logInfo("Skipping Hadoop log file " + fname + " (file not found)");
+ else
+ logInfo("Skipping Hadoop log file " + fname + " (permission denied)");
+ }
+
+
+ // for System Log parsing
+ fnames_r = getProperty("log.system.filenames").split(",\\s*");
+ tmp = getProperty("log.system.enabled");
+
+ fnames = expandDirs(fnames_r, ".*(messages).*");
+
+ timeInt = setValue("log.system.interval", DEFAULT_LOG_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp))
+ for (String fname : fnames) {
+ File f = new File(fname);
+ if (f.exists() && f.canRead()) {
+ monitors.add(new MonitorJob(new SystemLogParser(fname), "systemLog", timeInt));
+ logInfo("Created Monitor for System log file: " + f.getAbsolutePath());
+ } else if (!f.exists())
+ logInfo("Skipping system log file " + fname + " (file not found)");
+ else
+ logInfo("Skipping system log file " + fname + " (permission denied)");
+ }
+
+
+ // for network interfaces
+ tmp = getProperty("nic.enabled");
+
+ timeInt = setValue("nics.interval", DEFAULT_POLL_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp)) {
+ monitors.add(new MonitorJob(new NICParser(), "nics", timeInt));
+ logInfo("Created Monitor for NICs");
+ }
+
+ // for cpu
+ tmp = getProperty("cpu.enabled");
+
+ timeInt = setValue("cpu.interval", DEFAULT_POLL_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp)) {
+ monitors.add(new MonitorJob(new CPUParser(), "cpu", timeInt));
+ logInfo("Created Monitor for CPUs");
+ }
+
+ // for disks
+ tmp = getProperty("disks.enabled");
+
+ timeInt = setValue("disks.interval", DEFAULT_POLL_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp)) {
+      // check privileges if a disk with no disks./dev/xxx.source is found
+ boolean smart_present = checkExistence("smartctl");
+ int disks_ok = 0;
+ String devicesStr = getProperty("disks.list");
+      String[] devices = new String[0]; // avoid an NPE if disks.list is unset
+
+ if (devicesStr != null)
+ devices = devicesStr.split(",\\s*");
+
+ for (int i = 0; i< devices.length; i++) {
+ boolean file_present = false;
+ boolean disk_present = false;
+
+ String fileloc = getProperty("disks." + devices[i] + ".source");
+        if (fileloc != null)
+ file_present = true;
+
+ if (!file_present)
+ if (superuser) {
+ StringBuffer sb = runCommand("sudo smartctl -i " + devices[i]);
+          String patternStr = "(failed)|(device not supported)";
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(sb.toString());
+ if (matcher.find(0))
+ disk_present = false;
+ else
+ disk_present = true;
+ }
+ if (file_present || (disk_present && smart_present)) {
+ disks_ok++;
+ } else
+ devices[i] = null;
+ }
+
+      // now remove the disks that don't exist
+ StringBuffer resetSB = new StringBuffer();
+ for (int j = 0; j < devices.length; j++) {
+ resetSB.append(devices[j] == null ? "" : devices[j] + ", ");
+ if (devices[j] != null)
+ logInfo("Found S.M.A.R.T. attributes for disk " + devices[j]);
+ }
+ // fix the property
+ if (resetSB.length() >= 2)
+ setProperty("disks.list", resetSB.substring(0, resetSB.length() - 2));
+
+ if (disks_ok > 0) {
+ monitors.add(new MonitorJob(new SMARTParser(), "disks", timeInt));
+ logInfo("Created Monitor for S.M.A.R.T disk attributes");
+ }
+ }
+
+ // for lm-sensors
+ tmp = getProperty("sensors.enabled");
+
+ timeInt = setValue("sensors.interval", DEFAULT_POLL_INTERVAL);
+
+ if ("true".equalsIgnoreCase(tmp) && checkExistence("sensors")) {
+ monitors.add(new MonitorJob(new SensorsParser(), "sensors", timeInt));
+ logInfo("Created Monitor for lm-sensors output");
+ }
+
+ return monitors;
+ }
+
+ /**
+   * Determines the minimum interval at which the executor thread
+   * needs to wake up to execute jobs. Essentially, this interval
+   * equals the GCD of the intervals of all scheduled jobs.
+ *
+ * @param monitors the list of scheduled jobs
+ *
+ * @return the minimum interval between two scheduled jobs
+ */
+ public static int getInterval(ArrayList<MonitorJob> monitors) {
+ String tmp = getProperty("executor.interval.min");
+ if (tmp != null)
+ MIN_INTERVAL = Integer.parseInt(tmp);
+
+ int[] monIntervals = new int[monitors.size()];
+
+ for (int i = 0; i < monitors.size(); i++)
+ monIntervals[i] = monitors.get(i).interval;
+
+ return Math.max(MIN_INTERVAL, gcd(monIntervals));
+ }
+
+ /**
+ * Checks whether a specific shell command is available
+ * in the system.
+ *
+ * @param cmd the command to check against
+ *
+   * @return true, if the command is available, false otherwise
+ */
+ public static boolean checkExistence(String cmd) {
+ StringBuffer sb = runCommand("which " + cmd);
+ if (sb.length() > 1)
+ return true;
+
+ return false;
+ }
+
+ /**
+ * Runs a shell command in the system and provides a StringBuffer
+ * with the output of the command.
+ *
+ * @param cmd an array of string that form the command to run
+ *
+ * @return a StringBuffer that contains the output of the command
+ */
+ public static StringBuffer runCommand(String[] cmd) {
+ StringBuffer retval = new StringBuffer(MAX_OUTPUT_LENGTH);
+ Process p;
+ try {
+      p = Runtime.getRuntime().exec(cmd);
+      InputStream tmp = p.getInputStream();
+      // drain the output before waiting for the process to exit, so
+      // that a full pipe buffer cannot block the child process
+      int c;
+      while ((c = tmp.read()) != -1)
+        retval.append((char) c);
+      p.waitFor();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return retval;
+ }
+
+ /**
+ * Runs a shell command in the system and provides a StringBuffer
+ * with the output of the command.
+ *
+ * @param cmd the command to run
+ *
+ * @return a StringBuffer that contains the output of the command
+ */
+ public static StringBuffer runCommand(String cmd) {
+ return runCommand(cmd.split("\\s+"));
+ }
+
+ /**
+ * Determines the greatest common divisor (GCD) of two integers.
+ *
+ * @param m the first integer
+ * @param n the second integer
+ *
+ * @return the greatest common divisor of m and n
+ */
+ public static int gcd(int m, int n) {
+ if (m == 0 && n == 0)
+ return 0;
+ if (m < n) {
+ int t = m;
+ m = n;
+ n = t;
+ }
+ int r = m % n;
+ if (r == 0) {
+ return n;
+ } else {
+ return gcd(n, r);
+ }
+ }
+
+ /**
+ * Determines the greatest common divisor (GCD) of a list
+ * of integers.
+ *
+ * @param numbers the list of integers to process
+ *
+ * @return the greatest common divisor of all numbers
+ */
+ public static int gcd(int[] numbers) {
+
+ if (numbers.length == 1)
+ return numbers[0];
+
+ int g = gcd(numbers[0], numbers[1]);
+
+ for (int i = 2; i < numbers.length; i++)
+ g = gcd(g, numbers[i]);
+
+ return g;
+ }
+
+ private static String [] expandDirs(String [] input, String patternStr) {
+
+ ArrayList<String> fnames = new ArrayList<String>();
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher;
+ File f;
+
+ for (String fname : input) {
+ f = new File(fname);
+ if (f.exists()) {
+ if (f.isDirectory()) {
+ // add all matching files
+ File [] fcs = f.listFiles();
+ for (File fc : fcs) {
+ matcher = pattern.matcher(fc.getName());
+ if (matcher.find() && fc.isFile())
+ fnames.add(fc.getAbsolutePath());
+ }
+ } else {
+ // normal file, just add to output
+ fnames.add(f.getAbsolutePath());
+ }
+ }
+ }
+ return fnames.toArray(input);
+ }
+
+ private static int setValue(String propname, int defaultValue) {
+
+ String v = getProperty(propname);
+
+ if (v != null)
+ return Integer.parseInt(v);
+ else
+ return defaultValue;
+ }
+
+
+ public static void logInfo(String str) {
+ LOG.info(str);
+ }
+}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+
+/**********************************************************
+ * Objects of this class represent metrics collected for
+ * a specific hardware source. Each EventRecord contains a HashMap of
+ * (key, value) pairs, each of which represents a property of
+ * the metered value. For instance, when parsing a log file, an
+ * EventRecord is created for each log entry, which contains
+ * the hostname and IP addresses of the node, the timestamp of
+ * the log entry, the actual message, etc. Every EventRecord
+ * contains the hostname of the machine on which it was collected,
+ * its IP address and the time of collection.
+ *
+ * The main purpose of this class is to provide a uniform format
+ * for records collected from various system components (logs,
+ * ifconfig, smartmontools, lm-sensors etc). All metric values are
+ * converted into this format after they are collected by a
+ * Monitored object.
+ *
+ **********************************************************/
+
+public class EventRecord {
+
+ HashMap<String, Object> fields;
+
+ /**
+ * Create the EventRecord given the most common properties
+ * among different metric types.
+ */
+ public EventRecord(String _hostname, Object [] _ips, Calendar _timestamp,
+ String _type, String _logLevel, String _source, String _message) {
+ fields = new HashMap<String, Object>();
+ set("hostname", _hostname);
+ set("ips", _ips);
+ set("timestamp", _timestamp);
+ set("type", _type);
+ set("logLevel", _logLevel);
+ set("source", _source);
+ set("message", _message);
+ }
+
+ /**
+ * Create the EventRecord with no fields other than "invalid" as
+ * the hostname. This is only used as a dummy.
+ */
+ public EventRecord() {
+ // creates an invalid record
+ fields = new HashMap<String, Object>();
+ set("hostname", "invalid");
+ }
+
+ /**
+ * Return the HashMap of properties of the EventRecord.
+ *
+ * @return a HashMap that contains all properties of the record.
+ */
+ public final HashMap<String, Object> getMap() {
+ return fields;
+ }
+
+ /**
+ * Set the value of a property of the EventRecord.
+ *
+ * @param fieldName the name of the property to set
+ * @param fieldValue the value of the property to set
+ *
+ */
+ public void set(String fieldName, Object fieldValue) {
+ if (fieldValue != null)
+ fields.put(fieldName, fieldValue);
+ }
+
+ /**
+ * Get the value of a property of the EventRecord.
+ * If no property with the specified key is found,
+ * null is returned.
+ *
+ * @param fieldName the name of the property to get
+ *
+ * @return the value of the property, or null if it is not found
+ */
+ public Object get(String fieldName) {
+ return fields.get(fieldName);
+ }
+
+ /**
+ * Check if the EventRecord is a valid one, i.e., whether
+ * it represents meaningful metric values.
+ *
+ * @return true if the EventRecord is a valid one, false otherwise.
+ */
+ public boolean isValid() {
+ return !("invalid".equalsIgnoreCase((String) fields.get("hostname")));
+ }
+
+ /**
+ * Creates and returns a string representation of the object.
+ *
+ * @return a String representation of the object
+ */
+
+ public String toString() {
+ String retval = "";
+ ArrayList<String> keys = new ArrayList<String>(fields.keySet());
+ Collections.sort(keys);
+
+ for (int i = 0; i < keys.size(); i++) {
+ Object value = fields.get(keys.get(i));
+ if (value == null)
+ retval += keys.get(i) + ":\tnull\n";
+ else if (value instanceof String)
+ retval += keys.get(i) + ":\t" + value + "\n";
+ else if (value instanceof Calendar)
+ retval += keys.get(i) + ":\t" + ((Calendar) value).getTime() + "\n";
+ else if (value instanceof InetAddress[]) {
+ retval += "Known IPs:\t";
+ for (InetAddress ip : (InetAddress[]) value)
+ retval += ip.getHostAddress() + " ";
+ retval += "\n";
+ } else if (value instanceof String[]) {
+ // IPs may also be stored as plain strings (e.g., when
+ // recovered from a parsed log); casting those to
+ // InetAddress[] would throw a ClassCastException
+ retval += "Known IPs:\t";
+ for (String ip : (String[]) value)
+ retval += ip + " ";
+ retval += "\n";
+ } else {
+ retval += keys.get(i) + ":\t" + value.toString() + "\n";
+ }
+ }
+ return retval;
+ }
+
+}
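
A brief usage sketch for the class above; the field values are made up
for illustration:

    import java.util.Calendar;
    import org.apache.hadoop.contrib.failmon.EventRecord;

    public class EventRecordDemo {
      public static void main(String[] args) {
        EventRecord er = new EventRecord("node-01",
            new String[] {"10.0.0.1"}, Calendar.getInstance(),
            "HadoopLog", "WARN", "org.apache.hadoop.dfs.DataNode",
            "Disk error on /dev/sda");
        er.set("notes", "demo record"); // any extra per-metric property
        System.out.print(er);           // dumps all fields, sorted by key
      }
    }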
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**********************************************************
+ * This class executes monitoring jobs on all nodes of the
+ * cluster, on which we intend to gather failure metrics.
+ * It is basically a thread that sleeps and periodically wakes
+ * up to execute monitoring jobs and ship all gathered data to
+ * a "safe" location, which in most cases will be the HDFS
+ * filesystem of the monitored cluster.
+ *
+ **********************************************************/
+
+public class Executor implements Runnable {
+
+ public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+ public static final int DEFAULT_POLL_INTERVAL = 360;
+
+ public static int MIN_INTERVAL = 5;
+
+ public static int instances = 0;
+
+ LocalStore lstore;
+
+ ArrayList<MonitorJob> monitors;
+
+ int interval;
+
+ int upload_interval;
+ int upload_counter;
+
+ /**
+ * Create an instance of the class and read the configuration
+ * file to determine the set of jobs that will be run and the
+ * maximum interval for which the thread can sleep before it
+ * wakes up to execute a monitoring job on the node.
+ *
+ */
+
+ public Executor(Configuration conf) {
+
+ Environment.prepare("conf/failmon.properties");
+
+ String localTmpDir;
+
+ if (conf == null) {
+ // running as a stand-alone application
+ localTmpDir = System.getProperty("java.io.tmpdir");
+ Environment.setProperty("local.tmp.dir", localTmpDir);
+ } else {
+ // running from within Hadoop
+ localTmpDir = conf.get("hadoop.tmp.dir");
+ String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" + System.getProperty("hadoop.log.file");
+ Environment.setProperty("hadoop.log.file", hadoopLogPath);
+ Environment.setProperty("local.tmp.dir", localTmpDir);
+ }
+
+ monitors = Environment.getJobs();
+ interval = Environment.getInterval(monitors);
+ upload_interval = LocalStore.UPLOAD_INTERVAL;
+ lstore = new LocalStore();
+
+ if (Environment.getProperty("local.upload.interval") != null)
+ upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));
+
+ instances++;
+ }
+
+ public void run() {
+ upload_counter = upload_interval;
+
+ Environment.logInfo("Failmon Executor thread started successfully.");
+ while (true) {
+ try {
+ Thread.sleep(interval * 1000);
+ for (int i = 0; i < monitors.size(); i++) {
+ monitors.get(i).counter -= interval;
+ if (monitors.get(i).counter <= 0) {
+ monitors.get(i).reset();
+ Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
+ monitors.get(i).job.monitor(lstore);
+ }
+ }
+ upload_counter -= interval;
+ if (upload_counter <= 0) {
+ lstore.upload();
+ upload_counter = upload_interval;
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void cleanup() {
+ instances--;
+ }
+}
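
For context, a sketch of how the loop above is meant to be driven in
the stand-alone case (conf == null); the demo class is hypothetical:

    import org.apache.hadoop.contrib.failmon.Executor;

    public class ExecutorDemo {
      public static void main(String[] args) {
        // a null Configuration selects stand-alone mode, so the
        // temporary record store goes under java.io.tmpdir
        Thread t = new Thread(new Executor(null));
        t.start(); // run() loops forever, sleeping `interval` seconds
                   // between rounds of monitoring jobs and uploads
      }
    }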
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.Calendar;
+import java.util.zip.ZipInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+
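+/**********************************************************
+ * This class merges the record files that monitored nodes
+ * have uploaded into the HDFS upload directory into a single
+ * file, decompressing zipped inputs on the fly and deleting
+ * each input file once it has been appended to the merged
+ * output.
+ *
+ **********************************************************/
+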
+public class HDFSMerger {
+
+ Configuration hadoopConf;
+ FileSystem hdfs;
+
+ String hdfsDir;
+
+ FileStatus [] inputFiles;
+
+ Path outputFilePath;
+ FSDataOutputStream outputFile;
+
+ public HDFSMerger() throws IOException {
+
+ String hadoopConfPath;
+
+ if (Environment.getProperty("hadoop.conf.path") == null)
+ hadoopConfPath = "../../../conf";
+ else
+ hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+ // Read the configuration for the Hadoop environment
+ // assign the field; declaring a local Configuration here
+ // would shadow it and leave the field null
+ hadoopConf = new Configuration();
+ hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+ hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+
+ // determine the local output file name
+ if (Environment.getProperty("local.tmp.filename") == null)
+ Environment.setProperty("local.tmp.filename", "failmon.dat");
+
+ // determine the upload location
+ hdfsDir = Environment.getProperty("hdfs.upload.dir");
+ if (hdfsDir == null)
+ hdfsDir = "/failmon";
+
+ hdfs = FileSystem.get(hadoopConf);
+
+ Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);
+
+ try {
+ if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
+ Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
+ System.exit(0);
+ }
+ } catch (FileNotFoundException e) {
+ Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
+ // nothing to merge if the upload directory does not exist
+ return;
+ }
+
+ inputFiles = hdfs.listStatus(hdfsDirPath);
+
+ outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
+ + Calendar.getInstance().getTimeInMillis() + ".dat");
+ outputFile = hdfs.create(outputFilePath);
+
+ for (FileStatus fstatus : inputFiles) {
+ appendFile(fstatus.getPath());
+ hdfs.delete(fstatus.getPath());
+ }
+
+ outputFile.close();
+
+ Environment.logInfo("HDFS file merging complete!");
+ }
+
+ private void appendFile (Path inputPath) throws IOException {
+
+ FSDataInputStream anyInputFile = hdfs.open(inputPath);
+ InputStream inputFile;
+ byte buffer[] = new byte[4096];
+
+ if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
+ // the file is compressed
+ inputFile = new ZipInputStream(anyInputFile);
+ ((ZipInputStream) inputFile).getNextEntry();
+ } else {
+ inputFile = anyInputFile;
+ }
+
+ try {
+ int bytesRead = 0;
+ while ((bytesRead = inputFile.read(buffer)) > 0) {
+ outputFile.write(buffer, 0, bytesRead);
+ }
+ } catch (IOException e) {
+ Environment.logInfo("Error while copying file:" + inputPath.toString());
+ } finally {
+ inputFile.close();
+ }
+ }
+
+
+ public static void main(String [] args) {
+
+ Environment.prepare("./conf/failmon.properties");
+
+ try {
+ new HDFSMerger();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * An object of this class parses a Hadoop log file to create
+ * appropriate EventRecords. The log file can be that of a
+ * NameNode, JobTracker, DataNode, or TaskTracker.
+ *
+ **********************************************************/
+
+public class HadoopLogParser extends LogParser {
+
+ /**
+ * Create a new parser object and try to find the hostname
+ * of the node that generated the log
+ */
+ public HadoopLogParser(String fname) {
+ super(fname);
+ if ((dateformat = Environment.getProperty("log.hadoop.dateformat")) == null)
+ dateformat = "\\d{4}-\\d{2}-\\d{2}";
+ if ((timeformat = Environment.getProperty("log.hadoop.timeformat")) == null)
+ timeformat = "\\d{2}:\\d{2}:\\d{2}";
+ findHostname();
+ }
+
+ /**
+ * Parses one line of the log. If the line contains a valid
+ * log entry, then an appropriate EventRecord is returned, after all
+ * relevant fields have been parsed.
+ *
+ * @param line the log line to be parsed
+ *
+ * @return the EventRecord representing the log entry of the line. If
+ * the line does not contain a valid log entry, then the EventRecord
+ * returned has isValid() = false. When the end-of-file has been reached,
+ * null is returned to the caller.
+ */
+ public EventRecord parseLine(String line) throws IOException {
+ EventRecord retval = null;
+
+ if (line != null) {
+ // process line
+ String patternStr = "(" + dateformat + ")";
+ patternStr += "\\s+";
+ patternStr += "(" + timeformat + ")";
+ patternStr += ".{4}\\s(\\w*)\\s"; // for logLevel
+ patternStr += "\\s*([\\w+\\.?]+)"; // for source
+ patternStr += ":\\s+(.+)"; // for the message
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(line);
+
+ if (matcher.find(0) && matcher.groupCount() >= 5) {
+ retval = new EventRecord(hostname, ips, parseDate(matcher.group(1),
+ matcher.group(2)),
+ "HadoopLog",
+ matcher.group(3), // loglevel
+ matcher.group(4), // source
+ matcher.group(5)); // message
+ } else {
+ retval = new EventRecord();
+ }
+ }
+
+ return retval;
+ }
+
+ /**
+ * Parse a date found in the Hadoop log.
+ *
+ * @return a Calendar representing the date
+ */
+ protected Calendar parseDate(String strDate, String strTime) {
+ Calendar retval = Calendar.getInstance();
+ // set date
+ String[] fields = strDate.split("-");
+ retval.set(Calendar.YEAR, Integer.parseInt(fields[0]));
+ // Calendar months are zero-based, while the month in the
+ // log date is one-based, hence the -1
+ retval.set(Calendar.MONTH, Integer.parseInt(fields[1]) - 1);
+ retval.set(Calendar.DATE, Integer.parseInt(fields[2]));
+ // set time
+ fields = strTime.split(":");
+ retval.set(Calendar.HOUR_OF_DAY, Integer.parseInt(fields[0]));
+ retval.set(Calendar.MINUTE, Integer.parseInt(fields[1]));
+ retval.set(Calendar.SECOND, Integer.parseInt(fields[2]));
+ return retval;
+ }
+
+ /**
+ * Attempt to determine the hostname of the node that created the
+ * log file. This information can be found in the STARTUP_MSG lines
+ * of the Hadoop log, which are emitted when the node starts.
+ *
+ */
+ private void findHostname() {
+ String startupInfo = Environment.runCommand(
+ "grep --max-count=1 STARTUP_MSG:\\s*host " + file.getName()).toString();
+ Pattern pattern = Pattern.compile("\\s+(\\w+/.+)\\s+");
+ Matcher matcher = pattern.matcher(startupInfo);
+ if (matcher.find(0)) {
+ hostname = matcher.group(1).split("/")[0];
+ ips = new String[1];
+ ips[0] = matcher.group(1).split("/")[1];
+ }
+ }
+
+ /**
+ * Return a String with information about this class
+ *
+ * @return A String describing this class
+ */
+ public String getInfo() {
+ return ("Hadoop Log Parser for file: " + file.getName());
+ }
+
+}
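
For reference, the pattern assembled in parseLine() above is built to
match log lines of the following shape (this sample line is made up):

    2008-08-15 02:04:07,023 INFO org.apache.hadoop.dfs.DataNode: STARTUP_MSG

    group(1) = "2008-08-15"                      (dateformat)
    group(2) = "02:04:07"                        (timeformat)
    group(3) = "INFO"                            (logLevel; ",023" is consumed by .{4})
    group(4) = "org.apache.hadoop.dfs.DataNode"  (source)
    group(5) = "STARTUP_MSG"                     (message)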
Added: hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java?rev=686181&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java (added)
+++ hadoop/core/trunk/src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java Fri Aug 15 02:04:07 2008
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**********************************************************
+ * This class takes care of the temporary local storage of
+ * gathered metrics before they get uploaded into HDFS. It writes
+ * Serialized Records as lines in a temporary file and then
+ * compresses and uploads it into HDFS.
+ *
+ **********************************************************/
+
+public class LocalStore {
+
+ public final static char FIELD_SEPARATOR = '|';
+
+ public final static char RECORD_SEPARATOR = '\n';
+
+ public final static String COMPRESSION_SUFFIX = ".zip";
+
+ public final static int UPLOAD_INTERVAL = 600;
+
+ String filename;
+ String hdfsDir;
+
+ boolean compress;
+
+ FileWriter fw;
+
+ BufferedWriter writer;
+
+ /**
+ * Create an instance of the class and read the configuration
+ * file to determine some output parameters. Then, initialize
+ * the structures needed for buffered I/O (so that small
+ * appends can be handled efficiently).
+ *
+ */
+
+ public LocalStore() {
+ // determine the local output file name
+ if (Environment.getProperty("local.tmp.filename") == null)
+ Environment.setProperty("local.tmp.filename", "failmon.dat");
+
+ // local.tmp.dir has been set by the Executor
+ if (Environment.getProperty("local.tmp.dir") == null)
+ Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));
+
+ filename = Environment.getProperty("local.tmp.dir") + "/" +
+ Environment.getProperty("local.tmp.filename");
+
+ // determine the upload location
+ hdfsDir = Environment.getProperty("hdfs.upload.dir");
+ if (hdfsDir == null)
+ hdfsDir = "/failmon";
+
+ // determine if compression is enabled
+ compress = true;
+ if ("false".equalsIgnoreCase(Environment
+ .getProperty("local.tmp.compression")))
+ compress = false;
+
+ try {
+ fw = new FileWriter(filename, true);
+ writer = new BufferedWriter(fw);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Insert an EventRecord to the local storage, after it
+ * gets serialized and anonymized.
+ *
+ * @param er the EventRecord to be inserted
+ */
+
+ public void insert(EventRecord er) {
+ SerializedRecord sr = new SerializedRecord(er);
+ try {
+ Anonymizer.anonymize(sr);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ append(sr);
+ }
+
+ /**
+ * Insert an array of EventRecords to the local storage, after they
+ * get serialized and anonymized.
+ *
+ * @param ers the array of EventRecords to be inserted
+ */
+ public void insert(EventRecord[] ers) {
+ for (EventRecord er : ers)
+ insert(er);
+ }
+
+ private void append(SerializedRecord sr) {
+ try {
+ writer.write(pack(sr).toString());
+ writer.write(RECORD_SEPARATOR);
+ // no flush here; flushing is deferred until upload() so
+ // that small appends stay cheap
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Pack a SerializedRecord into a StringBuffer of key:value
+ * pairs, separated by FIELD_SEPARATOR.
+ *
+ * @param sr the SerializedRecord to be packed
+ *
+ * @return a StringBuffer holding the packed record
+ */
+ public static StringBuffer pack(SerializedRecord sr) {
+ StringBuffer sb = new StringBuffer();
+
+ ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());
+
+ if (sr.isValid())
+ SerializedRecord.arrangeKeys(keys);
+
+ for (int i = 0; i < keys.size(); i++) {
+ String value = sr.fields.get(keys.get(i));
+ sb.append(keys.get(i) + ":" + value);
+ sb.append(FIELD_SEPARATOR);
+ }
+ return sb;
+ }
+
+ /**
+ * Upload the local file store into HDFS, after
+ * compressing it. Then a new local file is created
+ * as a temporary record store.
+ *
+ */
+ public void upload() {
+ try {
+ writer.flush();
+ if (compress)
+ zipCompress(filename);
+ String remoteName = "failmon-";
+ if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
+ remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
+ else
+ remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-";
+ remoteName += Calendar.getInstance().getTimeInMillis();
+ if (compress)
+ copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
+ else
+ copyToHDFS(filename, hdfsDir + "/" + remoteName);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // delete and re-open
+ try {
+ fw.close();
+ fw = new FileWriter(filename);
+ writer = new BufferedWriter(fw);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Compress a text file using ZIP compression.
+ *
+ * @param filename the path to the file to be compressed
+ */
+ public static void zipCompress(String filename) throws IOException {
+ FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
+ CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
+ ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
+ out.setComment("Failmon records.");
+
+ BufferedReader in = new BufferedReader(new FileReader(filename));
+ out.putNextEntry(new ZipEntry(new File(filename).getName()));
+ int c;
+ while ((c = in.read()) != -1)
+ out.write(c);
+ in.close();
+
+ out.finish();
+ out.close();
+ }
+
+ /**
+ * Copy a local file to HDFS
+ *
+ * @param localFile the filename of the local file
+ * @param hdfsFile the HDFS filename to copy to
+ */
+ public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {
+
+ String hadoopConfPath;
+
+ if (Environment.getProperty("hadoop.conf.path") == null)
+ hadoopConfPath = "../../../conf";
+ else
+ hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+ // Read the configuration for the Hadoop environment
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+ hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+
+ FileSystem fs = FileSystem.get(hadoopConf);
+
+ // HadoopDFS deals with Path
+ Path inFile = new Path("file://" + localFile);
+ Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);
+
+ // Read from and write to new file
+ Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
+ fs.copyFromLocalFile(false, inFile, outFile);
+ }
+
+ /**
+ * Close the temporary local file
+ *
+ */
+ public void close() {
+ try {
+ writer.flush();
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
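
A sketch of the intended calling sequence for the class above, assuming
Environment.prepare() can locate failmon.properties; the demo class and
record contents are made up:

    import java.util.Calendar;
    import org.apache.hadoop.contrib.failmon.Environment;
    import org.apache.hadoop.contrib.failmon.EventRecord;
    import org.apache.hadoop.contrib.failmon.LocalStore;

    public class LocalStoreDemo {
      public static void main(String[] args) {
        Environment.prepare("conf/failmon.properties");
        LocalStore store = new LocalStore();
        EventRecord er = new EventRecord("node-01",
            new String[] {"10.0.0.1"}, Calendar.getInstance(),
            "SMART", "INFO", "/dev/sda", "PASSED");
        store.insert(er); // serialize, anonymize, append to tmp file
        store.upload();   // flush, compress, ship to HDFS, reopen
        store.close();
      }
    }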