Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC
svn commit: r1077079 [1/11] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/
src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/
src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/
src/contrib/gr...
Author: omalley
Date: Fri Mar 4 03:38:20 2011
New Revision: 1077079
URL: http://svn.apache.org/viewvc?rev=1077079&view=rev
Log:
commit ab3374db39ac9f87def8db69b6e1c55609a310f2
Author: Chris Douglas <cd...@apache.org>
Date: Tue Dec 15 20:19:19 2009 -0800
MAPREDUCE-1124 from https://issues.apache.org/jira/secure/attachment/12427971/M1124-y20-1.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1124. Import Gridmix3 and Rumen. (cdouglas)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/README
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/build.xml
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy.xml
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/libraries.properties
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CombineFileSplit.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-three-values.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/input-minimal.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/input-one-value-many-repeats.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/input-only-one-value.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/input-three-values.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-topology-output
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-trace-output.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-job-tracker-logs.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/truncated-job-tracker-log
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/truncated-topology-output
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/truncated-trace-output
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/zombie/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/zombie/input-topology.json
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/zombie/input-trace.json
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepCompare.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeepInequalityException.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Histogram.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LogRecordType.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Node.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pair.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/build.xml
hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml
hadoop/common/branches/branch-0.20-security-patches/src/contrib/build.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar 4 03:38:20 2011
@@ -109,6 +109,7 @@
<property name="test.junit.printsummary" value="yes" />
<property name="test.junit.haltonfailure" value="no" />
<property name="test.junit.maxmemory" value="512m" />
+ <property name="test.tools.input.dir" value="${basedir}/src/test/tools/data"/>
<property name="test.libhdfs.conf.dir" value="${c++.libhdfs.src}/tests/conf"/>
<property name="test.libhdfs.dir" value="${test.build.dir}/libhdfs"/>
@@ -508,7 +509,7 @@
description="Compile core only">
</target>
- <target name="compile-contrib" depends="compile-core,compile-c++-libhdfs">
+ <target name="compile-contrib" depends="compile-core,tools-jar,compile-c++-libhdfs">
<subant target="compile">
<property name="version" value="${version}"/>
<fileset file="${contrib.dir}/build.xml"/>
@@ -734,6 +735,7 @@
dir="${basedir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${test.build.data}"/>
+ <sysproperty key="test.tools.input.dir" value="${test.tools.input.dir}"/>
<sysproperty key="test.cache.data" value="${test.cache.data}"/>
<sysproperty key="test.debug.data" value="${test.debug.data}"/>
<sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
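The new test.tools.input.dir property exposes the checked-in Rumen test data to
the test JVM as a system property. A minimal sketch of how a test might consume
it (the subpath and the fallback default here are assumptions for illustration):

    // Hypothetical snippet: resolving the data directory wired in above.
    // The "rumen/small-trace-test" subpath matches files added by this commit.
    String root = System.getProperty("test.tools.input.dir", "src/test/tools/data");
    org.apache.hadoop.fs.Path traceDir =
        new org.apache.hadoop.fs.Path(root, "rumen/small-trace-test");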
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml Fri Mar 4 03:38:20 2011
@@ -80,6 +80,7 @@
<!-- the normal classpath -->
<path id="contrib-classpath">
<pathelement location="${build.classes}"/>
+ <pathelement location="${hadoop.root}/build/tools"/>
<fileset refid="lib.jars"/>
<pathelement location="${hadoop.root}/build/classes"/>
<fileset dir="${hadoop.root}/lib">
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/build.xml?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/build.xml Fri Mar 4 03:38:20 2011
@@ -50,6 +50,7 @@
<fileset dir="." includes="streaming/build.xml"/>
<fileset dir="." includes="fairscheduler/build.xml"/>
<fileset dir="." includes="capacity-scheduler/build.xml"/>
+ <fileset dir="." includes="gridmix/build.xml"/>
</subant>
</target>
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/README
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/README?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/README (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/README Fri Mar 4 03:38:20 2011
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+This project implements the third version of Gridmix, a benchmark for live
+clusters. Given a description of jobs (a "trace") annotated with information
+about I/O, memory, etc., a synthetic mix of jobs is generated and submitted
+to the cluster.
+
+Forrest documentation of usage and configuration properties is available in
+src/docs/src/documentation/content/xdocs/gridmix.xml
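As a sketch of the workflow described above, the Gridmix driver added by this
commit can be launched through ToolRunner. The option shape below (-generate
<MiB>, an I/O working directory, then a Rumen trace) should be checked against
Gridmix.java; the paths and sizes are hypothetical:

    // Sketch: launching Gridmix programmatically. Assumes Gridmix implements
    // Tool (see Gridmix.java below); all arguments are hypothetical.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.Gridmix;
    import org.apache.hadoop.util.ToolRunner;

    public class RunGridmix {
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Gridmix(),
            new String[] { "-generate", "4096",             // MiB to generate
                           "/gridmix/io",                   // working directory
                           "file:///tmp/job-trace.json" }); // Rumen job trace
        System.exit(res);
      }
    }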
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/build.xml?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/build.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/build.xml Fri Mar 4 03:38:20 2011
@@ -0,0 +1,23 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="gridmix" default="jar">
+
+ <import file="../build-contrib.xml"/>
+
+</project>
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy.xml?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy.xml Fri Mar 4 03:38:20 2011
@@ -0,0 +1,97 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<ivy-module version="1.0">
+ <info organisation="org.apache.hadoop" module="${ant.project.name}">
+ <license name="Apache 2.0"/>
+    <description>Gridmix</description>
+ </info>
+ <configurations defaultconfmapping="default">
+ <!--these match the Maven configurations-->
+ <conf name="default" extends="master,runtime"/>
+ <conf name="master" description="contains the artifact but no dependencies"/>
+ <conf name="runtime" description="runtime but not the artifact" />
+
+ <conf name="common" visibility="private" extends="runtime"
+ description="artifacts needed to compile/test the application"/>
+ </configurations>
+
+ <publications>
+ <!--get the artifact from our module name-->
+ <artifact conf="master"/>
+ </publications>
+ <dependencies>
+ <dependency org="commons-logging"
+ name="commons-logging"
+ rev="${commons-logging.version}"
+ conf="common->default"/>
+ <dependency org="log4j"
+ name="log4j"
+ rev="${log4j.version}"
+ conf="common->master"/>
+ <dependency org="junit"
+ name="junit"
+ rev="${junit.version}"
+ conf="common->default"/>
+
+ <!-- necessary for Mini*Clusters -->
+ <dependency org="commons-httpclient"
+ name="commons-httpclient"
+ rev="${commons-httpclient.version}"
+ conf="common->master"/>
+ <dependency org="commons-codec"
+ name="commons-codec"
+ rev="${commons-codec.version}"
+ conf="common->default"/>
+ <dependency org="commons-net"
+ name="commons-net"
+ rev="${commons-net.version}"
+ conf="common->default"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty-util"
+ rev="${jetty-util.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-api-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="servlet-api-2.5"
+ rev="${servlet-api-2.5.version}"
+ conf="common->master"/>
+ <dependency org="commons-cli"
+ name="commons-cli"
+ rev="${commons-cli.version}"
+ conf="common->default"/>
+ <dependency org="org.codehaus.jackson"
+ name="jackson-mapper-asl"
+ rev="${jackson.version}"
+ conf="common->default"/>
+ <dependency org="org.codehaus.jackson"
+ name="jackson-core-asl"
+ rev="${jackson.version}"
+ conf="common->default"/>
+ </dependencies>
+</ivy-module>
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/libraries.properties?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/libraries.properties (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/ivy/libraries.properties Fri Mar 4 03:38:20 2011
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#This properties file lists the versions of the various artifacts used by gridmix.
+#It drives ivy and the generation of a maven POM
+
+#Please list dependency names with versions if they differ from the ones
+#listed in the global libraries.properties file (in alphabetical order)
+
+jackson.version=1.0.1
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Given byte and record targets, emit roughly equal-sized records satisfying
+ * the contract.
+ */
+class AvgRecordFactory extends RecordFactory {
+
+ /**
+ * Percentage of record for key data.
+ */
+ public static final String GRIDMIX_KEY_FRC = "gridmix.key.fraction";
+
+
+ private final long targetBytes;
+ private final long targetRecords;
+ private final long step;
+ private final int avgrec;
+ private final int keyLen;
+ private long accBytes = 0L;
+ private long accRecords = 0L;
+
+ /**
+ * @param targetBytes Expected byte count.
+ * @param targetRecords Expected record count.
+   * @param conf Used to resolve edge cases; see {@link #GRIDMIX_KEY_FRC}.
+ */
+ public AvgRecordFactory(long targetBytes, long targetRecords,
+ Configuration conf) {
+ this.targetBytes = targetBytes;
+ this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
+ ? Math.max(1,
+ this.targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+ : targetRecords;
+ final long tmp = this.targetBytes / this.targetRecords;
+ step = this.targetBytes - this.targetRecords * tmp;
+ avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
+ keyLen = Math.max(1,
+ (int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+ }
+
+ @Override
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+ if (accBytes >= targetBytes) {
+ return false;
+ }
+ final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
+ final int len = (int) Math.min(targetBytes - accBytes, reclen);
+      // len may be less than reclen when the remaining byte budget runs out
+ if (key != null) {
+ key.setSize(keyLen);
+ val.setSize(len - key.getSize());
+ } else {
+ val.setSize(len);
+ }
+ accBytes += len;
+ return true;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return Math.min(1.0f, accBytes / ((float)targetBytes));
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+}
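To make the sizing arithmetic above concrete: with hypothetical targets of 1000
bytes over 30 records, tmp = 33, step = 10, and avgrec = 34, so the first 10
records carry 34 bytes and the remaining 20 carry 33, meeting the byte budget
exactly. A standalone sketch of the same arithmetic:

    // Standalone sketch of AvgRecordFactory's record sizing (values
    // are hypothetical; no Gridmix classes required).
    public class AvgRecordMath {
      public static void main(String[] args) {
        final long targetBytes = 1000L, targetRecords = 30L;
        final long tmp = targetBytes / targetRecords;        // 33
        final long step = targetBytes - targetRecords * tmp; // 10 larger records
        final int avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1); // 34
        long accBytes = 0L;
        for (long rec = 0; accBytes < targetBytes; ++rec) {
          final int reclen = rec >= step ? avgrec - 1 : avgrec;
          accBytes += Math.min(targetBytes - accBytes, reclen);
        }
        System.out.println(accBytes); // prints 1000
      }
    }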
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CombineFileSplit.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CombineFileSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CombineFileSplit.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * A sub-collection of input files.
+ *
+ * Unlike {@link FileSplit}, the CombineFileSplit class does not represent
+ * a split of a single file; it is a split of the input files into smaller sets.
+ * A split may contain blocks from different files, but all
+ * the blocks in the same split are probably local to some rack.<br>
+ * CombineFileSplit can be used to implement {@link RecordReader}s
+ * that read one record per file.
+ *
+ * @see FileSplit
+ * @see CombineFileInputFormat
+ */
+public class CombineFileSplit extends InputSplit implements Writable {
+
+ private Path[] paths;
+ private long[] startoffset;
+ private long[] lengths;
+ private String[] locations;
+ private long totLength;
+
+ /**
+ * default constructor
+ */
+ public CombineFileSplit() {}
+ public CombineFileSplit(Path[] files, long[] start,
+ long[] lengths, String[] locations) {
+ initSplit(files, start, lengths, locations);
+ }
+
+ public CombineFileSplit(Path[] files, long[] lengths) {
+ long[] startoffset = new long[files.length];
+ for (int i = 0; i < startoffset.length; i++) {
+ startoffset[i] = 0;
+ }
+ String[] locations = new String[files.length];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = "";
+ }
+ initSplit(files, startoffset, lengths, locations);
+ }
+
+ private void initSplit(Path[] files, long[] start,
+ long[] lengths, String[] locations) {
+ this.startoffset = start;
+ this.lengths = lengths;
+ this.paths = files;
+ this.totLength = 0;
+ this.locations = locations;
+ for(long length : lengths) {
+ totLength += length;
+ }
+ }
+
+ /**
+ * Copy constructor
+ */
+ public CombineFileSplit(CombineFileSplit old) throws IOException {
+ this(old.getPaths(), old.getStartOffsets(),
+ old.getLengths(), old.getLocations());
+ }
+
+ public long getLength() {
+ return totLength;
+ }
+
+ /** Returns an array containing the start offsets of the files in the split*/
+ public long[] getStartOffsets() {
+ return startoffset;
+ }
+
+ /** Returns an array containing the lengths of the files in the split*/
+ public long[] getLengths() {
+ return lengths;
+ }
+
+ /** Returns the start offset of the i<sup>th</sup> Path */
+ public long getOffset(int i) {
+ return startoffset[i];
+ }
+
+ /** Returns the length of the i<sup>th</sup> Path */
+ public long getLength(int i) {
+ return lengths[i];
+ }
+
+ /** Returns the number of Paths in the split */
+ public int getNumPaths() {
+ return paths.length;
+ }
+
+ /** Returns the i<sup>th</sup> Path */
+ public Path getPath(int i) {
+ return paths[i];
+ }
+
+ /** Returns all the Paths in the split */
+ public Path[] getPaths() {
+ return paths;
+ }
+
+  /** Returns the hosts on which this input split resides */
+ public String[] getLocations() throws IOException {
+ return locations;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ totLength = in.readLong();
+ int arrLength = in.readInt();
+ lengths = new long[arrLength];
+ for(int i=0; i<arrLength;i++) {
+ lengths[i] = in.readLong();
+ }
+ int filesLength = in.readInt();
+ paths = new Path[filesLength];
+ for(int i=0; i<filesLength;i++) {
+ paths[i] = new Path(Text.readString(in));
+ }
+ arrLength = in.readInt();
+ startoffset = new long[arrLength];
+ for(int i=0; i<arrLength;i++) {
+ startoffset[i] = in.readLong();
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(totLength);
+ out.writeInt(lengths.length);
+ for(long length : lengths) {
+ out.writeLong(length);
+ }
+ out.writeInt(paths.length);
+ for(Path p : paths) {
+ Text.writeString(out, p.toString());
+ }
+ out.writeInt(startoffset.length);
+ for(long length : startoffset) {
+ out.writeLong(length);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < paths.length; i++) {
+ if (i == 0 ) {
+ sb.append("Paths:");
+ }
+ sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+ "+" + lengths[i]);
+ if (i < paths.length -1) {
+ sb.append(",");
+ }
+ }
+ if (locations != null) {
+ String locs = "";
+ StringBuffer locsb = new StringBuffer();
+ for (int i = 0; i < locations.length; i++) {
+ locsb.append(locations[i] + ":");
+ }
+ locs = locsb.toString();
+ sb.append(" Locations:" + locs + "; ");
+ }
+ return sb.toString();
+ }
+}
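A short round-trip sketch for the Writable implementation above; paths and
lengths are hypothetical. Note that write()/readFields() carry paths, offsets,
and lengths but not locations, which stay null on the deserialized copy:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.gridmix.CombineFileSplit;

    public class SplitRoundTrip {
      public static void main(String[] args) throws IOException {
        // Two pooled files; offsets default to 0 in this constructor.
        CombineFileSplit split = new CombineFileSplit(
            new Path[] { new Path("/pool/a"), new Path("/pool/b") },
            new long[] { 128L << 20, 64L << 20 });
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        split.write(new DataOutputStream(bytes));
        CombineFileSplit copy = new CombineFileSplit();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // same paths, offsets, and lengths
      }
    }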
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class for caching a pool of input data to be used by synthetic jobs for
+ * simulating read traffic.
+ */
+class FilePool {
+
+ public static final Log LOG = LogFactory.getLog(FilePool.class);
+
+ /**
+   * The minimum size for a file to be added to the pool. Defaults to 128MiB.
+ */
+ public static final String GRIDMIX_MIN_FILE = "gridmix.min.file.size";
+
+ /**
+   * The maximum total size of data scanned when populating the pool. Defaults to 100TiB.
+ */
+ public static final String GRIDMIX_MAX_TOTAL = "gridmix.max.total.scan";
+
+ private Node root;
+ private final Path path;
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final ReadWriteLock updateLock;
+
+ /**
+ * Initialize a filepool under the path provided, but do not populate the
+ * cache.
+ */
+ public FilePool(Configuration conf, Path input) throws IOException {
+ root = null;
+ this.conf = conf;
+ this.path = input;
+ this.fs = path.getFileSystem(conf);
+ updateLock = new ReentrantReadWriteLock();
+ }
+
+ /**
+ * Gather a collection of files at least as large as minSize.
+ * @return The total size of files returned.
+ */
+ public long getInputFiles(long minSize, Collection<FileStatus> files)
+ throws IOException {
+ updateLock.readLock().lock();
+ try {
+ return root.selectFiles(minSize, files);
+ } finally {
+ updateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * (Re)generate cache of input FileStatus objects.
+ */
+ public void refresh() throws IOException {
+ updateLock.writeLock().lock();
+ try {
+ root = new InnerDesc(fs, fs.getFileStatus(path),
+ new MinFileFilter(conf.getLong(GRIDMIX_MIN_FILE, 128 * 1024 * 1024),
+ conf.getLong(GRIDMIX_MAX_TOTAL, 100L * (1L << 40))));
+ if (0 == root.getSize()) {
+ throw new IOException("Found no satisfactory file in " + path);
+ }
+ } finally {
+ updateLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Get a set of locations for the given file.
+ */
+ public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+ throws IOException {
+ // TODO cache
+ return fs.getFileBlockLocations(stat, start, len);
+ }
+
+ static abstract class Node {
+
+ protected final static Random rand = new Random();
+
+ /**
+ * Total size of files and directories under the current node.
+ */
+ abstract long getSize();
+
+ /**
+ * Return a set of files whose cumulative size is at least
+ * <tt>targetSize</tt>.
+ * TODO Clearly size is not the only criterion, e.g. refresh from
+ * generated data without including running task output, tolerance
+ * for permission issues, etc.
+ */
+ abstract long selectFiles(long targetSize, Collection<FileStatus> files)
+ throws IOException;
+ }
+
+ interface IndexMapper {
+ int get(int pos);
+ void swap(int a, int b);
+ }
+
+ /**
+ * A sparse index mapping table - useful when we want to
+ * non-destructively permute a small fraction of a large array.
+ */
+ static class SparseIndexMapper implements IndexMapper {
+ Map<Integer, Integer> mapping = new HashMap<Integer, Integer>();
+
+ public int get(int pos) {
+ Integer mapped = mapping.get(pos);
+ if (mapped == null) return pos;
+ return mapped;
+ }
+
+ public void swap(int a, int b) {
+ int valA = get(a);
+ int valB = get(b);
+ if (b == valA) {
+ mapping.remove(b);
+ } else {
+ mapping.put(b, valA);
+ }
+ if (a == valB) {
+ mapping.remove(a);
+ } else {
+ mapping.put(a, valB);
+ }
+ }
+ }
+
+ /**
+ * A dense index mapping table - useful when we want to
+ * non-destructively permute a large fraction of an array.
+ */
+ static class DenseIndexMapper implements IndexMapper {
+ int[] mapping;
+
+ DenseIndexMapper(int size) {
+ mapping = new int[size];
+ for (int i=0; i<size; ++i) {
+ mapping[i] = i;
+ }
+ }
+
+ public int get(int pos) {
+ if ( (pos < 0) || (pos>=mapping.length) ) {
+ throw new IndexOutOfBoundsException();
+ }
+ return mapping[pos];
+ }
+
+ public void swap(int a, int b) {
+ int valA = get(a);
+ int valB = get(b);
+ mapping[a]=valB;
+ mapping[b]=valA;
+ }
+ }
+
+ /**
+ * Files in current directory of this Node.
+ */
+ static class LeafDesc extends Node {
+ final long size;
+ final ArrayList<FileStatus> curdir;
+
+ LeafDesc(ArrayList<FileStatus> curdir, long size) {
+ this.size = size;
+ this.curdir = curdir;
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public long selectFiles(long targetSize, Collection<FileStatus> files)
+ throws IOException {
+ if (targetSize >= getSize()) {
+ files.addAll(curdir);
+ return getSize();
+ }
+
+ IndexMapper mapping;
+ if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
+ mapping = new DenseIndexMapper(curdir.size());
+ } else {
+ mapping = new SparseIndexMapper();
+ }
+
+ ArrayList<Integer> selected = new ArrayList<Integer>();
+ long ret = 0L;
+ int poolSize = curdir.size();
+ do {
+ int pos = rand.nextInt(poolSize);
+ int index = mapping.get(pos);
+ selected.add(index);
+ ret += curdir.get(index).getLen();
+ mapping.swap(pos, --poolSize);
+ } while (ret < targetSize);
+
+ for (Integer i : selected) {
+ files.add(curdir.get(i));
+ }
+
+ return ret;
+ }
+ }
+
+ /**
+ * A subdirectory of the current Node.
+ */
+ static class InnerDesc extends Node {
+ final long size;
+ final double[] dist;
+ final Node[] subdir;
+
+ private static final Comparator<Node> nodeComparator =
+ new Comparator<Node>() {
+ public int compare(Node n1, Node n2) {
+ return n1.getSize() < n2.getSize() ? -1
+ : n1.getSize() > n2.getSize() ? 1 : 0;
+ }
+ };
+
+ InnerDesc(final FileSystem fs, FileStatus thisDir, MinFileFilter filter)
+ throws IOException {
+ long fileSum = 0L;
+ final ArrayList<FileStatus> curFiles = new ArrayList<FileStatus>();
+ final ArrayList<FileStatus> curDirs = new ArrayList<FileStatus>();
+ for (FileStatus stat : fs.listStatus(thisDir.getPath())) {
+ if (stat.isDir()) {
+ curDirs.add(stat);
+ } else if (filter.accept(stat)) {
+ curFiles.add(stat);
+ fileSum += stat.getLen();
+ }
+ }
+ ArrayList<Node> subdirList = new ArrayList<Node>();
+ if (!curFiles.isEmpty()) {
+ subdirList.add(new LeafDesc(curFiles, fileSum));
+ }
+ for (Iterator<FileStatus> i = curDirs.iterator();
+ !filter.done() && i.hasNext();) {
+ // add subdirectories
+ final Node d = new InnerDesc(fs, i.next(), filter);
+ final long dSize = d.getSize();
+ if (dSize > 0) {
+ fileSum += dSize;
+ subdirList.add(d);
+ }
+ }
+ size = fileSum;
+ LOG.debug(size + " bytes in " + thisDir.getPath());
+ subdir = subdirList.toArray(new Node[subdirList.size()]);
+ Arrays.sort(subdir, nodeComparator);
+ dist = new double[subdir.length];
+ for (int i = dist.length - 1; i > 0; --i) {
+ fileSum -= subdir[i].getSize();
+ dist[i] = fileSum / (1.0 * size);
+ }
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public long selectFiles(long targetSize, Collection<FileStatus> files)
+ throws IOException {
+ long ret = 0L;
+ if (targetSize >= getSize()) {
+ // request larger than all subdirs; add everything
+ for (Node n : subdir) {
+ long added = n.selectFiles(targetSize, files);
+ ret += added;
+ targetSize -= added;
+ }
+ return ret;
+ }
+
+ // can satisfy request in proper subset of contents
+ // select random set, weighted by size
+ final HashSet<Node> sub = new HashSet<Node>();
+ do {
+ assert sub.size() < subdir.length;
+ final double r = rand.nextDouble();
+ int pos = Math.abs(Arrays.binarySearch(dist, r) + 1) - 1;
+ while (sub.contains(subdir[pos])) {
+ pos = (pos + 1) % subdir.length;
+ }
+ long added = subdir[pos].selectFiles(targetSize, files);
+ ret += added;
+ targetSize -= added;
+ sub.add(subdir[pos]);
+ } while (targetSize > 0);
+ return ret;
+ }
+ }
+
+ /**
+ * Filter enforcing the minFile/maxTotal parameters of the scan.
+ */
+ private static class MinFileFilter {
+
+ private long totalScan;
+ private final long minFileSize;
+
+ public MinFileFilter(long minFileSize, long totalScan) {
+ this.minFileSize = minFileSize;
+ this.totalScan = totalScan;
+ }
+ public boolean done() {
+ return totalScan <= 0;
+ }
+ public boolean accept(FileStatus stat) {
+ final boolean done = done();
+ if (!done && stat.getLen() >= minFileSize) {
+ totalScan -= stat.getLen();
+ return true;
+ }
+ return false;
+ }
+ }
+
+}
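The draw in LeafDesc.selectFiles above is sampling without replacement: each
pick swaps the chosen index to the end of a shrinking pool through the
IndexMapper, so no file is selected twice. A standalone sketch of the same
technique with hypothetical sizes and a plain array in place of IndexMapper:

    import java.util.Random;

    public class SampleUntilTarget {
      public static void main(String[] args) {
        final long[] sizes = { 10, 20, 30, 40, 50 }; // hypothetical file sizes
        final long target = 60;                      // bytes to select
        final int[] mapping = { 0, 1, 2, 3, 4 };     // DenseIndexMapper analogue
        final Random rand = new Random();
        long ret = 0L;
        int poolSize = sizes.length;
        do {
          final int pos = rand.nextInt(poolSize);
          final int index = mapping[pos];
          ret += sizes[index];
          // swap(pos, --poolSize): a chosen index can no longer be drawn
          mapping[pos] = mapping[--poolSize];
          mapping[poolSize] = index;
        } while (ret < target);
        System.out.println("selected " + ret + " bytes"); // >= 60, no repeats
      }
    }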
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Given a {@link CombineFileSplit},
+ * circularly read through each input source.
+ */
+class FileQueue extends InputStream {
+
+ private int idx = -1;
+ private long curlen = -1L;
+ private FSDataInputStream input;
+ private final byte[] z = new byte[1];
+ private final Path[] paths;
+ private final long[] lengths;
+ private final long[] startoffset;
+ private final Configuration conf;
+
+ /**
+ * @param split Description of input sources.
+ * @param conf Used to resolve FileSystem instances.
+ */
+ public FileQueue(CombineFileSplit split, Configuration conf)
+ throws IOException {
+ this.conf = conf;
+ paths = split.getPaths();
+ startoffset = split.getStartOffsets();
+ lengths = split.getLengths();
+ nextSource();
+ }
+
+ protected void nextSource() throws IOException {
+ if (0 == paths.length) {
+ return;
+ }
+ if (input != null) {
+ input.close();
+ }
+ idx = (idx + 1) % paths.length;
+ curlen = lengths[idx];
+ final Path file = paths[idx];
+ final FileSystem fs = file.getFileSystem(conf);
+ input = fs.open(file);
+ input.seek(startoffset[idx]);
+ }
+
+ @Override
+ public int read() throws IOException {
+ final int tmp = read(z);
+ return tmp == -1 ? -1 : (0xFF & z[0]);
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int kvread = 0;
+ while (kvread < len) {
+ if (curlen <= 0) {
+ nextSource();
+ continue;
+ }
+ final int srcRead = (int) Math.min(len - kvread, curlen);
+ IOUtils.readFully(input, b, kvread, srcRead);
+ curlen -= srcRead;
+ kvread += srcRead;
+ }
+ return kvread;
+ }
+
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+}
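A usage sketch for the circular reader above. FileQueue is package-private, so
the sketch lives in the same package; the paths and lengths are hypothetical,
and the files must be at least as long as the lengths claimed by the split:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class FileQueueDemo {
      public static void main(String[] args) throws Exception {
        CombineFileSplit split = new CombineFileSplit(
            new Path[] { new Path("file:///tmp/seg-0"),
                         new Path("file:///tmp/seg-1") },
            new long[] { 1024L, 2048L });
        FileQueue in = new FileQueue(split, new Configuration());
        byte[] buf = new byte[8192];
        // 8192 > 1024 + 2048, so the read wraps back to seg-0 and continues.
        int n = in.read(buf, 0, buf.length);
        System.out.println("read " + n + " bytes");
        in.close();
      }
    }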
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+// TODO can replace with form of GridmixJob
+class GenerateData extends GridmixJob {
+
+ /**
+ * Total bytes to write.
+ */
+ public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";
+
+ /**
+ * Maximum size per file written.
+ */
+ public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";
+
+ /**
+ * Size of writes to output file.
+ */
+ public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";
+
+ /**
+ * Status reporting interval, in megabytes.
+ */
+ public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";
+
+ public GenerateData(Configuration conf, Path outdir, long genbytes)
+ throws IOException {
+ super(conf, 0L, "GRIDMIX_GENDATA");
+ job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
+ FileOutputFormat.setOutputPath(job, outdir);
+ }
+
+ @Override
+ public Job call() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ job.setMapperClass(GenDataMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(BytesWritable.class);
+ job.setInputFormatClass(GenDataFormat.class);
+ job.setOutputFormatClass(RawBytesOutputFormat.class);
+ job.setJarByClass(GenerateData.class);
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ job.submit();
+ return job;
+ }
+
+ public static class GenDataMapper
+ extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {
+
+ private BytesWritable val;
+ private final Random r = new Random();
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ val = new BytesWritable(new byte[
+ context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+ }
+
+ @Override
+ public void map(NullWritable key, LongWritable value, Context context)
+ throws IOException, InterruptedException {
+ for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
+ r.nextBytes(val.getBytes());
+ val.setSize((int)Math.min(val.getLength(), bytes));
+ context.write(key, val);
+ }
+ }
+
+ }
+
+ static class GenDataFormat extends InputFormat<NullWritable,LongWritable> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ final JobClient client =
+ new JobClient(new JobConf(jobCtxt.getConfiguration()));
+ ClusterStatus stat = client.getClusterStatus(true);
+ final long toGen =
+ jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
+ if (toGen < 0) {
+ throw new IOException("Invalid/missing generation bytes: " + toGen);
+ }
+ final int nTrackers = stat.getTaskTrackers();
+ final long bytesPerTracker = toGen / nTrackers;
+ final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
+ final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
+ final Matcher m = trackerPattern.matcher("");
+ for (String tracker : stat.getActiveTrackerNames()) {
+ m.reset(tracker);
+ if (!m.find()) {
+ System.err.println("Skipping node: " + tracker);
+ continue;
+ }
+ final String name = m.group(1);
+ splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
+ }
+ return splits;
+ }
+
+ @Override
+ public RecordReader<NullWritable,LongWritable> createRecordReader(
+ InputSplit split, final TaskAttemptContext taskContext)
+ throws IOException {
+ return new RecordReader<NullWritable,LongWritable>() {
+ long written = 0L;
+ long write = 0L;
+ long RINTERVAL;
+ long toWrite;
+ final NullWritable key = NullWritable.get();
+ final LongWritable val = new LongWritable();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext ctxt)
+ throws IOException, InterruptedException {
+ toWrite = split.getLength();
+ RINTERVAL = ctxt.getConfiguration().getInt(
+ GRIDMIX_GEN_INTERVAL, 10) << 20;
+ }
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ written += write;
+ write = Math.min(toWrite - written, RINTERVAL);
+ val.set(write);
+ return written < toWrite;
+ }
+ @Override
+ public float getProgress() throws IOException {
+ return written / ((float)toWrite);
+ }
+ @Override
+ public NullWritable getCurrentKey() { return key; }
+ @Override
+ public LongWritable getCurrentValue() { return val; }
+ @Override
+ public void close() throws IOException {
+ taskContext.setStatus("Wrote " + toWrite);
+ }
+ };
+ }
+ }
+
+ static class GenSplit extends InputSplit implements Writable {
+ private long bytes;
+ private int nLoc;
+ private String[] locations;
+
+ public GenSplit() { }
+ public GenSplit(long bytes, String[] locations) {
+ this(bytes, locations.length, locations);
+ }
+ public GenSplit(long bytes, int nLoc, String[] locations) {
+ this.bytes = bytes;
+ this.nLoc = nLoc;
+ this.locations = Arrays.copyOf(locations, nLoc);
+ }
+ @Override
+ public long getLength() {
+ return bytes;
+ }
+ @Override
+ public String[] getLocations() {
+ return locations;
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ bytes = in.readLong();
+ nLoc = in.readInt();
+ if (null == locations || locations.length < nLoc) {
+ locations = new String[nLoc];
+ }
+ for (int i = 0; i < nLoc; ++i) {
+ locations[i] = Text.readString(in);
+ }
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(bytes);
+ out.writeInt(nLoc);
+ for (int i = 0; i < nLoc; ++i) {
+ Text.writeString(out, locations[i]);
+ }
+ }
+ }
+
+ static class RawBytesOutputFormat
+ extends FileOutputFormat<NullWritable,BytesWritable> {
+
+ @Override
+ public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+ TaskAttemptContext job) throws IOException {
+
+ return new ChunkWriter(getDefaultWorkFile(job, ""),
+ job.getConfiguration());
+ }
+
+ static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
+ private final Path outDir;
+ private final FileSystem fs;
+ private final long maxFileBytes;
+
+ private long accFileBytes = 0L;
+ private long fileIdx = -1L;
+ private OutputStream fileOut = null;
+
+ public ChunkWriter(Path outDir, Configuration conf) throws IOException {
+ this.outDir = outDir;
+ fs = outDir.getFileSystem(conf);
+ maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
+ nextDestination();
+ }
+ private void nextDestination() throws IOException {
+ if (fileOut != null) {
+ fileOut.close();
+ }
+ fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false);
+ accFileBytes = 0L;
+ }
+ @Override
+ public void write(NullWritable key, BytesWritable value)
+ throws IOException {
+ int written = 0;
+ final int total = value.getLength();
+ while (written < total) {
+ final int write = (int)
+ Math.min(total - written, maxFileBytes - accFileBytes);
+ fileOut.write(value.getBytes(), written, write);
+ written += write;
+ accFileBytes += write;
+ if (accFileBytes >= maxFileBytes) {
+ nextDestination();
+ }
+ }
+ }
+ @Override
+ public void close(TaskAttemptContext ctxt) throws IOException {
+ fileOut.close();
+ }
+ }
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Driver class for the Gridmix3 benchmark. Gridmix accepts a timestamped
+ * stream (trace) of job/task descriptions. For each job in the trace, the
+ * client will submit a corresponding, synthetic job to the target cluster
+ * at the rate observed in the original trace. The intent is to provide a
+ * benchmark that can be configured and extended to closely match the
+ * measured resource profile of actual, production loads.
+ */
+public class Gridmix extends Configured implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(Gridmix.class);
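+
+ // Typical invocation (illustrative; mirrors the example in printUsage
+ // below, with an invented path):
+ //
+ // hadoop org.apache.hadoop.mapred.gridmix.Gridmix \
+ // -generate 100m /user/alice/gmix -
+ //
+ // generates 100 MiB of input data under /user/alice/gmix and replays a
+ // job trace read from stdin ("-").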
+
+ /**
+ * Output (scratch) directory for submitted jobs. Relative paths are
+ * resolved against the path provided as input, and absolute paths remain
+ * independent of it. The default is "gridmix".
+ */
+ public static final String GRIDMIX_OUT_DIR = "gridmix.output.directory";
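+
+ // Illustrative example (paths invented): with <iopath> /user/alice/gmix
+ // and the default value, job output is written under
+ // /user/alice/gmix/gridmix, whereas -Dgridmix.output.directory=/tmp/gm
+ // (an absolute path) sends it to /tmp/gm regardless of <iopath>.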
+
+ /**
+ * Number of submitting threads at the client and upper bound for
+ * in-memory split data. Submitting threads precompute InputSplits for
+ * submitted jobs. This limits the number of splits held in memory waiting
+ * for submission and also permits parallel computation of split data.
+ */
+ public static final String GRIDMIX_SUB_THR = "gridmix.client.submit.threads";
+
+ /**
+ * The depth of the queue of job descriptions. Before splits are computed,
+ * a queue of pending descriptions is stored in memory. This parameter
+ * limits the depth of that queue.
+ */
+ public static final String GRIDMIX_QUE_DEP =
+ "gridmix.client.pending.queue.depth";
+
+ /**
+ * Multiplier to accelerate or decelerate job submission. As a crude means of
+ * sizing a job trace to a cluster, the time separating two jobs is
+ * multiplied by this factor.
+ */
+ public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
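+
+ // Illustrative example: if two trace jobs were submitted 60 seconds
+ // apart, gridmix.submit.multiplier=0.5 would place the corresponding
+ // synthetic submissions 30 seconds apart, while 2.0 would stretch the
+ // gap to 120 seconds.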
+
+ // Submit data structures
+ private JobFactory factory;
+ private JobSubmitter submitter;
+ private JobMonitor monitor;
+
+ // Shutdown hook
+ private final Shutdown sdh = new Shutdown();
+
+ /**
+ * Write random bytes at the path provided.
+ * @see org.apache.hadoop.mapred.gridmix.GenerateData
+ */
+ protected void writeInputData(long genbytes, Path ioPath)
+ throws IOException, InterruptedException {
+ final Configuration conf = getConf();
+ final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
+ submitter.add(genData);
+ LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
+ " of test data...");
+ // TODO add listeners, use for job dependencies
+ TimeUnit.SECONDS.sleep(10);
+ try {
+ genData.getJob().waitForCompletion(false);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Internal error", e);
+ }
+ if (!genData.getJob().isSuccessful()) {
+ throw new IOException("Data generation failed!");
+ }
+ LOG.info("Done.");
+ }
+
+ protected InputStream createInputStream(String in) throws IOException {
+ if ("-".equals(in)) {
+ return System.in;
+ }
+ final Path pin = new Path(in);
+ return pin.getFileSystem(getConf()).open(pin);
+ }
+
+ /**
+ * Create each component in the pipeline and start it.
+ * @param conf Configuration data, no keys specific to this context
+ * @param traceIn Either a Path to the trace data or "-" for
+ * stdin
+ * @param ioPath Path from which input data is read
+ * @param scratchDir Path into which job output is written
+ * @param startFlag Semaphore for starting job trace pipeline
+ */
+ private void startThreads(Configuration conf, String traceIn, Path ioPath,
+ Path scratchDir, CountDownLatch startFlag) throws IOException {
+ monitor = createJobMonitor();
+ submitter = createJobSubmitter(monitor,
+ conf.getInt(GRIDMIX_SUB_THR,
+ Runtime.getRuntime().availableProcessors() + 1),
+ conf.getInt(GRIDMIX_QUE_DEP, 5),
+ new FilePool(conf, ioPath));
+ factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
+ monitor.start();
+ submitter.start();
+ factory.start();
+ }
+
+ protected JobMonitor createJobMonitor() throws IOException {
+ return new JobMonitor();
+ }
+
+ protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
+ int queueDepth, FilePool pool) throws IOException {
+ return new JobSubmitter(monitor, threads, queueDepth, pool);
+ }
+
+ protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
+ Path scratchDir, Configuration conf, CountDownLatch startFlag)
+ throws IOException {
+ return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
+ conf, startFlag);
+ }
+
+ public int run(String[] argv) throws IOException, InterruptedException {
+ if (argv.length < 2) {
+ printUsage(System.err);
+ return 1;
+ }
+ long genbytes = 0;
+ String traceIn = null;
+ Path ioPath = null;
+ try {
+ int i = 0;
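+ // If argv[0] is "-generate", consume it and parse the size that follows
+ // (e.g. "100m"); otherwise "--i" rewinds the index so argv[0] is re-read
+ // as <iopath> below, leaving genbytes at 0 and data generation disabled.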
+ genbytes = "-generate".equals(argv[i++])
+ ? StringUtils.TraditionalBinaryPrefix.string2long(argv[i++])
+ : --i;
+ ioPath = new Path(argv[i++]);
+ traceIn = argv[i++];
+ if (i != argv.length) {
+ printUsage(System.err);
+ return 1;
+ }
+ } catch (Exception e) {
+ printUsage(System.err);
+ return 1;
+ }
+ InputStream trace = null;
+ try {
+ final Configuration conf = getConf();
+ Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
+ // add shutdown hook for SIGINT, etc.
+ Runtime.getRuntime().addShutdownHook(sdh);
+ CountDownLatch startFlag = new CountDownLatch(1);
+ try {
+ // Create, start job submission threads
+ startThreads(conf, traceIn, ioPath, scratchDir, startFlag);
+ // Write input data if specified
+ if (genbytes > 0) {
+ writeInputData(genbytes, ioPath);
+ }
+ // scan input dir contents
+ submitter.refreshFilePool();
+ } catch (Throwable e) {
+ LOG.error("Startup failed", e);
+ if (factory != null) factory.abort(); // abort pipeline
+ } finally {
+ // signal for factory to start; sets start time
+ startFlag.countDown();
+ }
+
+ if (factory != null) {
+ // wait for input exhaustion
+ factory.join(Long.MAX_VALUE);
+ final Throwable badTraceException = factory.error();
+ if (null != badTraceException) {
+ LOG.error("Error in trace", badTraceException);
+ throw new IOException("Error in trace", badTraceException);
+ }
+ // wait for pending tasks to be submitted
+ submitter.shutdown();
+ submitter.join(Long.MAX_VALUE);
+ // wait for running tasks to complete
+ monitor.shutdown();
+ monitor.join(Long.MAX_VALUE);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, trace);
+ }
+ return 0;
+ }
+
+ /**
+ * Handles orderly shutdown by requesting that each component in the
+ * pipeline abort its progress, waiting for each to exit and killing
+ * any jobs still running on the cluster.
+ */
+ class Shutdown extends Thread {
+
+ static final long FAC_SLEEP = 1000;
+ static final long SUB_SLEEP = 4000;
+ static final long MON_SLEEP = 15000;
+
+ private void killComponent(Component<?> component, long maxwait) {
+ if (component == null) {
+ return;
+ }
+ component.abort();
+ try {
+ component.join(maxwait);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted waiting for " + component);
+ }
+
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Exiting...");
+ try {
+ killComponent(factory, FAC_SLEEP); // read no more tasks
+ killComponent(submitter, SUB_SLEEP); // submit no more tasks
+ killComponent(monitor, MON_SLEEP); // process remaining jobs here
+ } finally {
+ if (monitor == null) {
+ return;
+ }
+ List<Job> remainingJobs = monitor.getRemainingJobs();
+ if (remainingJobs.isEmpty()) {
+ return;
+ }
+ LOG.info("Killing running jobs...");
+ for (Job job : remainingJobs) {
+ try {
+ if (!job.isComplete()) {
+ job.killJob();
+ LOG.info("Killed " + job.getJobName() + " (" +
+ job.getJobID() + ")");
+ } else {
+ if (job.isSuccessful()) {
+ monitor.onSuccess(job);
+ } else {
+ monitor.onFailure(job);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failure killing " + job.getJobName(), e);
+ } catch (Exception e) {
+ LOG.error("Unexcpected exception", e);
+ }
+ }
+ LOG.info("Done.");
+ }
+ }
+
+ }
+
+ public static void main(String[] argv) throws Exception {
+ int res = -1;
+ try {
+ res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
+ } finally {
+ System.exit(res);
+ }
+ }
+
+ protected void printUsage(PrintStream out) {
+ ToolRunner.printGenericCommandUsage(out);
+ out.println("Usage: gridmix [-generate <MiB>] <iopath> <trace>");
+ out.println(" e.g. gridmix -generate 100m foo -");
+ out.println("Configuration parameters:");
+ out.printf(" %-40s : Output directory\n", GRIDMIX_OUT_DIR);
+ out.printf(" %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
+ out.printf(" %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
+ out.printf(" %-40s : Key fraction of rec\n",
+ AvgRecordFactory.GRIDMIX_KEY_FRC);
+ }
+
+ /**
+ * Components in the pipeline must support the following operations for
+ * orderly startup and shutdown.
+ */
+ interface Component<T> {
+
+ /**
+ * Accept an item into this component from an upstream component. If
+ * shutdown or abort has been called, this may fail, depending on the
+ * semantics for the component.
+ */
+ void add(T item) throws InterruptedException;
+
+ /**
+ * Attempt to start the service.
+ */
+ void start();
+
+ /**
+ * Wait until the service completes. It is assumed that either a
+ * {@link #shutdown} or {@link #abort} has been requested.
+ */
+ void join(long millis) throws InterruptedException;
+
+ /**
+ * Shut down gracefully, finishing all pending work. Reject new requests.
+ */
+ void shutdown();
+
+ /**
+ * Shut down immediately, aborting any work in progress and discarding
+ * all pending work. It is legal to store pending work for another
+ * thread to process.
+ */
+ void abort();
+ }
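+
+ // A minimal sketch of the Component contract (illustrative only, not part
+ // of the patch; the class name and behavior are invented for exposition).
+ // Items queued by add() are drained by a worker thread; shutdown() drains
+ // pending work before exiting, while abort() interrupts and discards it.
+ // A fuller implementation would also reject add() after shutdown.
+ static class EchoComponent implements Component<String> {
+ private final java.util.concurrent.BlockingQueue<String> pending =
+ new java.util.concurrent.LinkedBlockingQueue<String>();
+ private volatile boolean draining = false;
+ private final Thread worker = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ final String item = pending.poll(50, TimeUnit.MILLISECONDS);
+ if (item != null) {
+ LOG.info("Processed " + item);
+ } else if (draining) {
+ return; // shutdown requested and queue exhausted
+ }
+ }
+ } catch (InterruptedException e) {
+ // abort requested; discard whatever remains in the queue
+ }
+ }
+ };
+ public void add(String item) throws InterruptedException {
+ pending.put(item);
+ }
+ public void start() { worker.start(); }
+ public void join(long millis) throws InterruptedException {
+ worker.join(millis);
+ }
+ public void shutdown() { draining = true; }
+ public void abort() { worker.interrupt(); }
+ }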
+
+}