You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/10/06 09:31:12 UTC

svn commit: r822153 [1/3] - in /hadoop/hdfs/trunk: ./ src/contrib/ src/contrib/raid/ src/contrib/raid/conf/ src/contrib/raid/ivy/ src/contrib/raid/lib/ src/contrib/raid/src/ src/contrib/raid/src/java/ src/contrib/raid/src/java/org/ src/contrib/raid/src...

Author: dhruba
Date: Tue Oct  6 07:31:11 2009
New Revision: 822153

URL: http://svn.apache.org/viewvc?rev=822153&view=rev
Log:
HDFS-503. This patch implements an optional layer over HDFS that 
implements offline erasure-coding.  It can be used to reduce the 
total storage requirements of HDFS.  (dhruba)


Added:
    hadoop/hdfs/trunk/src/contrib/raid/
    hadoop/hdfs/trunk/src/contrib/raid/README
    hadoop/hdfs/trunk/src/contrib/raid/build.xml
    hadoop/hdfs/trunk/src/contrib/raid/conf/
    hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml
    hadoop/hdfs/trunk/src/contrib/raid/ivy/
    hadoop/hdfs/trunk/src/contrib/raid/ivy.xml
    hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties
    hadoop/hdfs/trunk/src/contrib/raid/lib/
    hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar   (with props)
    hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar   (with props)
    hadoop/hdfs/trunk/src/contrib/raid/src/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
    hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
    hadoop/hdfs/trunk/src/contrib/raid/src/test/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
    hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/contrib/build.xml

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=822153&r1=822152&r2=822153&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Oct  6 07:31:11 2009
@@ -6,6 +6,10 @@
 
   NEW FEATURES
 
+  HDFS-503. This patch implements an optional layer over HDFS that 
+  implements offline erasure-coding.  It can be used to reduce the 
+  total storage requirements of HDFS.  (dhruba)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

Modified: hadoop/hdfs/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/build.xml?rev=822153&r1=822152&r2=822153&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/build.xml (original)
+++ hadoop/hdfs/trunk/src/contrib/build.xml Tue Oct  6 07:31:11 2009
@@ -46,6 +46,7 @@
   <!-- ====================================================== -->
   <target name="test">
     <subant target="test">
+      <fileset dir="." includes="raid/build.xml"/>
       <fileset dir="." includes="hdfsproxy/build.xml"/>
     </subant>
   </target>

Added: hadoop/hdfs/trunk/src/contrib/raid/README
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/README?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/README (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/README Tue Oct  6 07:31:11 2009
@@ -0,0 +1,176 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements a Distributed Raid File System. It is used along with
+an instance of the Hadoop Distributed File System (HDFS). It can be used to
+provide better protection against data corruption. It can also be used to
+reduce the total storage requirements of HDFS. 
+
+Distributed Raid File System consists of two main software components. The first component 
+is the RaidNode, a daemon that creates parity files from specified HDFS files. 
+The second component "raidfs" is a software that is layered over a HDFS client and it 
+intercepts all calls that an application makes to the HDFS client. If HDFS encounters
+corrupted data while reading a file, the raidfs client detects it; it uses the
+relevant parity blocks to recover the corrupted data (if possible) and returns
+the data to the application. The application is completely unaware of the
+fact that parity data was used to satisfy its read request.
+
+The primary use of this feature is to save disk space for HDFS files. 
+HDFS typically stores data in triplicate.
+The Distributed Raid File System can be configured in such a way that a set of
+data blocks of a file are combined together to form one or more parity blocks.
+This allows one to reduce the replication factor of a HDFS file from 3 to 2
+while keeping the failure probability roughly the same as before. This typically
+results in saving 25% to 30% of storage space in a HDFS cluster. 
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING and CONFIGURING:
+
+The entire code is packaged in the form of a single jar file hadoop-*-raid.jar.
+To use HDFS Raid, you need to put the above mentioned jar file on
+the CLASSPATH. The easiest way is to copy the hadoop-*-raid.jar
+from HADOOP_HOME/build/contrib/raid to HADOOP_HOME/lib. Alternatively
+you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh.
+
+There is a single configuration file named raid.xml that describes the HDFS 
+path(s) that you want to raid. A sample of this file can be found in 
+src/contrib/raid/conf/raid.xml. Please edit the entries in this file to list the 
+path(s) that you want to raid. Then, edit the hdfs-site.xml file for
+your installation to include a reference to this raid.xml. You can add the
+following to your hdfs-site.xml
+        <property>
+          <name>raid.config.file</name>
+          <value>/mnt/hdfs/DFS/conf/raid.xml</value>
+          <description>This is needed by the RaidNode </description>
+        </property>
+
+Please add an entry to your hdfs-site.xml to enable hdfs clients to use the
+parity bits to recover corrupted data.
+
+       <property>
+         <name>fs.hdfs.impl</name>
+         <value>org.apache.hadoop.hdfs.DistributedRaidFileSystem</value>
+         <description>The FileSystem for hdfs: uris.</description>
+       </property>
+
+
+--------------------------------------------------------------------------------
+
+OPTIONAL CONFIGURATION:
+
+The following properties can be set in hdfs-site.xml to further tune your configuration:
+
+    Specifies the location where parity files are located. 
+        <property>
+          <name>hdfs.raid.locations</name>
+          <value>hdfs://newdfs.data:8000/raid</value>
+          <description>The location for parity files. If this
+          is not defined, then defaults to /raid. 
+          </description>
+        </property>
+
+    Specify the parity stripe length 
+        <property>
+          <name>hdfs.raid.stripeLength</name>
+          <value>10</value>
+          <description>The number of blocks in a file to be combined into 
+          a single raid parity block. The default value is 5. The higher
+          the number the greater is the disk space you will save when you
+          enable raid (fewer parity blocks are generated per file).
+          </description>
+        </property>
+
+    Specify RaidNode to not use a map-reduce cluster for raiding files in parallel.
+        <property>
+          <name>fs.raidnode.local</name>
+          <value>true</value>
+          <description>If you do not want to use your map-reduce cluster to
+          raid files in parallel, then specify "true". By default, this
+          value is false, which means that the RaidNode uses the default
+          map-reduce cluster to generate parity blocks.
+          </description>
+        </property>
+
+
+    Specify the periodicity at which the RaidNode re-calculates (if necessary)
+    the parity blocks
+        <property>
+          <name>raid.policy.rescan.interval</name>
+          <value>5000</value>
+          <description>Specify the periodicity in milliseconds after which
+          all source paths are rescanned and parity blocks recomputed if 
+          necessary. By default, this value is 1 hour.
+          </description>
+        </property>
+
+    By default, the DistributedRaidFileSystem assumes that the underlying file
+    system is the DistributedFileSystem. If you want to layer the DistributedRaidFileSystem
+    over some other file system, then define a property named fs.raid.underlyingfs.impl
+    that specifies the name of the underlying class. For example, if you want to layer
+    the DistributedRaidFileSystem over an instance of the NewFileSystem, then add:
+        <property>
+          <name>fs.raid.underlyingfs.impl</name>
+          <value>org.apache.hadoop.new.NewFileSystem</value>
+          <description>Specify the filesystem that is layered immediately below the
+          DistributedRaidFileSystem. By default, this value is DistributedFileSystem.
+          </description>
+        </property>
+
+--------------------------------------------------------------------------------
+
+ADMINISTRATION:
+
+The Distributed Raid File System  provides support for administration at runtime without
+any downtime to cluster services.  It is possible to add/delete new paths to be raided without
+interrupting any load on the cluster. If you change raid.xml, its contents will be
+reloaded within seconds and the new contents will take effect immediately.
+
+Designate one machine in your cluster to run the RaidNode software. You can run this daemon
+on any machine irrespective of whether that machine is running any other hadoop daemon or not. 
+You can start the RaidNode by running the following on the selected machine:
+nohup $HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidNode >> /xxx/logs/hadoop-root-raidnode-hadoop.xxx.com.log &
+
+Run fsckraid periodically (being developed as part of another JIRA). This validates parity
+blocks of a file.
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+The RaidNode periodically scans all the specified paths in the configuration
+file. For each path, it recursively scans all files that have more than 2 blocks
+and that have not been modified during the last few hours (default is 24 hours). 
+It picks the specified number of blocks (as specified by the stripe size),
+from the file, generates a parity block by combining them and
+stores the results as another HDFS file in the specified destination 
+directory. There is a one-to-one mapping between a HDFS
+file and its parity file. The RaidNode also periodically finds parity files
+that are orphaned and deletes them.
+
+The Distributed Raid FileSystem is layered over a DistributedFileSystem
+instance and intercepts all calls that go into HDFS. HDFS throws a ChecksumException
+or a BlockMissingException when a file read encounters bad data. The layered
+Distributed Raid FileSystem catches these exceptions, locates the corresponding
+parity file, extracts the original data from the parity files and feeds the
+extracted data back to the application in a completely transparent way.
+
+The layered Distributed Raid FileSystem does not fix the data-loss that it
+encounters while serving data. It merely makes the application transparently
+use the parity blocks to re-create the original data. A command line tool 
+"fsckraid" is currently under development that will fix the corrupted files 
+by extracting the data from the associated parity files. An administrator 
+can run "fsckraid" manually as and when needed.

Added: hadoop/hdfs/trunk/src/contrib/raid/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/build.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/build.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/build.xml Tue Oct  6 07:31:11 2009
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="raid" default="jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+  <import file="../build-contrib.xml"/>
+
+  <!-- the unit test classpath -->
+  <path id="contrib.classpath.raid">
+    <pathelement location="${hadoop.root}/src/contrib/raid/lib"/>
+    <path refid="contrib-classpath"/>
+  </path>
+
+  <target name="test" depends="compile,compile-test,test-junit" description="Automated Test Framework" if="test.available"/>
+
+  <target name="test-junit" depends="compile,compile-test" if="test.available">
+    <junit showoutput="${test.output}" fork="yes" printsummary="yes" errorProperty="tests.failed" 
+           haltonfailure="no" failureProperty="tests.failed">
+
+        <classpath refid="test.classpath"/>
+        <sysproperty key="test.build.data" value="${build.test}/data"/>
+              <sysproperty key="build.test" value="${build.test}"/>
+              <sysproperty key="user.dir" value="${build.test}/data"/>
+              <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+              <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+              <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+        <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+        <formatter type="plain" />
+        <batchtest todir="${build.test}" unless="testcase">
+           <fileset dir="${src.test}">
+             <include name="**/Test*.java"/>
+           </fileset>
+        </batchtest>
+        <batchtest todir="${build.test}" if="testcase">
+            <fileset dir="${src.test}">
+                <include name="**/${testcase}.java"/>
+            </fileset>
+         </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+ </target>
+
+</project>
+

Added: hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml Tue Oct  6 07:31:11 2009
@@ -0,0 +1,58 @@
+   <configuration>
+    <srcPath prefix="hdfs://dfs1.xxx.com:8000/user/dhruba/">
+      <policy name = "dhruba">
+        <property>
+          <name>srcReplication</name>
+          <value>3</value>
+          <description> pick files for RAID only if their replication factor is
+                        greater than or equal to this value.
+          </description>
+        </property>
+        <property>
+          <name>targetReplication</name>
+          <value>2</value>
+          <description> after RAIDing, decrease the replication factor of a file to 
+                        this value.
+          </description>
+        </property>
+        <property>
+          <name>metaReplication</name>
+          <value>2</value>
+          <description> the replication factor of the RAID meta file
+          </description>
+        </property>
+        <property>
+          <name>modTimePeriod</name>
+          <value>3600000</value>
+          <description> time (milliseconds) after a file is modified to make it a
+                        candidate for RAIDing
+          </description>
+        </property>
+      </policy>
+    </srcPath>
+    <srcPath prefix="hdfs://dfs1.xxx.com:9000/warehouse/table1">
+      <policy name = "table1">
+        <property>
+          <name>targetReplication</name>
+          <value>1</value>
+          <description> after RAIDing, decrease the replication factor of a file to 
+                        this value.
+          </description>
+        </property>
+        <property>
+          <name>metaReplication</name>
+          <value>2</value>
+          <description> the replication factor of the RAID meta file
+          </description>
+        </property>
+        <property>
+          <name>modTimePeriod</name>
+          <value>3600000</value>
+          <description> time (milliseconds) after a file is modified to make it a
+                        candidate for RAIDing
+          </description>
+        </property>
+      </policy>
+    </srcPath>
+   </configuration>
+

Added: hadoop/hdfs/trunk/src/contrib/raid/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/ivy.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/ivy.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/ivy.xml Tue Oct  6 07:31:11 2009
@@ -0,0 +1,284 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}" revision="${version}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Hadoop Core
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact"
+      extends="client,server,s3-server,kfs"/>
+
+    <conf name="mandatory" description="contains the critical  dependencies"
+      extends="commons-logging,log4j"/>
+
+    <!--
+    These public configurations contain the core dependencies for running hadoop client or server.
+    The server is effectively a superset of the client.
+    -->
+    <conf name="client" description="client-side dependencies"
+      extends="mandatory,httpclient"/>
+    <conf name="server" description="server-side dependencies"
+      extends="client"/>
+    <conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
+      extends="client"/>
+    <conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
+      extends="s3-client,server"/>
+    <conf name="kfs" description="dependencies for KFS file system support"/>
+    <conf name="ftp" description="dependencies for working with FTP filesystems"
+              extends="mandatory"/>
+   <conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
+
+    <!--Private configurations. -->
+
+    <conf name="common" visibility="private" extends="runtime,mandatory,httpclient,ftp,jetty"
+		      description="common artifacts"/>
+    <conf name="javadoc" visibility="private" description="artifacts required while performing doc generation"
+      extends="common,mandatory,jetty,lucene"/>
+    <!--Testing pulls in everything-->
+    <conf name="test" extends="common,s3-server,kfs" visibility="private"
+      description="the classpath needed to run tests"/>
+
+    <conf name="test-hdfswithmr" extends="test" visibility="private"
+      description="the classpath needed to run tests"/>
+
+    <conf name="releaseaudit" visibility="private"
+	description="Artifacts required for releaseaudit target"/>
+     
+    <conf name="commons-logging" visibility="private"/>
+    <conf name="httpclient" visibility="private" extends="commons-logging"/>
+    <conf name="log4j" visibility="private"/>
+    <conf name="lucene" visibility="private"/>
+    <conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
+    <conf name="checkstyle" visibility="private"/>
+
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+
+ <!--used client side-->
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="client->default"/>
+    <dependency org="checkstyle"
+      name="checkstyle"
+      rev="${checkstyle.version}"
+      conf="checkstyle->default"/>
+    <dependency org="jdiff"
+      name="jdiff"
+      rev="${jdiff.version}"
+      conf="jdiff->default"/>
+    <dependency org="xerces"
+      name="xerces"
+      rev="${xerces.version}"
+      conf="jdiff->default">
+    </dependency>
+
+    <dependency org="xmlenc"
+      name="xmlenc"
+      rev="${xmlenc.version}"
+      conf="server->default"/>
+
+    <!--Configuration: httpclient-->
+
+    <!--
+    commons-httpclient asks for too many files.
+    All it needs is commons-codec and commons-logging JARs
+    -->
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="httpclient->master">
+    </dependency>
+
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="httpclient->default"/>
+
+    <dependency org="commons-net"
+      name="commons-net"
+      rev="${commons-net.version}"
+      conf="ftp->default"/>
+
+    <!--Configuration: Jetty -->
+
+<!-- <dependency org="javax.servlet"
+      name="servlet-api"
+      rev="${servlet-api.version}"
+      conf="jetty->master"/>   -->
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="jetty->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="jetty->master"/>
+
+    <dependency org="tomcat"
+      name="jasper-runtime"
+      rev="${jasper.version}"
+      conf="jetty->master"/>
+    <dependency org="tomcat"
+      name="jasper-compiler"
+      rev="${jasper.version}"
+      conf="jetty->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-api-2.1"
+      rev="${jetty.version}"
+      conf="jetty->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-2.1"
+      rev="${jetty.version}"
+      conf="jetty->master"/>
+    <dependency org="commons-el"
+      name="commons-el"
+      rev="${commons-el.version}"
+      conf="jetty->master"/>
+
+
+    <!--Configuration: commons-logging -->
+
+    <!--it is essential that only the master JAR of commons logging
+    is pulled in, as its dependencies are usually a mess, including things
+    like out of date servlet APIs, bits of Avalon, etc.
+    -->
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="commons-logging->master"/>
+
+
+    <!--Configuration: log4j -->
+
+    <!--log4J is not optional until commons-logging.properties is stripped out of the JAR -->
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="log4j->master"/>
+
+    <!--Configuration: s3-client -->
+    <!--there are two jets3t projects in the repository; this one goes up to 0.6 and
+    is assumed to be the live one-->
+    <dependency org="net.java.dev.jets3t"
+      name="jets3t"
+      rev="${jets3t.version}"
+      conf="s3-client->master"/>
+    <dependency org="commons-net"
+      name="commons-net"
+      rev="${commons-net.version}"
+      conf="s3-client->master"/> 
+    <dependency org="org.mortbay.jetty"
+      name="servlet-api-2.5"
+      rev="${servlet-api-2.5.version}"
+      conf="s3-client->master"/>
+    <dependency org="net.sf.kosmosfs"
+      name="kfs"
+      rev="${kfs.version}"
+      conf="kfs->default"/>
+
+    <!--Configuration: test -->
+    <!--artifacts needed for testing -->
+
+    <dependency org="org.apache.ftpserver"
+      name="ftplet-api"
+      rev="${ftplet-api.version}"
+      conf="test->default"/>
+    <dependency org="org.apache.mina"
+      name="mina-core"
+      rev="${mina-core.version}"
+      conf="test->default"/>
+    <dependency org="org.apache.ftpserver"
+      name="ftpserver-core"
+      rev="${ftpserver-core.version}"
+      conf="test->default"/>
+
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="org.apache.rat"
+      name="apache-rat-tasks"
+      rev="${rats-lib.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="commons-lang"
+      name="commons-lang"
+      rev="${commons-lang.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="commons-collections"
+      name="commons-collections"
+      rev="${commons-collections.version}"
+      conf="releaseaudit->default"/>
+    <dependency org="hsqldb"
+      name="hsqldb"
+      rev="${hsqldb.version}"
+      conf="common->default"/>
+    <dependency org="org.apache.lucene"
+      name="lucene-core"
+      rev="${lucene-core.version}"
+      conf="javadoc->default"/> 
+    <dependency org="commons-logging"
+      name="commons-logging-api"
+      rev="${commons-logging-api.version}"
+      conf="common->default"/>
+    <dependency org="org.slf4j"
+      name="slf4j-api"
+      rev="${slf4j-api.version}"
+      conf="common->master"/>
+    <dependency org="org.apache.hadoop"
+      name="avro"
+      rev="1.0.0"
+      conf="common->default"/>
+    <dependency org="org.eclipse.jdt"
+      name="core"
+      rev="${core.version}"
+      conf="common->master"/>
+    <dependency org="oro"
+      name="oro"
+      rev="${oro.version}"
+      conf="common->default"/>
+    <dependency org="org.slf4j"
+      name="slf4j-log4j12"
+      rev="${slf4j-log4j12.version}"
+      conf="common->master">
+    </dependency>
+    <dependency org="org.aspectj"
+      name="aspectjrt"
+      rev="${aspectj.version}"
+      conf="common->default">
+    </dependency>
+    <dependency org="org.aspectj"
+      name="aspectjtools"
+      rev="${aspectj.version}"
+      conf="common->default">
+    </dependency>
+    </dependencies>
+  
+</ivy-module>

Added: hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties Tue Oct  6 07:31:11 2009
@@ -0,0 +1,18 @@
+#This properties file lists the versions of the various artifacts used by hadoop.
+#It drives ivy and the generation of a maven POM
+#These are the versions of our dependencies (in alphabetical order)
+ivy.version=2.0.0-rc2
+
+log4j.version=1.2.15
+slf4j-api.version=1.4.3
+slf4j-log4j12.version=1.4.3
+jetty.version=6.1.14
+jetty-util.version=6.1.14
+servlet-api-2.5.version=6.1.14
+cactus.version=1.8.0
+commons-logging.version=1.1
+commons-logging-api.version=1.0.4
+junit.version=3.8.2
+jsp.version=2.1
+core.version=3.1.1
+xmlenc.version=0.52
\ No newline at end of file

Added: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar?rev=822153&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar?rev=822153&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.DataInput;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.hdfs.BlockMissingException;
+
+/**
+ * This is an implementation of the Hadoop  RAID Filesystem. This FileSystem 
+ * wraps an instance of the DistributedFileSystem.
+ * If a file is corrupted, this FileSystem uses the parity blocks to 
+ * regenerate the bad block.
+ */
+
+public class DistributedRaidFileSystem extends FilterFileSystem {
+
+  // these are alternate locations that can be used for read-only access
+  Path[]     alternates;
+  Configuration conf;
+  int stripeLength;
+
+  DistributedRaidFileSystem() throws IOException {
+  } // no-arg form: nothing is set here; initialize() assigns conf and the raid settings
+
+  DistributedRaidFileSystem(FileSystem fs) throws IOException {
+    super(fs);         // fs is handed to the FilterFileSystem constructor
+    alternates = null; // no alternate locations until initialize() reads them
+    stripeLength = 0;  // likewise assigned by initialize()
+  }
+
+  /**
+   * Initialize a Raid FileSystem: read the alternate locations
+   * (hdfs.raid.locations) and the stripe length (hdfs.raid.stripeLength).
+   */
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+    this.conf = conf;
+
+    // If no alternates are specified, fall back to the default raid location.
+    String alt = conf.get("hdfs.raid.locations");
+    if (alt == null || alt.length() == 0) {
+      LOG.info("hdfs.raid.locations not defined. Using defaults...");
+      alt = RaidNode.DEFAULT_RAID_LOCATION;
+    }
+
+    // hdfs.raid.locations is a comma-separated list of path prefixes, e.g.
+    // "hdfs://host:port/myPrefixPath, file:///localPrefix,hftp://host1:port1/"
+    String[] strs  = alt.split(",");
+    if (strs.length == 0) {      // e.g. the value consisted only of commas
+      LOG.info("hdfs.raid.locations badly defined. Ignoring...");
+      return;
+    }
+
+    // A non-positive stripe length is unusable: leave alternates unset.
+    stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+    if (stripeLength <= 0) {
+      LOG.info("hdfs.raid.stripeLength is incorrectly defined to be " + 
+               stripeLength + " Ignoring...");
+      return;
+    }
+
+    // create a qualified reference to every alternate path prefix
+    alternates = new Path[strs.length];
+    for (int i = 0; i < strs.length; i++) {
+      alternates[i] = new Path(strs[i].trim()).makeQualified(fs);
+    }
+  }
+
+  /**
+   * Returns the filesystem wrapped by this raid filesystem (the fs field).
+   */
+  public FileSystem getFileSystem() throws IOException {
+    return fs;
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    // The returned stream retries failed reads from the alternate locations.
+    return new ExtFSDataInputStream(conf, this, alternates, f, stripeLength,
+                                    bufferSize);
+  }
+
+  public void close() throws IOException { // NOTE(review): super.close() is never called - confirm
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch(IOException ie) {
+        // Best effort: the wrapped filesystem may already be closed, so ignore.
+      }
+    }
+  }
+
+  /**
+   * Layered filesystem input stream. This input stream tries reading
+   * from alternate locations if it encounters read errors in the primary location.
+   */
+  private static class ExtFSDataInputStream extends FSDataInputStream {
+    /**
+     * Create an input stream that wraps all the reads/positions/seeking.
+     */
+    private static class ExtFsInputStream extends FSInputStream {
+
+      //The underlying data input stream that the
+      // underlying filesystem will return.
+      private FSDataInputStream underLyingStream;
+      private byte[] oneBytebuff = new byte[1];
+      private int nextLocation;
+      private DistributedRaidFileSystem lfs;
+      private Path path;
+      private final Path[] alternates;
+      private final int buffersize;
+      private final Configuration conf;
+      private final int stripeLength;
+
+      ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs, Path[] alternates,
+                       Path path, int stripeLength, int buffersize)
+          throws IOException {
+        this.conf = conf;
+        this.lfs = lfs;
+        this.alternates = alternates;
+        this.path = path;
+        this.stripeLength = stripeLength;
+        this.buffersize = buffersize;
+        this.nextLocation = 0;
+        this.underLyingStream = lfs.fs.open(path, buffersize); // start on the primary location
+      }
+      
+      @Override
+      public synchronized int available() throws IOException {
+        final int remaining = underLyingStream.available();
+        nextLocation = 0; // success: the next failure starts from the first alternate again
+        return remaining;
+      }
+      
+      @Override
+      public synchronized  void close() throws IOException {
+        underLyingStream.close(); // release the currently wrapped stream first
+        super.close();
+      }
+      
+      @Override
+      public void mark(int readLimit) {
+        underLyingStream.mark(readLimit); // delegate to the currently wrapped stream
+        nextLocation = 0;                 // restart the alternate-location scan
+      }
+      
+      @Override
+      public void reset() throws IOException {
+        underLyingStream.reset(); // delegate to the currently wrapped stream
+        nextLocation = 0;         // restart the alternate-location scan
+      }
+      
+      @Override
+      public synchronized int read() throws IOException {
+        final long startPos = underLyingStream.getPos();
+        for (;;) { // retry until the read succeeds or the alternates are exhausted
+          try {
+            final int oneByte = underLyingStream.read();
+            nextLocation = 0;
+            return oneByte;
+          } catch (BlockMissingException missing) {
+            setAlternateLocations(missing, startPos);
+          } catch (ChecksumException corrupt) {
+            setAlternateLocations(corrupt, startPos);
+          }
+        }
+      }
+      
+      @Override
+      public synchronized int read(byte[] b) throws IOException {
+        final long startPos = underLyingStream.getPos();
+        for (;;) { // retry until the read succeeds or the alternates are exhausted
+          try {
+            final int count = underLyingStream.read(b);
+            nextLocation = 0;
+            return count;
+          } catch (BlockMissingException missing) {
+            setAlternateLocations(missing, startPos);
+          } catch (ChecksumException corrupt) {
+            setAlternateLocations(corrupt, startPos);
+          }
+        }
+      }
+
+      @Override
+      public synchronized int read(byte[] b, int offset, int len) 
+        throws IOException {
+        final long startPos = underLyingStream.getPos();
+        for (;;) { // retry until the read succeeds or the alternates are exhausted
+          try {
+            final int count = underLyingStream.read(b, offset, len);
+            nextLocation = 0;
+            return count;
+          } catch (BlockMissingException missing) {
+            setAlternateLocations(missing, startPos);
+          } catch (ChecksumException corrupt) {
+            setAlternateLocations(corrupt, startPos);
+          }
+        }
+      }
+      
+      @Override
+      public synchronized int read(long position, byte[] b, int offset, int len) 
+        throws IOException {
+        final long startPos = underLyingStream.getPos();
+        for (;;) { // retry until the read succeeds or the alternates are exhausted
+          try {
+            final int count = underLyingStream.read(position, b, offset, len);
+            nextLocation = 0;
+            return count;
+          } catch (BlockMissingException missing) {
+            setAlternateLocations(missing, startPos);
+          } catch (ChecksumException corrupt) {
+            setAlternateLocations(corrupt, startPos);
+          }
+        }
+      }
+      
+      @Override
+      public synchronized long skip(long n) throws IOException {
+        final long skipped = underLyingStream.skip(n);
+        nextLocation = 0; // success: the next failure starts from the first alternate again
+        return skipped;
+      }
+      
+      @Override
+      public synchronized long getPos() throws IOException {
+        final long current = underLyingStream.getPos();
+        nextLocation = 0; // success: the next failure starts from the first alternate again
+        return current;
+      }
+      
+      @Override
+      public synchronized void seek(long pos) throws IOException {
+        underLyingStream.seek(pos); // delegate to the currently wrapped stream
+        nextLocation = 0;           // restart the alternate-location scan
+      }
+
+      @Override
+      public boolean seekToNewSource(long targetPos) throws IOException {
+        final boolean switched = underLyingStream.seekToNewSource(targetPos);
+        nextLocation = 0; // success: the next failure starts from the first alternate again
+        return switched;
+      }
+      
+      /**
+       * Positional readFully; retries from the alternate locations on read errors.
+       */
+      @Override
+      public void readFully(long pos, byte[] b, int offset, int length) throws IOException {
+        final long post = underLyingStream.getPos(); // stream position to resume at on an alternate
+        while (true) {
+          try {
+            underLyingStream.readFully(pos, b, offset, length);
+            nextLocation = 0;
+            return; // success: leave the retry loop
+          } catch (BlockMissingException e) {
+            setAlternateLocations(e, post);
+          } catch (ChecksumException e) {
+            setAlternateLocations(e, post); // the stream position, not the read offset
+          }
+        }
+      }
+      
+      @Override
+      public void readFully(long pos, byte[] b) throws IOException {
+        final long post = underLyingStream.getPos(); // stream position to resume at on an alternate
+        while (true) {
+          try {
+            underLyingStream.readFully(pos, b);
+            nextLocation = 0;
+            return; // success: leave the retry loop
+          } catch (BlockMissingException e) {
+            setAlternateLocations(e, post);
+          } catch (ChecksumException e) {
+            setAlternateLocations(e, post); // the stream position, not the read offset
+          }
+        }
+      }
+      /**
+       * Extract a good copy of the file from RAID and switch this stream to it.
+       * @param curexp the exception raised by the failed read
+       * @param curpos the position of the current operation to be retried
+       * @throws IOException curexp itself, once all alternate locations are exhausted
+       */
+      private void setAlternateLocations(IOException curexp, long curpos) throws IOException {
+        // The corrupt offset depends only on the exception: compute it once.
+        long corruptOffset = -1;
+        if (curexp instanceof BlockMissingException) {
+          corruptOffset = ((BlockMissingException)curexp).getOffset();
+        } else if (curexp instanceof ChecksumException) {
+          corruptOffset = ((ChecksumException)curexp).getPos();
+        }
+        while (alternates != null && nextLocation < alternates.length) {
+          try {
+            int idx = nextLocation++; // each alternate is tried at most once per failure
+            Path npath = RaidNode.unRaid(conf, path, alternates[idx], stripeLength, 
+                                         corruptOffset);
+            FileSystem fs1 = getUnderlyingFileSystem(conf);
+            fs1.initialize(npath.toUri(), conf);
+            LOG.info("Opening alternate path " + npath + " at offset " + curpos);
+            FSDataInputStream fd = fs1.open(npath, buffersize);
+            fd.seek(curpos); // NOTE(review): fd is left open if this seek fails
+            underLyingStream.close();
+            underLyingStream = fd;
+            lfs.fs = fs1; // the raid filesystem now wraps the alternate filesystem
+            path = npath;
+            return;
+          } catch (Exception e) {
+            LOG.info("Error in using alternate path " + path + ". " + e +
+                     " Ignoring...");
+          }
+        }
+        throw curexp;
+      }
+
+      /**
+       * Instantiate the file system that sits immediately below the
+       * DistributedRaidFileSystem. Its class is taken from the
+       * configuration parameter fs.raid.underlyingfs.impl; when that
+       * parameter is not set, DistributedFileSystem is used.
+       * The instance is created reflectively; initialize() is not called
+       * here and is left to the caller.
+       * @param conf the configuration object
+       * @return a new filesystem object of the configured class
+       */
+      private FileSystem getUnderlyingFileSystem(Configuration conf) {
+        final Class<?> implClass =
+            conf.getClass("fs.raid.underlyingfs.impl", DistributedFileSystem.class);
+        return (FileSystem) ReflectionUtils.newInstance(implClass, conf);
+      }
+    }
+  
+    /**
+     * Create the stream by wrapping a new ExtFsInputStream built from the
+     * same arguments: the configuration, the raid filesystem, the alternate
+     * locations, the path to open, the stripe length and the IO buffer
+     * size.
+     * @throws IOException if the ExtFsInputStream cannot be constructed
+     */
+    public ExtFSDataInputStream(Configuration conf, DistributedRaidFileSystem lfs,
+      Path[] alternates, Path  p, int stripeLength, int buffersize) throws IOException {
+        super(new ExtFsInputStream(conf, lfs, alternates, p, stripeLength, buffersize));
+    }
+  }
+}

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+
+/**
+ * Maintains the configuration xml file that is read into memory.
+ */
+class ConfigManager {
+  public static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.raid.ConfigManager");
+
+  /** Time to wait between checks of the config file */
+  public static final long RELOAD_INTERVAL = 10 * 1000;
+
+  /** Time to wait between successive runs of all policies */
+  public static final long RESCAN_INTERVAL = 3600 * 1000;
+  
+  /**
+   * Time to wait after the config file has been modified before reloading it
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long RELOAD_WAIT = 5 * 1000; 
+  
+  private Configuration conf;    // Hadoop configuration
+  private String configFileName; // Path to config XML file
+  
+  private long lastReloadAttempt; // Last time we tried to reload the config file
+  private long lastSuccessfulReload; // Last time we successfully reloaded config
+  private boolean lastReloadAttemptFailed = false;
+  private long reloadInterval = RELOAD_INTERVAL;
+  private long periodicity; // time between runs of all policies
+
+  // Reload the configuration
+  private boolean doReload;
+  private Thread reloadThread;
+  private volatile boolean running = false;
+
+  // Collection of all configured policies.
+  Collection<PolicyList> allPolicies = new ArrayList<PolicyList>();
+
+  public ConfigManager(Configuration conf) throws IOException, SAXException,
+      RaidConfigurationException, ClassNotFoundException, ParserConfigurationException {
+    this.conf = conf;
+    this.configFileName = conf.get("raid.config.file"); // mandatory, checked below
+    this.doReload = conf.getBoolean("raid.config.reload", true);
+    this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
+    this.periodicity = conf.getLong("raid.policy.rescan.interval",  RESCAN_INTERVAL);
+    if (configFileName == null) {
+      String msg = "No raid.config.file given in conf - " +
+                   "the Hadoop Raid utility cannot run. Aborting....";
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    reloadConfigs(); // initial load: any failure here aborts construction
+    lastSuccessfulReload = RaidNode.now();
+    lastReloadAttempt = RaidNode.now();
+    running = true; // loop condition of the UpdateThread created by startReload()
+  }
+  
+  /**
+   * Re-read the config file when it is due for a check and has changed.
+   * @return true if the file was reloaded
+   */
+  public synchronized boolean reloadConfigsIfNecessary() {
+    final long now = RaidNode.now();
+    if (now <= lastReloadAttempt + reloadInterval) {
+      return false;   // checked too recently
+    }
+    lastReloadAttempt = now;
+    try {
+      final long modifiedAt = new File(configFileName).lastModified();
+      // reload only a file changed since the last load and stable for RELOAD_WAIT
+      if (modifiedAt > lastSuccessfulReload && now > modifiedAt + RELOAD_WAIT) {
+        reloadConfigs();
+        lastSuccessfulReload = now;
+        lastReloadAttemptFailed = false;
+        return true;
+      }
+    } catch (Exception e) {
+      if (!lastReloadAttemptFailed) {   // log only the first of a run of failures
+        LOG.error("Failed to reload config file - " +
+            "will use existing configuration.", e);
+      }
+      lastReloadAttemptFailed = true;
+    }
+    return false;
+  }
+  
+  /**
+   * Updates the in-memory data structures from the config file. This file is
+   * expected to be an XML document of the following form:
+   * 
+   <configuration>
+    <srcPath prefix="hdfs://hadoop.myhost.com:9000/user/warehouse/u_full/*">
+      <policy name = "RaidScanWeekly">
+        <destPath> hdfs://dfsarch.data.facebook.com:9000/archive/</destPath>
+        <property>
+          <name>targetReplication</name>
+          <value>2</value>
+          <description> after RAIDing, decrease the replication factor of the file to 
+                        this value.
+          </description>
+        </property>
+        <property>
+          <name>metaReplication</name>
+          <value>2</value>
+          <description> the replication factor of the RAID meta file
+          </description>
+        </property>
+        <property>
+          <name>stripeLength</name>
+          <value>10</value>
+          <description> the number of blocks to RAID together
+          </description>
+        </property>
+      </policy>
+    </srcPath>
+   </configuration>
+   *
+   * Every child element of a srcPath must be a policy; destPath, description
+   * and property elements are read from inside each policy.
+   *
+   * @throws IOException if the config file cannot be read.
+   * @throws RaidConfigurationException if the file is missing or its entries are invalid.
+   * @throws ClassNotFoundException if user-defined policy classes cannot be loaded
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  void reloadConfigs() throws IOException, ParserConfigurationException, 
+      SAXException, ClassNotFoundException, RaidConfigurationException {
+
+    if (configFileName == null) {
+       return;
+    }
+    
+    File file = new File(configFileName);
+    if (!file.exists()) {
+      throw new RaidConfigurationException("Configuration file " + configFileName +
+                                           " does not exist.");
+    }
+
+    // Collect the new policy lists in a temporary list; it is only published
+    // (setAllPolicies) once the entire file has been parsed successfully.
+    List<PolicyList> all = new ArrayList<PolicyList>();
+    long periodicityValue = periodicity; // nothing below changes it: written back as-is
+    
+    
+    // Read and parse the configuration file.
+    // allow include files in configuration file
+    DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    docBuilderFactory.setNamespaceAware(true);
+    try {
+      docBuilderFactory.setXIncludeAware(true);
+    } catch (UnsupportedOperationException e) {
+        LOG.error("Failed to set setXIncludeAware(true) for raid parser "
+                + docBuilderFactory + ":" + e, e);
+    }
+    LOG.info("Reloading config file " + file); // routine event, not an error
+
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(file);
+    Element root = doc.getDocumentElement();
+    if (!"configuration".equalsIgnoreCase(root.getTagName()))
+      throw new RaidConfigurationException("Bad configuration file: " + 
+          "top-level element not <configuration>");
+    NodeList elements = root.getChildNodes();
+
+    // loop through all the configured source paths.
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element)) {
+        continue;
+      }
+      Element element = (Element)node;
+      String elementTagName = element.getTagName();
+      if ("srcPath".equalsIgnoreCase(elementTagName)) {
+        String srcPathPrefix = element.getAttribute("prefix");
+
+        if (srcPathPrefix == null || srcPathPrefix.length() == 0) {
+          throw new RaidConfigurationException("Bad configuration file: " + 
+             "srcPathPrefix not set.");
+        }
+        PolicyList policyList = new PolicyList();
+        all.add(policyList);
+
+        policyList.setSrcPath(conf, srcPathPrefix);
+        
+        // loop through all the policies for this source path
+        NodeList policies = element.getChildNodes();
+        for (int j = 0; j < policies.getLength(); j++) {
+          Node node1 = policies.item(j);
+          if (!(node1 instanceof Element)) {
+            continue;
+          }
+          Element policy = (Element)node1;
+          if (!"policy".equalsIgnoreCase(policy.getTagName())) {
+            throw new RaidConfigurationException("Bad configuration file: " + 
+              "Expecting <policy> for srcPath " + srcPathPrefix);
+          }
+          String policyName = policy.getAttribute("name");
+          PolicyInfo pinfo = new PolicyInfo(policyName, conf);
+          pinfo.setSrcPath(srcPathPrefix);
+          policyList.add(pinfo);
+
+          // loop through all the properties of this policy
+          // NOTE(review): the (Text) casts below assume non-empty text content
+          NodeList properties = policy.getChildNodes();
+          for (int k = 0; k < properties.getLength(); k++) {
+            Node node2 = properties.item(k);
+            if (!(node2 instanceof Element)) {
+              continue;
+            }
+            Element property = (Element)node2;
+            String propertyName = property.getTagName();
+            if ("destPath".equalsIgnoreCase(propertyName)) {
+              String text = ((Text)property.getFirstChild()).getData().trim();
+              LOG.info(policyName + ".destPath = " + text);
+              pinfo.setDestinationPath(text);
+            } else if ("description".equalsIgnoreCase(propertyName)) {
+              String text = ((Text)property.getFirstChild()).getData().trim();
+              pinfo.setDescription(text);
+            } else if ("property".equalsIgnoreCase(propertyName)) {
+              NodeList nl = property.getChildNodes();
+              String pname=null,pvalue=null;
+              for (int l = 0; l < nl.getLength(); l++){
+                Node node3 = nl.item(l);
+                if (!(node3 instanceof Element)) {
+                  continue;
+                }
+                Element item = (Element) node3;
+                String itemName = item.getTagName();
+                if ("name".equalsIgnoreCase(itemName)){
+                  pname = ((Text)item.getFirstChild()).getData().trim();
+                } else if ("value".equalsIgnoreCase(itemName)){
+                  pvalue = ((Text)item.getFirstChild()).getData().trim();
+                }
+              }
+              if (pname != null && pvalue != null) {
+                LOG.info(policyName + "." + pname + " = " + pvalue);
+                pinfo.setProperty(pname,pvalue);
+              }
+            } else {
+              LOG.info("Found bad property " + propertyName +
+                       " for srcPath" + srcPathPrefix +
+                       " policy name " + policyName +
+                       ". Ignoring."); 
+            }
+          }  // done with all properties of this policy
+        }    // done with all policies for this srcpath
+      } 
+    }        // done with all srcPaths
+    setAllPolicies(all);
+    periodicity = periodicityValue;
+    return;
+  }
+
+
+  public synchronized long getPeriodicity() {
+    return periodicity; // time between runs of all policies (raid.policy.rescan.interval)
+  }
+  
+  /**
+   * Get all policies as a new list, so callers never hold the internal collection.
+   */
+  public synchronized Collection<PolicyList> getAllPolicies() {
+    return new ArrayList<PolicyList>(allPolicies);
+  }
+  
+  /**
+   * Replace the collection of all policies; the given collection is stored as-is, not copied.
+   */
+  protected synchronized void setAllPolicies(Collection<PolicyList> value) {
+    this.allPolicies = value;
+  }
+
+  /** Start a background thread to reload the config file
+   *  (only when reloading is enabled, see raid.config.reload). */
+  void startReload() {
+    if (!doReload) {
+      return;
+    }
+    reloadThread = new UpdateThread();
+    reloadThread.start();
+  }
+
+  /** Stop the background thread that reloads the config file and wait
+   *  for it to finish; does nothing when no thread was started. */
+  void stopReload() throws InterruptedException {
+    if (reloadThread == null) {
+      return;
+    }
+    running = false;
+    reloadThread.interrupt();
+    reloadThread.join();
+    reloadThread = null;
+  }
+
+  /**
+   * A thread which sleeps for reloadInterval and then reloads the config file if needed.
+   */
+  private class UpdateThread extends Thread {
+    private UpdateThread() {
+      super("Raid update thread");
+    }
+
+    public void run() {
+      while (running) { // cleared by stopReload()
+        try {
+          Thread.sleep(reloadInterval);
+          reloadConfigsIfNecessary();
+        } catch (InterruptedException e) {
+          // interrupted (stopReload() does this): fall through and re-check running
+        } catch (Exception e) {
+          LOG.error("Failed to reload config file ", e);
+        }
+      }
+    }
+  }
+}

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,328 @@
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.raid.RaidNode.Statistics;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistRaid {
+
+  protected static final Log LOG = LogFactory.getLog(DistRaid.class);
+
+  static final String NAME = "distRaid"; // prefix of the conf keys below, the job name and the job dir name
+  static final String JOB_DIR_LABEL = NAME + ".job.dir"; // conf key: job directory path
+  static final String OP_LIST_LABEL = NAME + ".op.list"; // conf key: operation-list file path
+  static final String OP_COUNT_LABEL = NAME + ".op.count"; // conf key: number of files in the list
+
+  private static final long OP_PER_MAP = 100; // files per map task, used by getMapCount
+  private static final int MAX_MAPS_PER_NODE = 20; // per-node cap on map tasks, used by getMapCount
+  private static final int SYNC_FILE_MAX = 10; // setup() syncs the list writer after more than this many appends
+  private static final SimpleDateFormat dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm"); // job-name timestamp
+  private static String jobName = NAME; // NOTE(review): static yet reassigned by createJobConf, so shared by all instances
+
+  static enum Counter { // job counters incremented by DistRaidMapper.map
+    FILES_SUCCEEDED, FILES_FAILED, PROCESSED_BLOCKS, PROCESSED_SIZE, META_BLOCKS, META_SIZE
+  }
+
+  protected JobConf jobconf; // job configuration; assigned by setConf, returned by getConf
+
+  /** Adopt {@code conf} as the job configuration; a Configuration that is not
+   *  already a JobConf is wrapped in a new JobConf. */
+  public void setConf(Configuration conf) {
+    if (jobconf == conf) { return; } // already holding this exact object
+    jobconf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
+  }
+
+  /** Returns the JobConf held in the {@code jobconf} field (assigned by setConf). */
+  public JobConf getConf() {
+    return jobconf;
+  }
+  /** Create a DistRaid whose job configuration is built from {@code conf} by createJobConf. */
+  public DistRaid(Configuration conf) {
+    setConf(createJobConf(conf));
+  }
+
+  private static final Random RANDOM = new Random();
+  /** Returns a random int in [0, Integer.MAX_VALUE) rendered as base-36 text. */
+  protected static String getRandomId() {
+    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), Character.MAX_RADIX);
+  }
+
+  /**
+   * Helper class that pairs one PolicyInfo with the list of source files
+   * (as FileStatus objects) it applies to. Both fields are public and
+   * mutable; the constructor stores the given references without copying.
+   */
+  public static class RaidPolicyPathPair {
+    public PolicyInfo policy; // written next to each path by setup()
+    public List<FileStatus> srcPaths; // files listed under that policy
+
+    RaidPolicyPathPair(PolicyInfo policy, List<FileStatus> srcPaths) {
+      this.policy = policy;
+      this.srcPaths = srcPaths;
+    }
+  }
+
+  List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>(); // filled by addRaidPaths, cleared by setup()
+
+  /** Responsible for generating splits of the src file list. */
+  static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
+    /** Do nothing. */
+    public void validateInput(JobConf job) {
+    }
+
+    /**
+     * Split the operation list (a SequenceFile named by OP_LIST_LABEL) by record
+     * count: a split is closed once it holds more than srcCount / numSplits
+     * records, and any bytes left after the last closed split form a final one.
+     * 
+     * @param job job conf carrying OP_COUNT_LABEL and OP_LIST_LABEL
+     * @param numSplits number of splits requested; the number returned may differ
+     * @throws IOException if the operation list cannot be read
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
+      final int targetcount = srcCount / numSplits;
+      String srclist = job.get(OP_LIST_LABEL, "");
+      if (srcCount < 0 || "".equals(srclist)) {
+        throw new RuntimeException("Invalid metadata: #files(" + srcCount
+            + ") listuri(" + srclist + ")");
+      }
+      Path srcs = new Path(srclist);
+      FileSystem fs = srcs.getFileSystem(job);
+
+      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+      Text key = new Text();
+      PolicyInfo value = new PolicyInfo();
+      SequenceFile.Reader in = null;
+      long prev = 0L; // byte offset where the split being built starts
+      int count = 0; // records read into the split being built
+      try {
+        for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
+          long curr = in.getPosition();
+          if (++count > targetcount) {
+            count = 0;
+            splits.add(new FileSplit(srcs, prev, curr - prev, (String[]) null));
+            prev = curr;
+          }
+        }
+      } finally {
+        if (in != null) { // still null when the Reader constructor threw
+          in.close();
+        }
+      }
+      long remaining = fs.getFileStatus(srcs).getLen() - prev;
+      if (remaining != 0) {
+        splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
+      }
+      LOG.info("jobname= " + jobName + " numSplits=" + numSplits + 
+               ", splits.size()=" + splits.size());
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+
+    /** {@inheritDoc} */
+    public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader<Text, PolicyInfo>(job,
+          (FileSplit) split);
+    }
+  }
+
+  /** Mapper that raids one file per input record and tallies successes and failures. */
+  static class DistRaidMapper implements
+      Mapper<Text, PolicyInfo, WritableComparable, Text> {
+    private JobConf jobconf;
+    private boolean ignoreFailures;
+
+    private int failcount = 0;
+    private int succeedcount = 0;
+    private Statistics st = null;
+
+    private String getCountString() {
+      return "Succeeded: " + succeedcount + " Failed: " + failcount;
+    }
+
+    /** Keep the job conf, create the Statistics holder, and leave failures not ignored. */
+    public void configure(JobConf job) {
+      jobconf = job;
+      st = new Statistics();
+      ignoreFailures = false;
+    }
+
+    /** Raid the file named by {@code key} under {@code policy}; an IOException is counted and emitted, not rethrown. */
+    public void map(Text key, PolicyInfo policy,
+        OutputCollector<WritableComparable, Text> out, Reporter reporter)
+        throws IOException {
+      try {
+        final String file = key.toString();
+        LOG.info("Raiding file=" + file + " policy=" + policy);
+        final Path srcPath = new Path(file);
+        final FileStatus srcStatus = srcPath.getFileSystem(jobconf).getFileStatus(srcPath);
+        st.clear();
+        RaidNode.doRaid(jobconf, policy, srcStatus, st, reporter);
+        ++succeedcount;
+
+        // report the values held in st after the doRaid call above
+        reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
+        reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
+        reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
+        reporter.incrCounter(Counter.META_SIZE, st.metaSize);
+        reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
+      } catch (IOException ioe) {
+        ++failcount;
+        reporter.incrCounter(Counter.FILES_FAILED, 1);
+
+        final String report = "FAIL: " + policy + ", " + key + " "
+            + StringUtils.stringifyException(ioe);
+        out.collect(null, new Text(report));
+        LOG.info(report);
+      } finally {
+        reporter.setStatus(getCountString());
+      }
+    }
+
+    /** Fail the task with the tallies if any file failed, unless failures are ignored. */
+    public void close() throws IOException {
+      final boolean mustFail = failcount != 0 && !ignoreFailures;
+      if (mustFail) {
+        throw new IOException(getCountString());
+      }
+    }
+  }
+
+  /**
+   * create new job conf based on configuration passed.
+   * 
+   * @param conf
+   * @return
+   */
+  private static JobConf createJobConf(Configuration conf) {
+    JobConf jobconf = new JobConf(conf, DistRaid.class);
+    jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
+    jobconf.setJobName(jobName);
+    jobconf.setMapSpeculativeExecution(false);
+
+    jobconf.setJarByClass(DistRaid.class);
+    jobconf.setInputFormat(DistRaidInputFormat.class);
+    jobconf.setOutputKeyClass(Text.class);
+    jobconf.setOutputValueClass(Text.class);
+
+    jobconf.setMapperClass(DistRaidMapper.class);
+    jobconf.setNumReduceTasks(0);
+    return jobconf;
+  }
+  /** Queue {@code paths} under policy {@code info}; the queue is consumed by doDistRaid(). */
+  public void addRaidPaths(PolicyInfo info, List<FileStatus> paths) {
+    final RaidPolicyPathPair pair = new RaidPolicyPathPair(info, paths);
+    raidPolicyPathPairList.add(pair);
+  }
+
+  /** Calculate how many maps to run. */
+  private static int getMapCount(int srcCount, int numNodes) {
+    int numMaps = (int) (srcCount / OP_PER_MAP);
+    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+    return Math.max(numMaps, 1);
+  }
+
+  /** Run a mapred job that raids the queued paths in parallel, then delete the job directory. */
+  public void doDistRaid() throws IOException {
+    if (raidPolicyPathPairList.isEmpty()) {
+      LOG.info("DistRaid has no paths to raid.");
+      return;
+    }
+    try {
+      final boolean haveWork = setup();
+      if (haveWork) {
+        JobClient.runJob(jobconf);
+      }
+    } finally {
+      final String jobdir = jobconf.get(JOB_DIR_LABEL);
+      if (jobdir != null) { // remove the job directory recorded by setup()
+        final Path jobpath = new Path(jobdir);
+        jobpath.getFileSystem(jobconf).delete(jobpath, true);
+      }
+    }
+    raidPolicyPathPairList.clear();
+  }
+
+  /**
+   * Write every queued (path, policy) pair to an operation-list SequenceFile in a
+   * randomly named job directory; record dir, list, file count and map count in jobconf.
+   * @return true if at least one file was written to the operation list
+   * @throws IOException if writing the list or querying the cluster fails
+   */
+  private boolean setup() throws IOException {
+    final String randomId = getRandomId();
+    JobClient jClient = new JobClient(jobconf); // also used below for the cluster status
+    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+
+    LOG.info(JOB_DIR_LABEL + "=" + jobdir);
+    jobconf.set(JOB_DIR_LABEL, jobdir.toString());
+    Path log = new Path(jobdir, "_logs");
+
+    FileOutputFormat.setOutputPath(jobconf, log);
+    LOG.info("log=" + log);
+
+    // create operation list
+    FileSystem fs = jobdir.getFileSystem(jobconf);
+    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
+    jobconf.set(OP_LIST_LABEL, opList.toString());
+    int opCount = 0, synCount = 0;
+    SequenceFile.Writer opWriter = null;
+    try {
+      opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
+          PolicyInfo.class, SequenceFile.CompressionType.NONE);
+      for (RaidPolicyPathPair p : raidPolicyPathPairList) {
+        for (FileStatus st : p.srcPaths) {
+          opWriter.append(new Text(st.getPath().toString()), p.policy);
+          opCount++;
+          if (++synCount > SYNC_FILE_MAX) {
+            opWriter.sync();
+            synCount = 0;
+          }
+        }
+      }
+
+    } finally {
+      if (opWriter != null) {
+        opWriter.close();
+      }
+    }
+    raidPolicyPathPairList.clear();
+
+    jobconf.setInt(OP_COUNT_LABEL, opCount);
+    LOG.info("Number of files=" + opCount);
+    final int trackers = jClient.getClusterStatus().getTaskTrackers(); // reuse jClient rather than build a second JobClient
+    jobconf.setNumMapTasks(getMapCount(opCount, trackers));
+    LOG.info("jobName= " + jobName + " numMapTasks=" + jobconf.getNumMapTasks());
+    return opCount != 0;
+
+  }
+}

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+/**
+ * Thrown when the raid config file is malformed (the original text named "CronNode" -- TODO confirm intent).
+ */
+public class RaidConfigurationException extends Exception {
+  private static final long serialVersionUID = 4046516718965587999L;
+  
+  public RaidConfigurationException(String message) {
+    super(message);
+  }
+}