Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/10/06 09:31:12 UTC
svn commit: r822153 [1/3] - in /hadoop/hdfs/trunk: ./ src/contrib/
src/contrib/raid/ src/contrib/raid/conf/ src/contrib/raid/ivy/
src/contrib/raid/lib/ src/contrib/raid/src/ src/contrib/raid/src/java/
src/contrib/raid/src/java/org/ src/contrib/raid/src...
Author: dhruba
Date: Tue Oct 6 07:31:11 2009
New Revision: 822153
URL: http://svn.apache.org/viewvc?rev=822153&view=rev
Log:
HDFS-503. This patch implements an optional layer over HDFS that
provides offline erasure coding. It can be used to reduce the
total storage requirements of HDFS. (dhruba)
Added:
hadoop/hdfs/trunk/src/contrib/raid/
hadoop/hdfs/trunk/src/contrib/raid/README
hadoop/hdfs/trunk/src/contrib/raid/build.xml
hadoop/hdfs/trunk/src/contrib/raid/conf/
hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml
hadoop/hdfs/trunk/src/contrib/raid/ivy/
hadoop/hdfs/trunk/src/contrib/raid/ivy.xml
hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties
hadoop/hdfs/trunk/src/contrib/raid/lib/
hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar (with props)
hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar (with props)
hadoop/hdfs/trunk/src/contrib/raid/src/
hadoop/hdfs/trunk/src/contrib/raid/src/java/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
hadoop/hdfs/trunk/src/contrib/raid/src/test/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/contrib/build.xml
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=822153&r1=822152&r2=822153&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Oct 6 07:31:11 2009
@@ -6,6 +6,10 @@
NEW FEATURES
+ HDFS-503. This patch implements an optional layer over HDFS that
+ provides offline erasure coding. It can be used to reduce the
+ total storage requirements of HDFS. (dhruba)
+
IMPROVEMENTS
OPTIMIZATIONS
Modified: hadoop/hdfs/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/build.xml?rev=822153&r1=822152&r2=822153&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/build.xml (original)
+++ hadoop/hdfs/trunk/src/contrib/build.xml Tue Oct 6 07:31:11 2009
@@ -46,6 +46,7 @@
<!-- ====================================================== -->
<target name="test">
<subant target="test">
+ <fileset dir="." includes="raid/build.xml"/>
<fileset dir="." includes="hdfsproxy/build.xml"/>
</subant>
</target>
Added: hadoop/hdfs/trunk/src/contrib/raid/README
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/README?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/README (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/README Tue Oct 6 07:31:11 2009
@@ -0,0 +1,176 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements a Distributed Raid File System. It is used along with
+an instance of the Hadoop Distributed File System (HDFS). It can be used to
+provide better protection against data corruption. It can also be used to
+reduce the total storage requirements of HDFS.
+
+The Distributed Raid File System consists of two main software components. The first component
+is the RaidNode, a daemon that creates parity files from specified HDFS files.
+The second component, "raidfs", is software layered over an HDFS client that
+intercepts all calls an application makes to the HDFS client. If HDFS encounters
+corrupted data while reading a file, the raidfs client detects it; it uses the
+relevant parity blocks to recover the corrupted data (if possible) and returns
+the data to the application. The use of parity data to satisfy the read request
+is completely transparent to the application.
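
As an illustration of this transparency, here is a minimal, hypothetical read sketch. It assumes fs.hdfs.impl has been pointed at DistributedRaidFileSystem as described in the INSTALLING section below, so that FileSystem.get() returns the raid-aware client and the application needs no code changes; RaidReadExample and its details are not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RaidReadExample {
      public static void main(String[] args) throws Exception {
        // fs.hdfs.impl must be set to DistributedRaidFileSystem in the
        // client configuration (see INSTALLING and CONFIGURING below).
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);  // returns the raid-aware client
        FSDataInputStream in = fs.open(new Path(args[0]));
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
          // consume n bytes; corrupted blocks are recovered from parity if possible
        }
        in.close();
      }
    }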
+
+The primary use of this feature is to save disk space for HDFS files.
+HDFS typically stores data in triplicate.
+The Distributed Raid File System can be configured in such a way that a set of
+data blocks of a file are combined together to form one or more parity blocks.
+This allows one to reduce the replication factor of an HDFS file from 3 to 2
+while keeping the failure probability roughly the same as before. This typically
+results in saving 25% to 30% of storage space in an HDFS cluster.
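
As a rough check of the 25% to 30% figure, the following back-of-the-envelope sketch assumes a stripe length of 10 and a replication factor of 2 for both the source file and the parity file (numbers similar to the sample raid.xml in this commit); it is illustrative arithmetic only and not part of the RaidNode.

    public class RaidSavingsExample {
      public static void main(String[] args) {
        int stripeLength = 10;        // data blocks combined into one parity block
        double sourceReplication = 2; // replication of the source file after raiding
        double parityReplication = 2; // replication of the parity (meta) file

        // Effective copies stored per data block: the source replicas plus the
        // parity replicas amortized over the stripe.
        double raided = sourceReplication + parityReplication / stripeLength; // 2.2
        double unraided = 3.0;        // default HDFS triplication

        System.out.printf("Raided overhead: %.1fx, unraided: %.1fx%n", raided, unraided);
        System.out.printf("Savings: %.0f%%%n", 100 * (1 - raided / unraided)); // ~27%
      }
    }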
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING and CONFIGURING:
+
+The entire code is packaged in the form of a single jar file hadoop-*-raid.jar.
+To use HDFS Raid, you need to put the above-mentioned jar file on
+the CLASSPATH. The easiest way is to copy the hadoop-*-raid.jar
+from HADOOP_HOME/build/contrib/raid to HADOOP_HOME/lib. Alternatively,
+you can modify HADOOP_CLASSPATH in conf/hadoop-env.sh to include this jar.
+
+There is a single configuration file named raid.xml that describes the HDFS
+path(s) that you want to raid. A sample of this file can be found in
+src/contrib/raid/conf/raid.xml. Please edit the entries in this file to list the
+path(s) that you want to raid. Then, edit the hdfs-site.xml file for
+your installation to include a reference to this raid.xml. You can add the
+following to your hdfs-site.xml:
+ <property>
+ <name>raid.config.file</name>
+ <value>/mnt/hdfs/DFS/conf/raid.xml</value>
+ <description>This is needed by the RaidNode </description>
+ </property>
+
+Please add an entry to your hdfs-site.xml to enable hdfs clients to use the
+parity bits to recover corrupted data.
+
+ <property>
+ <name>fs.hdfs.impl</name>
+ <value>org.apache.hadoop.hdfs.DistributedRaidFileSystem</value>
+ <description>The FileSystem for hdfs: uris.</description>
+ </property>
+
+
+--------------------------------------------------------------------------------
+
+OPTIONAL CONFIGURATION:
+
+The following properties can be set in hdfs-site.xml to further tune your configuration:
+
+ Specify the location where parity files are stored.
+ <property>
+ <name>hdfs.raid.locations</name>
+ <value>hdfs://newdfs.data:8000/raid</value>
+ <description>The location for parity files. If this is
+ not defined, then it defaults to /raid.
+ </description>
+ </property>
+
+ Specify the parity stripe length
+ <property>
+ <name>hdfs.raid.stripeLength</name>
+ <value>10</value>
+ <description>The number of blocks in a file to be combined into
+ a single raid parity block. The default value is 5. The higher
+ the number, the greater the disk space you will save when you
+ enable raid.
+ </description>
+ </property>
+
+ Specify that the RaidNode should not use a map-reduce cluster for raiding files in parallel.
+ <property>
+ <name>fs.raidnode.local</name>
+ <value>true</value>
+ <description>If you do not want to use your map-reduce cluster to
+ raid files in parallel, then specify "true". By default, this
+ value is false, which means that the RaidNode uses the default
+ map-reduce cluster to generate parity blocks.
+ </description>
+ </property>
+
+
+ Specify the periodicity at which the RaidNode re-calculates (if necessary)
+ the parity blocks.
+ <property>
+ <name>raid.policy.rescan.interval</name>
+ <value>5000</value>
+ <description>Specify the periodicity in milliseconds after which
+ all source paths are rescanned and parity blocks recomputed if
+ necessary. By default, this value is 1 hour.
+ </description>
+ </property>
+
+ By default, the DistributedRaidFileSystem assumes that the underlying file
+ system is the DistributedFileSystem. If you want to layer the DistributedRaidFileSystem
+ over some other file system, then define a property named fs.raid.underlyingfs.impl
+ that specifies the name of the underlying class. For example, if you want to layer
+ the DistributedRaidFileSystem over an instance of the NewFileSystem, then
+ <property>
+ <name>fs.raid.underlyingfs.impl</name>
+ <value>org.apache.hadoop.new.NewFileSystem</value>
+ <description>Specify the filesystem that is layered immediately below the
+ DistributedRaidFileSystem. By default, this value is DistributedFileSystem.
+ </description>
+ </property>
+
+
+--------------------------------------------------------------------------------
+
+ADMINISTRATION:
+
+The Distributed Raid File System provides support for administration at runtime without
+any downtime to cluster services. It is possible to add or delete paths to be raided without
+interrupting any load on the cluster. If you change raid.xml, its contents will be
+reloaded within seconds and the new contents will take effect immediately.
+
+Designate one machine in your cluster to run the RaidNode software. You can run this daemon
+on any machine irrespective of whether that machine is running any other hadoop daemon or not.
+You can start the RaidNode by running the following on the selected machine:
+nohup $HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidNode >> /xxx/logs/hadoop-root-raidnode-hadoop.xxx.com.log &
+
+Run fsckraid periodically (being developed as part of another JIRA). This validates parity
+blocks of a file.
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+The RaidNode periodically scans all the specified paths in the configuration
+file. For each path, it recursively scans all files that have more than 2 blocks
+and that have not been modified during the last few hours (default is 24 hours).
+It picks the specified number of blocks from the file (as determined by the
+stripe size), generates a parity block by combining them, and
+stores the result as another HDFS file in the specified destination
+directory. There is a one-to-one mapping between an HDFS
+file and its parity file. The RaidNode also periodically finds parity files
+that are orphaned and deletes them.
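
The parity computation itself lives inside RaidNode.doRaid() and is not reproduced in this excerpt; the sketch below is only a hypothetical illustration of a simple XOR code over the blocks of one stripe, not the actual RaidNode implementation.

    public class XorParityExample {
      // Combine the data blocks of one stripe into a single parity block.
      // If any single block of the stripe is later lost, XOR-ing the parity
      // block with the surviving blocks reproduces the missing block.
      public static byte[] xorParity(byte[][] stripeBlocks, int blockSize) {
        byte[] parity = new byte[blockSize];
        for (byte[] block : stripeBlocks) {
          int n = Math.min(block.length, parity.length);
          for (int i = 0; i < n; i++) {
            parity[i] ^= block[i];
          }
        }
        return parity;
      }
    }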
+
+The Distributed Raid FileSystem is layered over a DistributedFileSystem
+instance and intercepts all calls that go into HDFS. HDFS throws a ChecksumException
+or a BlockMissingException when a file read encounters bad data. The layered
+Distributed Raid FileSystem catches these exceptions, locates the corresponding
+parity file, extracts the original data from the parity file and feeds the
+extracted data back to the application in a completely transparent way.
+
+The layered Distributed Raid FileSystem does not fix the data loss that it
+encounters while serving data. It merely makes the application transparently
+use the parity blocks to re-create the original data. A command line tool
+"fsckraid" is currently under development that will fix the corrupted files
+by extracting the data from the associated parity files. An administrator
+can run "fsckraid" manually as and when needed.
Added: hadoop/hdfs/trunk/src/contrib/raid/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/build.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/build.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/build.xml Tue Oct 6 07:31:11 2009
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="raid" default="jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <import file="../build-contrib.xml"/>
+
+ <!-- the unit test classpath -->
+ <path id="contrib.classpath.raid">
+ <pathelement location="${hadoop.root}/src/contrib/raid/lib"/>
+ <path refid="contrib-classpath"/>
+ </path>
+
+ <target name="test" depends="compile,compile-test,test-junit" description="Automated Test Framework" if="test.available"/>
+
+ <target name="test-junit" depends="compile,compile-test" if="test.available">
+ <junit showoutput="${test.output}" fork="yes" printsummary="yes" errorProperty="tests.failed"
+ haltonfailure="no" failureProperty="tests.failed">
+
+ <classpath refid="test.classpath"/>
+ <sysproperty key="test.build.data" value="${build.test}/data"/>
+ <sysproperty key="build.test" value="${build.test}"/>
+ <sysproperty key="user.dir" value="${build.test}/data"/>
+ <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+ <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+ <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+ <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+ <formatter type="plain" />
+ <batchtest todir="${build.test}" unless="testcase">
+ <fileset dir="${src.test}">
+ <include name="**/Test*.java"/>
+ </fileset>
+ </batchtest>
+ <batchtest todir="${build.test}" if="testcase">
+ <fileset dir="${src.test}">
+ <include name="**/${testcase}.java"/>
+ </fileset>
+ </batchtest>
+ </junit>
+ <fail if="tests.failed">Tests failed!</fail>
+ </target>
+
+</project>
+
Added: hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/conf/raid.xml Tue Oct 6 07:31:11 2009
@@ -0,0 +1,58 @@
+ <configuration>
+ <srcPath prefix="hdfs://dfs1.xxx.com:8000/user/dhruba/">
+ <policy name = "dhruba">
+ <property>
+ <name>srcReplication</name>
+ <value>3</value>
+ <description> pick files for RAID only if their replication factor is
+ greater than or equal to this value.
+ </description>
+ </property>
+ <property>
+ <name>targetReplication</name>
+ <value>2</value>
+ <description> after RAIDing, decrease the replication factor of a file to
+ this value.
+ </description>
+ </property>
+ <property>
+ <name>metaReplication</name>
+ <value>2</value>
+ <description> the replication factor of the RAID meta file
+ </description>
+ </property>
+ <property>
+ <name>modTimePeriod</name>
+ <value>3600000</value>
+ <description> time (milliseconds) after a file is modified to make it a
+ candidate for RAIDing
+ </description>
+ </property>
+ </policy>
+ </srcPath>
+ <srcPath prefix="hdfs://dfs1.xxx.com:9000/warehouse/table1">
+ <policy name = "table1">
+ <property>
+ <name>targetReplication</name>
+ <value>1</value>
+ <description> after RAIDing, decrease the replication factor of a file to
+ this value.
+ </description>
+ </property>
+ <property>
+ <name>metaReplication</name>
+ <value>2</value>
+ <description> the replication factor of the RAID meta file
+ </description>
+ </property>
+ <property>
+ <name>modTimePeriod</name>
+ <value>3600000</value>
+ <description> time (milliseconds) after a file is modified to make it a
+ candidate for RAIDing
+ </description>
+ </property>
+ </policy>
+ </srcPath>
+ </configuration>
+
Added: hadoop/hdfs/trunk/src/contrib/raid/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/ivy.xml?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/ivy.xml (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/ivy.xml Tue Oct 6 07:31:11 2009
@@ -0,0 +1,284 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<ivy-module version="1.0">
+ <info organisation="org.apache.hadoop" module="${ant.project.name}" revision="${version}">
+ <license name="Apache 2.0"/>
+ <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+ <description>
+ Hadoop Core
+ </description>
+ </info>
+ <configurations defaultconfmapping="default">
+ <!--these match the Maven configurations-->
+ <conf name="default" extends="master,runtime"/>
+ <conf name="master" description="contains the artifact but no dependencies"/>
+ <conf name="runtime" description="runtime but not the artifact"
+ extends="client,server,s3-server,kfs"/>
+
+ <conf name="mandatory" description="contains the critical dependencies"
+ extends="commons-logging,log4j"/>
+
+ <!--
+ These public configurations contain the core dependencies for running hadoop client or server.
+ The server is effectively a superset of the client.
+ -->
+ <conf name="client" description="client-side dependencies"
+ extends="mandatory,httpclient"/>
+ <conf name="server" description="server-side dependencies"
+ extends="client"/>
+ <conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
+ extends="client"/>
+ <conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
+ extends="s3-client,server"/>
+ <conf name="kfs" description="dependencies for KFS file system support"/>
+ <conf name="ftp" description="dependencies for workign with FTP filesytems"
+ extends="mandatory"/>
+ <conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
+
+ <!--Private configurations. -->
+
+ <conf name="common" visibility="private" extends="runtime,mandatory,httpclient,ftp,jetty"
+ description="common artifacts"/>
+ <conf name="javadoc" visibility="private" description="artiracts required while performing doc generation"
+ extends="common,mandatory,jetty,lucene"/>
+ <!--Testing pulls in everything-->
+ <conf name="test" extends="common,s3-server,kfs" visibility="private"
+ description="the classpath needed to run tests"/>
+
+ <conf name="test-hdfswithmr" extends="test" visibility="private"
+ description="the classpath needed to run tests"/>
+
+ <conf name="releaseaudit" visibility="private"
+ description="Artifacts required for releaseaudit target"/>
+
+ <conf name="commons-logging" visibility="private"/>
+ <conf name="httpclient" visibility="private" extends="commons-logging"/>
+ <conf name="log4j" visibility="private"/>
+ <conf name="lucene" visibility="private"/>
+ <conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
+ <conf name="checkstyle" visibility="private"/>
+
+ </configurations>
+
+ <publications>
+ <!--get the artifact from our module name-->
+ <artifact conf="master"/>
+ </publications>
+ <dependencies>
+
+ <!--used client side-->
+ <dependency org="commons-cli"
+ name="commons-cli"
+ rev="${commons-cli.version}"
+ conf="client->default"/>
+ <dependency org="checkstyle"
+ name="checkstyle"
+ rev="${checkstyle.version}"
+ conf="checkstyle->default"/>
+ <dependency org="jdiff"
+ name="jdiff"
+ rev="${jdiff.version}"
+ conf="jdiff->default"/>
+ <dependency org="xerces"
+ name="xerces"
+ rev="${xerces.version}"
+ conf="jdiff->default">
+ </dependency>
+
+ <dependency org="xmlenc"
+ name="xmlenc"
+ rev="${xmlenc.version}"
+ conf="server->default"/>
+
+ <!--Configuration: httpclient-->
+
+ <!--
+ commons-httpclient asks for too many files.
+ All it needs is commons-codec and commons-logging JARs
+ -->
+ <dependency org="commons-httpclient"
+ name="commons-httpclient"
+ rev="${commons-httpclient.version}"
+ conf="httpclient->master">
+ </dependency>
+
+ <dependency org="commons-codec"
+ name="commons-codec"
+ rev="${commons-codec.version}"
+ conf="httpclient->default"/>
+
+ <dependency org="commons-net"
+ name="commons-net"
+ rev="${commons-net.version}"
+ conf="ftp->default"/>
+
+ <!--Configuration: Jetty -->
+
+<!-- <dependency org="javax.servlet"
+ name="servlet-api"
+ rev="${servlet-api.version}"
+ conf="jetty->master"/> -->
+ <dependency org="org.mortbay.jetty"
+ name="jetty"
+ rev="${jetty.version}"
+ conf="jetty->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty-util"
+ rev="${jetty-util.version}"
+ conf="jetty->master"/>
+
+ <dependency org="tomcat"
+ name="jasper-runtime"
+ rev="${jasper.version}"
+ conf="jetty->master"/>
+ <dependency org="tomcat"
+ name="jasper-compiler"
+ rev="${jasper.version}"
+ conf="jetty->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-api-2.1"
+ rev="${jetty.version}"
+ conf="jetty->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-2.1"
+ rev="${jetty.version}"
+ conf="jetty->master"/>
+ <dependency org="commons-el"
+ name="commons-el"
+ rev="${commons-el.version}"
+ conf="jetty->master"/>
+
+
+ <!--Configuration: commons-logging -->
+
+ <!--it is essential that only the master JAR of commons logging
+ is pulled in, as its dependencies are usually a mess, including things
+ like out of date servlet APIs, bits of Avalon, etc.
+ -->
+ <dependency org="commons-logging"
+ name="commons-logging"
+ rev="${commons-logging.version}"
+ conf="commons-logging->master"/>
+
+
+ <!--Configuration: log4j -->
+
+ <!--log4J is not optional until commons-logging.properties is stripped out of the JAR -->
+ <dependency org="log4j"
+ name="log4j"
+ rev="${log4j.version}"
+ conf="log4j->master"/>
+
+ <!--Configuration: s3-client -->
+ <!--there are two jets3t projects in the repository; this one goes up to 0.6 and
+ is assumed to be the live one-->
+ <dependency org="net.java.dev.jets3t"
+ name="jets3t"
+ rev="${jets3t.version}"
+ conf="s3-client->master"/>
+ <dependency org="commons-net"
+ name="commons-net"
+ rev="${commons-net.version}"
+ conf="s3-client->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="servlet-api-2.5"
+ rev="${servlet-api-2.5.version}"
+ conf="s3-client->master"/>
+ <dependency org="net.sf.kosmosfs"
+ name="kfs"
+ rev="${kfs.version}"
+ conf="kfs->default"/>
+
+ <!--Configuration: test -->
+ <!--artifacts needed for testing -->
+
+ <dependency org="org.apache.ftpserver"
+ name="ftplet-api"
+ rev="${ftplet-api.version}"
+ conf="test->default"/>
+ <dependency org="org.apache.mina"
+ name="mina-core"
+ rev="${mina-core.version}"
+ conf="test->default"/>
+ <dependency org="org.apache.ftpserver"
+ name="ftpserver-core"
+ rev="${ftpserver-core.version}"
+ conf="test->default"/>
+
+ <dependency org="junit"
+ name="junit"
+ rev="${junit.version}"
+ conf="common->default"/>
+ <dependency org="org.apache.rat"
+ name="apache-rat-tasks"
+ rev="${rats-lib.version}"
+ conf="releaseaudit->default"/>
+ <dependency org="commons-lang"
+ name="commons-lang"
+ rev="${commons-lang.version}"
+ conf="releaseaudit->default"/>
+ <dependency org="commons-collections"
+ name="commons-collections"
+ rev="${commons-collections.version}"
+ conf="releaseaudit->default"/>
+ <dependency org="hsqldb"
+ name="hsqldb"
+ rev="${hsqldb.version}"
+ conf="common->default"/>
+ <dependency org="org.apache.lucene"
+ name="lucene-core"
+ rev="${lucene-core.version}"
+ conf="javadoc->default"/>
+ <dependency org="commons-logging"
+ name="commons-logging-api"
+ rev="${commons-logging-api.version}"
+ conf="common->default"/>
+ <dependency org="org.slf4j"
+ name="slf4j-api"
+ rev="${slf4j-api.version}"
+ conf="common->master"/>
+ <dependency org="org.apache.hadoop"
+ name="avro"
+ rev="1.0.0"
+ conf="common->default"/>
+ <dependency org="org.eclipse.jdt"
+ name="core"
+ rev="${core.version}"
+ conf="common->master"/>
+ <dependency org="oro"
+ name="oro"
+ rev="${oro.version}"
+ conf="common->default"/>
+ <dependency org="org.slf4j"
+ name="slf4j-log4j12"
+ rev="${slf4j-log4j12.version}"
+ conf="common->master">
+ </dependency>
+ <dependency org="org.aspectj"
+ name="aspectjrt"
+ rev="${aspectj.version}"
+ conf="common->default">
+ </dependency>
+ <dependency org="org.aspectj"
+ name="aspectjtools"
+ rev="${aspectj.version}"
+ conf="common->default">
+ </dependency>
+ </dependencies>
+
+</ivy-module>
Added: hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/ivy/libraries.properties Tue Oct 6 07:31:11 2009
@@ -0,0 +1,18 @@
+#This properties file lists the versions of the various artifacts used by hadoop.
+#It drives ivy and the generation of a maven POM
+#These are the versions of our dependencies (in alphabetical order)
+ivy.version=2.0.0-rc2
+
+log4j.version=1.2.15
+slf4j-api.version=1.4.3
+slf4j-log4j12.version=1.4.3
+jetty.version=6.1.14
+jetty-util.version=6.1.14
+servlet-api-2.5.version=6.1.14
+cactus.version=1.8.0
+commons-logging.version=1.1
+commons-logging-api.version=1.0.4
+junit.version=3.8.2
+jsp.version=2.1
+core.version=3.1.1
+xmlenc.version=0.52
\ No newline at end of file
Added: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar?rev=822153&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-0.22.0-dev.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar?rev=822153&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/hdfs/trunk/src/contrib/raid/lib/hadoop-mapred-test-0.22.0-dev.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.DataInput;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.hdfs.BlockMissingException;
+
+/**
+ * This is an implementation of the Hadoop RAID Filesystem. This FileSystem
+ * wraps an instance of the DistributedFileSystem.
+ * If a file is corrupted, this FileSystem uses the parity blocks to
+ * regenerate the bad block.
+ */
+
+public class DistributedRaidFileSystem extends FilterFileSystem {
+
+ // these are alternate locations that can be used for read-only access
+ Path[] alternates;
+ Configuration conf;
+ int stripeLength;
+
+ DistributedRaidFileSystem() throws IOException {
+ }
+
+ DistributedRaidFileSystem(FileSystem fs) throws IOException {
+ super(fs);
+ alternates = null;
+ stripeLength = 0;
+ }
+
+ /* Initialize a Raid FileSystem
+ */
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+ this.conf = conf;
+
+ String alt = conf.get("hdfs.raid.locations");
+
+ // If no alternates are specified, then behave exactly the same as
+ // the original file system.
+ if (alt == null || alt.length() == 0) {
+ LOG.info("hdfs.raid.locations not defined. Using defaults...");
+ alt = RaidNode.DEFAULT_RAID_LOCATION;
+ }
+
+ // hdfs.raid.locations can be of the form:
+ // "hdfs://host:port/myPrefixPath, file:///localPrefix,hftp://host1:port1/"
+ String[] strs = alt.split(",");
+ if (strs == null || strs.length == 0) {
+ LOG.info("hdfs.raid.locations badly defined. Ignoring...");
+ return;
+ }
+
+ // find stripe length configured
+ stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+ if (stripeLength == 0) {
+ LOG.info("dfs.raid.stripeLength is incorrectly defined to be " +
+ stripeLength + " Ignoring...");
+ return;
+ }
+
+ // create a reference to all underlying alternate path prefix
+ alternates = new Path[strs.length];
+ for (int i = 0; i < strs.length; i++) {
+ alternates[i] = new Path(strs[i].trim());
+ alternates[i] = alternates[i].makeQualified(fs);
+ }
+ }
+
+ /*
+ * Returns the underlying filesystem
+ */
+ public FileSystem getFileSystem() throws IOException {
+ return fs;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ ExtFSDataInputStream fd = new ExtFSDataInputStream(conf, this, alternates, f,
+ stripeLength, bufferSize);
+ return fd;
+ }
+
+ public void close() throws IOException {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch(IOException ie) {
+ //this might already be closed, ignore
+ }
+ }
+ }
+
+ /**
+ * Layered filesystem input stream. This input stream tries reading
+ * from alternate locations if it encounters read errors in the primary location.
+ */
+ private static class ExtFSDataInputStream extends FSDataInputStream {
+ /**
+ * Create an input stream that wraps all the reads/positions/seeking.
+ */
+ private static class ExtFsInputStream extends FSInputStream {
+
+ //The underlying data input stream that the
+ // underlying filesystem will return.
+ private FSDataInputStream underLyingStream;
+ private byte[] oneBytebuff = new byte[1];
+ private int nextLocation;
+ private DistributedRaidFileSystem lfs;
+ private Path path;
+ private final Path[] alternates;
+ private final int buffersize;
+ private final Configuration conf;
+ private final int stripeLength;
+
+ ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs, Path[] alternates,
+ Path path, int stripeLength, int buffersize)
+ throws IOException {
+ this.underLyingStream = lfs.fs.open(path, buffersize);
+ this.path = path;
+ this.nextLocation = 0;
+ this.alternates = alternates;
+ this.buffersize = buffersize;
+ this.conf = conf;
+ this.lfs = lfs;
+ this.stripeLength = stripeLength;
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ int value = underLyingStream.available();
+ nextLocation = 0;
+ return value;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ underLyingStream.close();
+ super.close();
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ underLyingStream.mark(readLimit);
+ nextLocation = 0;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ underLyingStream.reset();
+ nextLocation = 0;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ long pos = underLyingStream.getPos();
+ while (true) {
+ try {
+ int value = underLyingStream.read();
+ nextLocation = 0;
+ return value;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, pos);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, pos);
+ }
+ }
+ }
+
+ @Override
+ public synchronized int read(byte[] b) throws IOException {
+ long pos = underLyingStream.getPos();
+ while (true) {
+ try{
+ int value = underLyingStream.read(b);
+ nextLocation = 0;
+ return value;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, pos);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, pos);
+ }
+ }
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int offset, int len)
+ throws IOException {
+ long pos = underLyingStream.getPos();
+ while (true) {
+ try{
+ int value = underLyingStream.read(b, offset, len);
+ nextLocation = 0;
+ return value;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, pos);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, pos);
+ }
+ }
+ }
+
+ @Override
+ public synchronized int read(long position, byte[] b, int offset, int len)
+ throws IOException {
+ long pos = underLyingStream.getPos();
+ while (true) {
+ try {
+ int value = underLyingStream.read(position, b, offset, len);
+ nextLocation = 0;
+ return value;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, pos);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, pos);
+ }
+ }
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ long value = underLyingStream.skip(n);
+ nextLocation = 0;
+ return value;
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ long value = underLyingStream.getPos();
+ nextLocation = 0;
+ return value;
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ underLyingStream.seek(pos);
+ nextLocation = 0;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ boolean value = underLyingStream.seekToNewSource(targetPos);
+ nextLocation = 0;
+ return value;
+ }
+
+ /**
+ * Positional read. Retries from alternate locations on read errors.
+ */
+ @Override
+ public void readFully(long pos, byte[] b, int offset, int length)
+ throws IOException {
+ long post = underLyingStream.getPos();
+ while (true) {
+ try {
+ underLyingStream.readFully(pos, b, offset, length);
+ nextLocation = 0;
+ return;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, post);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, post);
+ }
+ }
+ }
+
+ @Override
+ public void readFully(long pos, byte[] b) throws IOException {
+ long post = underLyingStream.getPos();
+ while (true) {
+ try {
+ underLyingStream.readFully(pos, b);
+ nextLocation = 0;
+ return;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, post);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, post);
+ }
+ }
+ }
+
+ /**
+ * Extract good data from RAID by switching to an alternate location.
+ * @param curexp the current exception
+ * @param curpos the position of the current operation to be retried
+ * @throws IOException if all alternate locations are exhausted
+ */
+ private void setAlternateLocations(IOException curexp, long curpos)
+ throws IOException {
+ while (alternates != null && nextLocation < alternates.length) {
+ try {
+ int idx = nextLocation++;
+ long corruptOffset = -1;
+ if (curexp instanceof BlockMissingException) {
+ corruptOffset = ((BlockMissingException)curexp).getOffset();
+ } else if (curexp instanceof ChecksumException) {
+ corruptOffset = ((ChecksumException)curexp).getPos();
+ }
+ Path npath = RaidNode.unRaid(conf, path, alternates[idx], stripeLength,
+ corruptOffset);
+ FileSystem fs1 = getUnderlyingFileSystem(conf);
+ fs1.initialize(npath.toUri(), conf);
+ LOG.info("Opening alternate path " + npath + " at offset " + curpos);
+ FSDataInputStream fd = fs1.open(npath, buffersize);
+ fd.seek(curpos);
+ underLyingStream.close();
+ underLyingStream = fd;
+ lfs.fs = fs1;
+ path = npath;
+ return;
+ } catch (Exception e) {
+ LOG.info("Error in using alternate path " + path + ". " + e +
+ " Ignoring...");
+ }
+ }
+ throw curexp;
+ }
+
+ /**
+ * Returns the file system that is immediately below the
+ * DistributedRaidFileSystem. This is specified by the
+ * configuration parameter called fs.raid.underlyingfs.impl.
+ * If this parameter is not specified in the configuration, then
+ * the default class DistributedFileSystem is returned.
+ * @param conf the configuration object
+ * @return the filesystem object immediately below DistributedRaidFileSystem
+ */
+ private FileSystem getUnderlyingFileSystem(Configuration conf) {
+ Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl", DistributedFileSystem.class);
+ FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+ return fs;
+ }
+ }
+
+ /**
+ * constructor for ext input stream.
+ * @param fs the underlying filesystem
+ * @param p the path in the underlying file system
+ * @param buffersize the size of IO
+ * @throws IOException
+ */
+ public ExtFSDataInputStream(Configuration conf, DistributedRaidFileSystem lfs,
+ Path[] alternates, Path p, int stripeLength, int buffersize) throws IOException {
+ super(new ExtFsInputStream(conf, lfs, alternates, p, stripeLength, buffersize));
+ }
+ }
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+
+/**
+ * Maintains the configuration xml file that is read into memory.
+ */
+class ConfigManager {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.ConfigManager");
+
+ /** Time to wait between checks of the config file */
+ public static final long RELOAD_INTERVAL = 10 * 1000;
+
+ /** Time to wait between successive runs of all policies */
+ public static final long RESCAN_INTERVAL = 3600 * 1000;
+
+ /**
+ * Time to wait after the config file has been modified before reloading it
+ * (this is done to prevent loading a file that hasn't been fully written).
+ */
+ public static final long RELOAD_WAIT = 5 * 1000;
+
+ private Configuration conf; // Hadoop configuration
+ private String configFileName; // Path to config XML file
+
+ private long lastReloadAttempt; // Last time we tried to reload the config file
+ private long lastSuccessfulReload; // Last time we successfully reloaded config
+ private boolean lastReloadAttemptFailed = false;
+ private long reloadInterval = RELOAD_INTERVAL;
+ private long periodicity; // time between runs of all policies
+
+ // Reload the configuration
+ private boolean doReload;
+ private Thread reloadThread;
+ private volatile boolean running = false;
+
+ // Collection of all configured policies.
+ Collection<PolicyList> allPolicies = new ArrayList<PolicyList>();
+
+ public ConfigManager(Configuration conf) throws IOException, SAXException,
+ RaidConfigurationException, ClassNotFoundException, ParserConfigurationException {
+ this.conf = conf;
+ this.configFileName = conf.get("raid.config.file");
+ this.doReload = conf.getBoolean("raid.config.reload", true);
+ this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
+ this.periodicity = conf.getLong("raid.policy.rescan.interval", RESCAN_INTERVAL);
+ if (configFileName == null) {
+ String msg = "No raid.config.file given in conf - " +
+ "the Hadoop Raid utility cannot run. Aborting....";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+ reloadConfigs();
+ lastSuccessfulReload = RaidNode.now();
+ lastReloadAttempt = RaidNode.now();
+ running = true;
+ }
+
+ /**
+ * Reload config file if it hasn't been loaded in a while
+ * Returns true if the file was reloaded.
+ */
+ public synchronized boolean reloadConfigsIfNecessary() {
+ long time = RaidNode.now();
+ if (time > lastReloadAttempt + reloadInterval) {
+ lastReloadAttempt = time;
+ try {
+ File file = new File(configFileName);
+ long lastModified = file.lastModified();
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + RELOAD_WAIT) {
+ reloadConfigs();
+ lastSuccessfulReload = time;
+ lastReloadAttemptFailed = false;
+ return true;
+ }
+ } catch (Exception e) {
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload config file - " +
+ "will use existing configuration.", e);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Updates the in-memory data structures from the config file. This file is
+ * expected to be in the following format:
+ *
+ <configuration>
+ <srcPath prefix="hdfs://hadoop.myhost.com:9000/user/warehouse/u_full/*">
+ <destPath> hdfs://dfsarch.data.facebook.com:9000/archive/</destPath>
+ <policy name = RaidScanWeekly>
+ <property>
+ <name>targetReplication</name>
+ <value>2</value>
+ <description> after RAIDing, decrease the replication factor of the file to
+ this value.
+ </description>
+ </property>
+ <property>
+ <name>metaReplication</name>
+ <value>2</value>
+ <description> the replication factor of the RAID meta file
+ </description>
+ </property>
+ <property>
+ <name>stripeLength</name>
+ <value>10</value>
+ <description> the number of blocks to RAID together
+ </description>
+ </property>
+ </policy>
+ </srcPath>
+ </configuration>
+ *
+ * XML comments and whitespace-only text nodes are ignored.
+ *
+ * @throws IOException if the config file cannot be read.
+ * @throws RaidConfigurationException if configuration entries are invalid.
+ * @throws ClassNotFoundException if user-defined policy classes cannot be loaded
+ * @throws ParserConfigurationException if XML parser is misconfigured.
+ * @throws SAXException if config file is malformed.
+ * On success, the parsed policies replace the current set of policies.
+ */
+ void reloadConfigs() throws IOException, ParserConfigurationException,
+ SAXException, ClassNotFoundException, RaidConfigurationException {
+
+ if (configFileName == null) {
+ return;
+ }
+
+ File file = new File(configFileName);
+ if (!file.exists()) {
+ throw new RaidConfigurationException("Configuration file " + configFileName +
+ " does not exist.");
+ }
+
+ // Build a temporary list to hold the new policies; we only save
+ // it in our fields if we have parsed the entire config file successfully.
+ List<PolicyList> all = new ArrayList<PolicyList>();
+ long periodicityValue = periodicity;
+
+
+ // Read and parse the configuration file.
+ // allow include files in configuration file
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ docBuilderFactory.setIgnoringComments(true);
+ docBuilderFactory.setNamespaceAware(true);
+ try {
+ docBuilderFactory.setXIncludeAware(true);
+ } catch (UnsupportedOperationException e) {
+ LOG.error("Failed to set setXIncludeAware(true) for raid parser "
+ + docBuilderFactory + ":" + e, e);
+ }
+ LOG.error("Reloading config file " + file);
+
+ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ Document doc = builder.parse(file);
+ Element root = doc.getDocumentElement();
+ if (!"configuration".equalsIgnoreCase(root.getTagName()))
+ throw new RaidConfigurationException("Bad configuration file: " +
+ "top-level element not <configuration>");
+ NodeList elements = root.getChildNodes();
+
+ // loop through all the configured source paths.
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (!(node instanceof Element)) {
+ continue;
+ }
+ Element element = (Element)node;
+ String elementTagName = element.getTagName();
+ if ("srcPath".equalsIgnoreCase(elementTagName)) {
+ String srcPathPrefix = element.getAttribute("prefix");
+
+ if (srcPathPrefix == null || srcPathPrefix.length() == 0) {
+ throw new RaidConfigurationException("Bad configuration file: " +
+ "srcPathPrefix not set.");
+ }
+ PolicyList policyList = new PolicyList();
+ all.add(policyList);
+
+ policyList.setSrcPath(conf, srcPathPrefix);
+
+ // loop through all the policies for this source path
+ NodeList policies = element.getChildNodes();
+ for (int j = 0; j < policies.getLength(); j++) {
+ Node node1 = policies.item(j);
+ if (!(node1 instanceof Element)) {
+ continue;
+ }
+ Element policy = (Element)node1;
+ if (!"policy".equalsIgnoreCase(policy.getTagName())) {
+ throw new RaidConfigurationException("Bad configuration file: " +
+ "Expecting <policy> for srcPath " + srcPathPrefix);
+ }
+ String policyName = policy.getAttribute("name");
+ PolicyInfo pinfo = new PolicyInfo(policyName, conf);
+ pinfo.setSrcPath(srcPathPrefix);
+ policyList.add(pinfo);
+
+ // loop through all the properties of this policy
+ NodeList properties = policy.getChildNodes();
+ for (int k = 0; k < properties.getLength(); k++) {
+ Node node2 = properties.item(k);
+ if (!(node2 instanceof Element)) {
+ continue;
+ }
+ Element property = (Element)node2;
+ String propertyName = property.getTagName();
+ if ("destPath".equalsIgnoreCase(propertyName)) {
+ String text = ((Text)property.getFirstChild()).getData().trim();
+ LOG.info(policyName + ".destPath = " + text);
+ pinfo.setDestinationPath(text);
+ } else if ("description".equalsIgnoreCase(propertyName)) {
+ String text = ((Text)property.getFirstChild()).getData().trim();
+ pinfo.setDescription(text);
+ } else if ("property".equalsIgnoreCase(propertyName)) {
+ NodeList nl = property.getChildNodes();
+ String pname=null,pvalue=null;
+ for (int l = 0; l < nl.getLength(); l++){
+ Node node3 = nl.item(l);
+ if (!(node3 instanceof Element)) {
+ continue;
+ }
+ Element item = (Element) node3;
+ String itemName = item.getTagName();
+ if ("name".equalsIgnoreCase(itemName)){
+ pname = ((Text)item.getFirstChild()).getData().trim();
+ } else if ("value".equalsIgnoreCase(itemName)){
+ pvalue = ((Text)item.getFirstChild()).getData().trim();
+ }
+ }
+ if (pname != null && pvalue != null) {
+ LOG.info(policyName + "." + pname + " = " + pvalue);
+ pinfo.setProperty(pname,pvalue);
+ }
+ } else {
+ LOG.info("Found bad property " + propertyName +
+ " for srcPath" + srcPathPrefix +
+ " policy name " + policyName +
+ ". Ignoring.");
+ }
+ } // done with all properties of this policy
+ } // done with all policies for this srcpath
+ }
+ } // done with all srcPaths
+ setAllPolicies(all);
+ periodicity = periodicityValue;
+ return;
+ }
+
+
+ public synchronized long getPeriodicity() {
+ return periodicity;
+ }
+
+ /**
+ * Get a collection of all policies
+ */
+ public synchronized Collection<PolicyList> getAllPolicies() {
+ return new ArrayList(allPolicies);
+ }
+
+ /**
+ * Set a collection of all policies
+ */
+ protected synchronized void setAllPolicies(Collection<PolicyList> value) {
+ this.allPolicies = value;
+ }
+
+ /**
+ * Start a background thread to reload the config file
+ */
+ void startReload() {
+ if (doReload) {
+ reloadThread = new UpdateThread();
+ reloadThread.start();
+ }
+ }
+
+ /**
+ * Stop the background thread that reload the config file
+ */
+ void stopReload() throws InterruptedException {
+ if (reloadThread != null) {
+ running = false;
+ reloadThread.interrupt();
+ reloadThread.join();
+ reloadThread = null;
+ }
+ }
+
+ /**
+ * A thread which reloads the config file.
+ */
+ private class UpdateThread extends Thread {
+ private UpdateThread() {
+ super("Raid update thread");
+ }
+
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(reloadInterval);
+ reloadConfigsIfNecessary();
+ } catch (InterruptedException e) {
+ // do nothing
+ } catch (Exception e) {
+ LOG.error("Failed to reload config file ", e);
+ }
+ }
+ }
+ }
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,328 @@
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.raid.RaidNode.Statistics;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistRaid {
+
+ protected static final Log LOG = LogFactory.getLog(DistRaid.class);
+
+ static final String NAME = "distRaid";
+ static final String JOB_DIR_LABEL = NAME + ".job.dir";
+ static final String OP_LIST_LABEL = NAME + ".op.list";
+ static final String OP_COUNT_LABEL = NAME + ".op.count";
+
+ private static final long OP_PER_MAP = 100;
+ private static final int MAX_MAPS_PER_NODE = 20;
+ private static final int SYNC_FILE_MAX = 10;
+ private static final SimpleDateFormat dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ private static String jobName = NAME;
+
+ static enum Counter {
+ FILES_SUCCEEDED, FILES_FAILED, PROCESSED_BLOCKS, PROCESSED_SIZE, META_BLOCKS, META_SIZE
+ }
+
+ protected JobConf jobconf;
+
+ /** {@inheritDoc} */
+ public void setConf(Configuration conf) {
+ if (jobconf != conf) {
+ jobconf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public JobConf getConf() {
+ return jobconf;
+ }
+
+ public DistRaid(Configuration conf) {
+ setConf(createJobConf(conf));
+ }
+
+ private static final Random RANDOM = new Random();
+
+ protected static String getRandomId() {
+ return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
+ }
+
+ /**
+ *
+ * helper class which holds the policy and paths
+ *
+ */
+ public static class RaidPolicyPathPair {
+ public PolicyInfo policy;
+ public List<FileStatus> srcPaths;
+
+ RaidPolicyPathPair(PolicyInfo policy, List<FileStatus> srcPaths) {
+ this.policy = policy;
+ this.srcPaths = srcPaths;
+ }
+ }
+
+ List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
+
+ /** Responsible for generating splits of the src file list. */
+ static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
+ /** Do nothing. */
+ public void validateInput(JobConf job) {
+ }
+
+ /**
+ * Produce splits such that each is no greater than the quotient of the
+ * total size and the number of splits requested.
+ *
+ * @param job the handle to the JobConf object
+ * @param numSplits the number of splits requested
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
+ final int targetcount = srcCount / numSplits;
+ String srclist = job.get(OP_LIST_LABEL, "");
+ if (srcCount < 0 || "".equals(srclist)) {
+ throw new RuntimeException("Invalid metadata: #files(" + srcCount
+ + ") listuri(" + srclist + ")");
+ }
+ Path srcs = new Path(srclist);
+ FileSystem fs = srcs.getFileSystem(job);
+
+ List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+
+ Text key = new Text();
+ PolicyInfo value = new PolicyInfo();
+ SequenceFile.Reader in = null;
+ long prev = 0L;
+ int count = 0; // count src
+ try {
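+ // Walk the op list and cut a new split every targetcount records,
+ // using the reader's byte position to size each FileSplit.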
+ for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
+ long curr = in.getPosition();
+ long delta = curr - prev;
+ if (++count > targetcount) {
+ count = 0;
+ splits.add(new FileSplit(srcs, prev, delta, (String[]) null));
+ prev = curr;
+ }
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ long remaining = fs.getFileStatus(srcs).getLen() - prev;
+ if (remaining != 0) {
+ splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
+ }
+ LOG.info("jobname= " + jobName + " numSplits=" + numSplits +
+ ", splits.size()=" + splits.size());
+ return splits.toArray(new FileSplit[splits.size()]);
+ }
+
+ /** {@inheritDoc} */
+ public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader<Text, PolicyInfo>(job,
+ (FileSplit) split);
+ }
+ }
+
+ /** The mapper for raiding files. */
+ static class DistRaidMapper implements
+ Mapper<Text, PolicyInfo, WritableComparable, Text> {
+ private JobConf jobconf;
+ private boolean ignoreFailures;
+
+ private int failcount = 0;
+ private int succeedcount = 0;
+ private Statistics st = null;
+
+ private String getCountString() {
+ return "Succeeded: " + succeedcount + " Failed: " + failcount;
+ }
+
+ /** {@inheritDoc} */
+ public void configure(JobConf job) {
+ this.jobconf = job;
+ ignoreFailures = false;
+ st = new Statistics();
+ }
+
+ /** Raid the file named by the key, using the given policy. */
+ public void map(Text key, PolicyInfo policy,
+ OutputCollector<WritableComparable, Text> out, Reporter reporter)
+ throws IOException {
+ try {
+ LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
+ Path p = new Path(key.toString());
+ FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
+ st.clear();
+ RaidNode.doRaid(jobconf, policy, fs, st, reporter);
+
+ ++succeedcount;
+
+ reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
+ reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
+ reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
+ reporter.incrCounter(Counter.META_SIZE, st.metaSize);
+
+ reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
+ } catch (IOException e) {
+ ++failcount;
+ reporter.incrCounter(Counter.FILES_FAILED, 1);
+
+ String s = "FAIL: " + policy + ", " + key + " "
+ + StringUtils.stringifyException(e);
+ out.collect(null, new Text(s));
+ LOG.info(s);
+ } finally {
+ reporter.setStatus(getCountString());
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ if (failcount == 0 || ignoreFailures) {
+ return;
+ }
+ throw new IOException(getCountString());
+ }
+ }
+
+ /**
+ * Create a new job conf based on the configuration passed in.
+ *
+ * @param conf the configuration to base the job conf on
+ * @return a JobConf set up to run the DistRaid mappers
+ */
+ private static JobConf createJobConf(Configuration conf) {
+ JobConf jobconf = new JobConf(conf, DistRaid.class);
+ jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
+ jobconf.setJobName(jobName);
+ jobconf.setMapSpeculativeExecution(false);
+
+ jobconf.setJarByClass(DistRaid.class);
+ jobconf.setInputFormat(DistRaidInputFormat.class);
+ jobconf.setOutputKeyClass(Text.class);
+ jobconf.setOutputValueClass(Text.class);
+
+ jobconf.setMapperClass(DistRaidMapper.class);
+ jobconf.setNumReduceTasks(0);
+ return jobconf;
+ }
+
+ /** Add paths to be raided */
+ public void addRaidPaths(PolicyInfo info, List<FileStatus> paths) {
+ raidPolicyPathPairList.add(new RaidPolicyPathPair(info, paths));
+ }
+
+ /** Calculate how many maps to run. */
+ private static int getMapCount(int srcCount, int numNodes) {
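+ // Roughly one map per OP_PER_MAP files, capped at MAX_MAPS_PER_NODE maps
+ // per node, and always at least one map.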
+ int numMaps = (int) (srcCount / OP_PER_MAP);
+ numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+ return Math.max(numMaps, 1);
+ }
+
+ /** Invokes a MapReduce job to raid the files in parallel. */
+ public void doDistRaid() throws IOException {
+ if (raidPolicyPathPairList.size() == 0) {
+ LOG.info("DistRaid has no paths to raid.");
+ return;
+ }
+ try {
+ if (setup()) {
+ JobClient.runJob(jobconf);
+ }
+ } finally {
+ // delete job directory
+ final String jobdir = jobconf.get(JOB_DIR_LABEL);
+ if (jobdir != null) {
+ final Path jobpath = new Path(jobdir);
+ jobpath.getFileSystem(jobconf).delete(jobpath, true);
+ }
+ }
+ raidPolicyPathPairList.clear();
+ }
+
+ /**
+ * Set up the input file that lists the files to be raided.
+ *
+ * @return true if there is at least one file to raid
+ * @throws IOException
+ */
+ private boolean setup() throws IOException {
+ final String randomId = getRandomId();
+ JobClient jClient = new JobClient(jobconf);
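+ // Stage job metadata in a per-run scratch directory under the JobTracker's
+ // system dir; doDistRaid() deletes it once the job finishes.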
+ Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+
+ LOG.info(JOB_DIR_LABEL + "=" + jobdir);
+ jobconf.set(JOB_DIR_LABEL, jobdir.toString());
+ Path log = new Path(jobdir, "_logs");
+
+ FileOutputFormat.setOutputPath(jobconf, log);
+ LOG.info("log=" + log);
+
+ // create operation list
+ FileSystem fs = jobdir.getFileSystem(jobconf);
+ Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
+ jobconf.set(OP_LIST_LABEL, opList.toString());
+ int opCount = 0, synCount = 0;
+ SequenceFile.Writer opWriter = null;
+ try {
+ opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
+ PolicyInfo.class, SequenceFile.CompressionType.NONE);
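+ // One record per source file: key is the file path, value is the policy.
+ // Periodic sync points let DistRaidInputFormat split the list cleanly.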
+ for (RaidPolicyPathPair p : raidPolicyPathPairList) {
+ for (FileStatus st : p.srcPaths) {
+ opWriter.append(new Text(st.getPath().toString()), p.policy);
+ opCount++;
+ if (++synCount > SYNC_FILE_MAX) {
+ opWriter.sync();
+ synCount = 0;
+ }
+ }
+ }
+
+ } finally {
+ if (opWriter != null) {
+ opWriter.close();
+ }
+ }
+ raidPolicyPathPairList.clear();
+
+ jobconf.setInt(OP_COUNT_LABEL, opCount);
+ LOG.info("Number of files=" + opCount);
+ jobconf.setNumMapTasks(getMapCount(opCount, jClient.getClusterStatus()
+ .getTaskTrackers()));
+ LOG.info("jobName= " + jobName + " numMapTasks=" + jobconf.getNumMapTasks());
+ return opCount != 0;
+
+ }
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+/**
+ * Thrown when the configuration file for the {@link RaidNode} is malformed.
+ */
+public class RaidConfigurationException extends Exception {
+ private static final long serialVersionUID = 4046516718965587999L;
+
+ public RaidConfigurationException(String message) {
+ super(message);
+ }
+}