Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/10/18 16:45:51 UTC
svn commit: r1185694 [1/7] - in
/hadoop/common/branches/branch-0.20-security: ./ src/contrib/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/
src/contrib/gridmix/sr...
Author: amarrk
Date: Tue Oct 18 14:45:48 2011
New Revision: 1185694
URL: http://svn.apache.org/viewvc?rev=1185694&view=rev
Log:
MAPREDUCE-3118. Backport Gridmix and Rumen features to branch-0.20-security (Ravi Gummadi via amarrk)
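For context: Gridmix replays a Rumen-produced job trace against a live cluster. As a rough illustration (not part of this commit), Gridmix implements org.apache.hadoop.util.Tool, so a run can be driven through ToolRunner; the paths and the "-generate" size below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.Gridmix;
    import org.apache.hadoop.util.ToolRunner;

    public class GridmixLauncher {
      public static void main(String[] args) throws Exception {
        // "-generate 100m" synthesizes input data before the run; the last
        // two arguments are <iopath> and the Rumen trace ("-" reads the
        // trace from stdin).
        int exitCode = ToolRunner.run(new Configuration(), new Gridmix(),
            new String[] { "-generate", "100m", "/gridmix-io",
                           "file:///tmp/wordcount.json.gz" });
        System.exit(exitCode);
      }
    }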
Added:
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/data/
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/data/wordcount.json.gz (with props)
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestHighRamJob.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/rumen.xml
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz (with props)
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/build.xml
hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/Progress.java
hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/gridmix.xml
hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/site.xml
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Reporter.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/UtilsForTests.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-trace-output.gz
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Oct 18 14:45:48 2011
@@ -4,6 +4,8 @@ Release 0.20.206.0 - unreleased
NEW FEATURES
+ MAPREDUCE-3118. Backport Gridmix and Rumen features to
+ branch-0.20-security (Ravi Gummadi via amarrk)
BUG FIXES
HDFS-2305. Running multiple 2NNs can result in corrupt file system. (atm)
Modified: hadoop/common/branches/branch-0.20-security/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/build.xml?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security/build.xml Tue Oct 18 14:45:48 2011
@@ -1291,6 +1291,7 @@
<packageset dir="${mapred.src.dir}"/>
<packageset dir="${hdfs.src.dir}"/>
<packageset dir="${examples.dir}"/>
+ <packageset dir="${tools.src}"/>
<packageset dir="src/contrib/streaming/src/java"/>
<packageset dir="src/contrib/data_join/src/java"/>
@@ -1371,6 +1372,8 @@
<packageset dir="src/core"/>
<packageset dir="src/mapred"/>
<packageset dir="src/tools"/>
+ <packageset dir="${tools.src}"/>
+ <packageset dir="${tools.src}"/>
<classpath >
<path refid="classpath" />
<path refid="jdiff-classpath" />
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml Tue Oct 18 14:45:48 2011
@@ -33,6 +33,7 @@
<property name="src.dir" location="${root}/src/java"/>
<property name="src.test" location="${root}/src/test"/>
+ <property name="src.test.data" location="${root}/src/test/data"/>
<!-- Property added for contrib system tests -->
<property name="src.test.system" location="${root}/src/test/system"/>
@@ -289,6 +290,7 @@
<sysproperty key="test.build.data" value="${build.test}/data"/>
<sysproperty key="build.test" value="${build.test}"/>
+ <sysproperty key="src.test.data" value="${src.test.data}"/>
<sysproperty key="contrib.name" value="${name}"/>
<!-- requires fork=yes for:
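The new src.test.data property above is forwarded to contrib test JVMs as a system property, so a test can locate checked-in data such as the wordcount.json.gz trace added by this commit. A minimal sketch of the lookup (the fallback path is an assumption, not from the build files):

    import java.io.File;

    public class TestDataLocator {
      public static void main(String[] args) {
        // build-contrib.xml now passes src.test.data down to forked tests.
        String dir = System.getProperty("src.test.data", "src/test/data");
        File trace = new File(dir, "wordcount.json.gz");
        System.out.println("trace exists: " + trace.exists());
      }
    }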
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Tue Oct 18 14:45:48 2011
@@ -40,6 +40,8 @@ class AvgRecordFactory extends RecordFac
private final int keyLen;
private long accBytes = 0L;
private long accRecords = 0L;
+ private int unspilledBytes = 0;
+ private int minSpilledBytes = 0;
/**
* @param targetBytes Expected byte count.
@@ -48,6 +50,14 @@ class AvgRecordFactory extends RecordFac
*/
public AvgRecordFactory(long targetBytes, long targetRecords,
Configuration conf) {
+ this(targetBytes, targetRecords, conf, 0);
+ }
+
+ /**
+ * @param minSpilledBytes Minimum amount of data expected per record
+ */
+ public AvgRecordFactory(long targetBytes, long targetRecords,
+ Configuration conf, int minSpilledBytes) {
this.targetBytes = targetBytes;
this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
? Math.max(1,
@@ -58,6 +68,7 @@ class AvgRecordFactory extends RecordFac
avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
keyLen = Math.max(1,
(int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+ this.minSpilledBytes = minSpilledBytes;
}
@Override
@@ -67,14 +78,33 @@ class AvgRecordFactory extends RecordFac
}
final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
final int len = (int) Math.min(targetBytes - accBytes, reclen);
+
+ unspilledBytes += len;
+
// len != reclen?
if (key != null) {
- key.setSize(keyLen);
- val.setSize(len - key.getSize());
+ if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+ key.setSize(1);
+ val.setSize(1);
+ accBytes += key.getSize() + val.getSize();
+ unspilledBytes -= (key.getSize() + val.getSize());
+ } else {
+ key.setSize(keyLen);
+ val.setSize(unspilledBytes - key.getSize());
+ accBytes += unspilledBytes;
+ unspilledBytes = 0;
+ }
} else {
- val.setSize(len);
+ if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+ val.setSize(1);
+ accBytes += val.getSize();
+ unspilledBytes -= val.getSize();
+ } else {
+ val.setSize(unspilledBytes);
+ accBytes += unspilledBytes;
+ unspilledBytes = 0;
+ }
}
- accBytes += len;
return true;
}
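To see what the new minSpilledBytes path does: until at least minSpilledBytes of record data have accumulated, the factory emits minimal one-byte keys and values and carries the remainder forward; once the threshold is crossed it flushes the whole accumulated amount as one record (the committed code additionally forces a flush on the final record). A standalone sketch of that batching arithmetic, with illustrative constants that are not from the commit:

    public class SpillBatchingSketch {
      public static void main(String[] args) {
        final int avgRecordLen = 100;    // illustrative average record size
        final int minSpilledBytes = 300; // illustrative threshold
        int unspilledBytes = 0;
        for (int rec = 0; rec < 5; rec++) {
          unspilledBytes += avgRecordLen;
          if (unspilledBytes < minSpilledBytes) {
            // emit a 1-byte key and a 1-byte value; carry the rest forward
            unspilledBytes -= 2;
            System.out.println("record " + rec + ": tiny record, "
                + unspilledBytes + " bytes carried forward");
          } else {
            // flush everything accumulated so far as one record
            System.out.println("record " + rec + ": flush "
                + unspilledBytes + " bytes");
            unspilledBytes = 0;
          }
        }
      }
    }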
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+
+/**
+ * Summarizes the Hadoop cluster used in this {@link Gridmix} run.
+ * The statistics reported are
+ * <ul>
+ * <li>Total number of active trackers in the cluster</li>
+ * <li>Total number of blacklisted trackers in the cluster</li>
+ * <li>Max map task capacity of the cluster</li>
+ * <li>Max reduce task capacity of the cluster</li>
+ * </ul>
+ *
+ * Apart from these statistics, {@link JobTracker} and {@link FileSystem}
+ * addresses are also recorded in the summary.
+ */
+class ClusterSummarizer implements StatListener<ClusterStats> {
+ static final Log LOG = LogFactory.getLog(ClusterSummarizer.class);
+
+ private int numBlacklistedTrackers;
+ private int numActiveTrackers;
+ private int maxMapTasks;
+ private int maxReduceTasks;
+ private String jobTrackerInfo = Summarizer.NA;
+ private String namenodeInfo = Summarizer.NA;
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void update(ClusterStats item) {
+ try {
+ numBlacklistedTrackers = item.getStatus().getBlacklistedTrackers();
+ numActiveTrackers = item.getStatus().getTaskTrackers();
+ maxMapTasks = item.getStatus().getMaxMapTasks();
+ maxReduceTasks = item.getStatus().getMaxReduceTasks();
+ } catch (Exception e) {
+ long time = System.currentTimeMillis();
+ LOG.info("Error in processing cluster status at "
+ + FastDateFormat.getInstance().format(time));
+ }
+ }
+
+ /**
+ * Summarizes the cluster used for this {@link Gridmix} run.
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Cluster Summary:-");
+ builder.append("\nJobTracker: ").append(getJobTrackerInfo());
+ builder.append("\nFileSystem: ").append(getNamenodeInfo());
+ builder.append("\nNumber of blacklisted trackers: ")
+ .append(getNumBlacklistedTrackers());
+ builder.append("\nNumber of active trackers: ")
+ .append(getNumActiveTrackers());
+ builder.append("\nMax map task capacity: ")
+ .append(getMaxMapTasks());
+ builder.append("\nMax reduce task capacity: ").append(getMaxReduceTasks());
+ builder.append("\n\n");
+ return builder.toString();
+ }
+
+ void start(Configuration conf) {
+ jobTrackerInfo = conf.get("mapred.job.tracker");
+ namenodeInfo = conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
+ }
+
+ // Getters
+ protected int getNumBlacklistedTrackers() {
+ return numBlacklistedTrackers;
+ }
+
+ protected int getNumActiveTrackers() {
+ return numActiveTrackers;
+ }
+
+ protected int getMaxMapTasks() {
+ return maxMapTasks;
+ }
+
+ protected int getMaxReduceTasks() {
+ return maxReduceTasks;
+ }
+
+ protected String getJobTrackerInfo() {
+ return jobTrackerInfo;
+ }
+
+ protected String getNamenodeInfo() {
+ return namenodeInfo;
+ }
+}
\ No newline at end of file
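A minimal usage sketch for the summarizer above, assuming same-package access (the class is package-private, as in the gridmix tests); the host names are placeholders:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class ClusterSummaryDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "jt-host:9001");
        conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
                 "hdfs://nn-host:9000");

        ClusterSummarizer summarizer = new ClusterSummarizer();
        summarizer.start(conf);          // records JobTracker and NameNode info
        System.out.println(summarizer);  // prints the "Cluster Summary:-" block
      }
    }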
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This is a utility class for all the compression related modules.
+ */
+class CompressionEmulationUtil {
+ static final Log LOG = LogFactory.getLog(CompressionEmulationUtil.class);
+
+ /**
+ * Enable compression usage in GridMix runs.
+ */
+ private static final String COMPRESSION_EMULATION_ENABLE =
+ "gridmix.compression-emulation.enable";
+
+ /**
+ * Enable input data decompression.
+ */
+ private static final String INPUT_DECOMPRESSION_EMULATION_ENABLE =
+ "gridmix.compression-emulation.input-decompression.enable";
+
+ /**
+ * Configuration property for setting the compression ratio for map input
+ * data.
+ */
+ private static final String GRIDMIX_MAP_INPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.map-input.decompression-ratio";
+
+ /**
+ * Configuration property for setting the compression ratio of map output.
+ */
+ private static final String GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.map-output.compression-ratio";
+
+ /**
+ * Configuration property for setting the compression ratio of reduce output.
+ */
+ private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.reduce-output.compression-ratio";
+
+ /**
+ * Default compression ratio.
+ */
+ static final float DEFAULT_COMPRESSION_RATIO = 0.5F;
+
+ private static final CompressionRatioLookupTable COMPRESSION_LOOKUP_TABLE =
+ new CompressionRatioLookupTable();
+
+ /**
+ * This is a {@link Mapper} implementation for generating random text data.
+ * It uses {@link RandomTextDataGenerator} for generating text data and the
+ * output files are compressed.
+ */
+ public static class RandomTextDataMapper
+ extends Mapper<NullWritable, LongWritable, Text, Text> {
+ private RandomTextDataGenerator rtg;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int listSize =
+ RandomTextDataGenerator.getRandomTextDataGeneratorListSize(conf);
+ int wordSize =
+ RandomTextDataGenerator.getRandomTextDataGeneratorWordSize(conf);
+ rtg = new RandomTextDataGenerator(listSize, wordSize);
+ }
+
+ /**
+ * Emits a sequence of random words of the desired size. Note that the
+ * desired output size is passed as the value parameter to this map.
+ */
+ @Override
+ public void map(NullWritable key, LongWritable value, Context context)
+ throws IOException, InterruptedException {
+ //TODO Control the extra data written ..
+ //TODO Should the key\tvalue\n be considered for measuring size?
+ // Can counters like BYTES_WRITTEN be used? What will be the value of
+ // such counters in LocalJobRunner?
+ for (long bytes = value.get(); bytes > 0;) {
+ String randomKey = rtg.getRandomWord();
+ String randomValue = rtg.getRandomWord();
+ context.write(new Text(randomKey), new Text(randomValue));
+ bytes -= (randomValue.getBytes().length + randomKey.getBytes().length);
+ }
+ }
+ }
+
+ /**
+ * Configure the {@link Job} for enabling compression emulation.
+ */
+ static void configure(final Job job) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ // set the random text mapper
+ job.setMapperClass(RandomTextDataMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormatClass(GenDataFormat.class);
+ job.setJarByClass(GenerateData.class);
+
+ // set the output compression true
+ FileOutputFormat.setCompressOutput(job, true);
+ try {
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ } catch (IOException e) {
+ LOG.error("Error while adding input path ", e);
+ }
+ }
+
+ /**
+ * This is the lookup table for mapping compression ratio to the size of the
+ * word in the {@link RandomTextDataGenerator}'s dictionary.
+ *
+ * Note that this table is computed (empirically) using a dictionary of
+ * default length, i.e. {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+ */
+ private static class CompressionRatioLookupTable {
+ private static Map<Float, Integer> map = new HashMap<Float, Integer>(60);
+ private static final float MIN_RATIO = 0.07F;
+ private static final float MAX_RATIO = 0.68F;
+
+ // add the empirically obtained data points in the lookup table
+ CompressionRatioLookupTable() {
+ map.put(.07F,30);
+ map.put(.08F,25);
+ map.put(.09F,60);
+ map.put(.10F,20);
+ map.put(.11F,70);
+ map.put(.12F,15);
+ map.put(.13F,80);
+ map.put(.14F,85);
+ map.put(.15F,90);
+ map.put(.16F,95);
+ map.put(.17F,100);
+ map.put(.18F,105);
+ map.put(.19F,110);
+ map.put(.20F,115);
+ map.put(.21F,120);
+ map.put(.22F,125);
+ map.put(.23F,130);
+ map.put(.24F,140);
+ map.put(.25F,145);
+ map.put(.26F,150);
+ map.put(.27F,155);
+ map.put(.28F,160);
+ map.put(.29F,170);
+ map.put(.30F,175);
+ map.put(.31F,180);
+ map.put(.32F,190);
+ map.put(.33F,195);
+ map.put(.34F,205);
+ map.put(.35F,215);
+ map.put(.36F,225);
+ map.put(.37F,230);
+ map.put(.38F,240);
+ map.put(.39F,250);
+ map.put(.40F,260);
+ map.put(.41F,270);
+ map.put(.42F,280);
+ map.put(.43F,295);
+ map.put(.44F,310);
+ map.put(.45F,325);
+ map.put(.46F,335);
+ map.put(.47F,355);
+ map.put(.48F,375);
+ map.put(.49F,395);
+ map.put(.50F,420);
+ map.put(.51F,440);
+ map.put(.52F,465);
+ map.put(.53F,500);
+ map.put(.54F,525);
+ map.put(.55F,550);
+ map.put(.56F,600);
+ map.put(.57F,640);
+ map.put(.58F,680);
+ map.put(.59F,734);
+ map.put(.60F,813);
+ map.put(.61F,905);
+ map.put(.62F,1000);
+ map.put(.63F,1055);
+ map.put(.64F,1160);
+ map.put(.65F,1355);
+ map.put(.66F,1510);
+ map.put(.67F,1805);
+ map.put(.68F,2170);
+ }
+
+ /**
+ * Returns the size of the word in {@link RandomTextDataGenerator}'s
+ * dictionary that can generate text with the desired compression ratio.
+ *
+ * @throws RuntimeException If ratio is less than {@value #MIN_RATIO} or
+ * greater than {@value #MAX_RATIO}.
+ */
+ int getWordSizeForRatio(float ratio) {
+ ratio = standardizeCompressionRatio(ratio);
+ if (ratio >= MIN_RATIO && ratio <= MAX_RATIO) {
+ return map.get(ratio);
+ } else {
+ throw new RuntimeException("Compression ratio should be in the range ["
+ + MIN_RATIO + "," + MAX_RATIO + "]. Configured compression ratio is "
+ + ratio + ".");
+ }
+ }
+ }
+
+ /**
+ * Set up the data generator's configuration to generate compressible random
+ * text data with the desired compression ratio.
+ * Note that if a compression ratio is configured, the
+ * {@link RandomTextDataGenerator}'s word-size is chosen from empirical
+ * values for that ratio.
+ *
+ * Since the ratios were computed with the default dictionary size, the
+ * {@link RandomTextDataGenerator}'s list-size is set to the default
+ * value, i.e. {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+ */
+ static void setupDataGeneratorConfig(Configuration conf) {
+ boolean compress = isCompressionEmulationEnabled(conf);
+ if (compress) {
+ float ratio = getMapInputCompressionEmulationRatio(conf);
+ LOG.info("GridMix is configured to generate compressed input data with "
+ + " a compression ratio of " + ratio);
+ int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+ RandomTextDataGenerator.setRandomTextDataGeneratorWordSize(conf,
+ wordSize);
+
+ // since the compression ratios are computed using the default value of
+ // list size
+ RandomTextDataGenerator.setRandomTextDataGeneratorListSize(conf,
+ RandomTextDataGenerator.DEFAULT_LIST_SIZE);
+ }
+ }
+
+ /**
+ * Returns a {@link RandomTextDataGenerator} that generates random
+ * compressible text with the desired compression ratio.
+ */
+ static RandomTextDataGenerator getRandomTextDataGenerator(float ratio,
+ long seed) {
+ int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+ RandomTextDataGenerator rtg =
+ new RandomTextDataGenerator(RandomTextDataGenerator.DEFAULT_LIST_SIZE,
+ seed, wordSize);
+ return rtg;
+ }
+
+ /** Publishes compression-related data statistics. The following statistics
+ * are published
+ * <ul>
+ * <li>Total compressed input data size</li>
+ * <li>Number of compressed input data files</li>
+ * <li>Compression Ratio</li>
+ * <li>Text data dictionary size</li>
+ * <li>Random text word size</li>
+ * </ul>
+ */
+ static DataStatistics publishCompressedDataStatistics(Path inputDir,
+ Configuration conf, long uncompressedDataSize)
+ throws IOException {
+ FileSystem fs = inputDir.getFileSystem(conf);
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(conf);
+
+ // iterate over compressed files and sum up the compressed file sizes
+ long compressedDataSize = 0;
+ int numCompressedFiles = 0;
+ // obtain input data file statuses
+ FileStatus[] outFileStatuses =
+ fs.listStatus(inputDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ for (FileStatus status : outFileStatuses) {
+ // check if the input file is compressed
+ if (compressionCodecs != null) {
+ CompressionCodec codec = compressionCodecs.getCodec(status.getPath());
+ if (codec != null) {
+ ++numCompressedFiles;
+ compressedDataSize += status.getLen();
+ }
+ }
+ }
+
+ LOG.info("Gridmix is configured to use compressed input data.");
+ // publish the input data size
+ LOG.info("Total size of compressed input data : "
+ + StringUtils.humanReadableInt(compressedDataSize));
+ LOG.info("Total number of compressed input data files : "
+ + numCompressedFiles);
+
+ if (numCompressedFiles == 0) {
+ throw new RuntimeException("No compressed file found in the input"
+ + " directory : " + inputDir.toString() + ". To enable compression"
+ + " emulation, run Gridmix either with "
+ + " an input directory containing compressed input file(s) or"
+ + " use the -generate option to (re)generate it. If compression"
+ + " emulation is not desired, disable it by setting '"
+ + COMPRESSION_EMULATION_ENABLE + "' to 'false'.");
+ }
+
+ // publish the compression ratio only if it was generated in this gridmix run
+ if (uncompressedDataSize > 0) {
+ // compute the compression ratio
+ double ratio = ((double)compressedDataSize) / uncompressedDataSize;
+
+ // publish the compression ratio
+ LOG.info("Input Data Compression Ratio : " + ratio);
+ }
+
+ return new DataStatistics(compressedDataSize, numCompressedFiles, true);
+ }
+
+ /**
+ * Enables/Disables compression emulation.
+ * @param conf Target configuration where the parameter
+ * {@value #COMPRESSION_EMULATION_ENABLE} will be set.
+ * @param val The value to be set.
+ */
+ static void setCompressionEmulationEnabled(Configuration conf, boolean val) {
+ conf.setBoolean(COMPRESSION_EMULATION_ENABLE, val);
+ }
+
+ /**
+ * Checks if compression emulation is enabled or not. Default is {@code true}.
+ */
+ static boolean isCompressionEmulationEnabled(Configuration conf) {
+ return conf.getBoolean(COMPRESSION_EMULATION_ENABLE, true);
+ }
+
+ /**
+ * Enables/Disables input decompression emulation.
+ * @param conf Target configuration where the parameter
+ * {@value #INPUT_DECOMPRESSION_EMULATION_ENABLE} will be set.
+ * @param val The value to be set.
+ */
+ static void setInputCompressionEmulationEnabled(Configuration conf,
+ boolean val) {
+ conf.setBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, val);
+ }
+
+ /**
+ * Check if input decompression emulation is enabled or not.
+ * Default is {@code false}.
+ */
+ static boolean isInputCompressionEmulationEnabled(Configuration conf) {
+ return conf.getBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, false);
+ }
+
+ /**
+ * Set the map input data compression ratio in the given conf.
+ */
+ static void setMapInputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the map input data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value, i.e. {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getMapInputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Set the map output data compression ratio in the given configuration.
+ */
+ static void setMapOutputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the map output data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value, i.e. {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getMapOutputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Set the reduce output data compression ratio in the given configuration.
+ */
+ static void setReduceOutputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the reduce output data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value, i.e. {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getReduceOutputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Standardize the compression ratio, i.e. round the compression ratio to
+ * 2 decimal places.
+ */
+ static float standardizeCompressionRatio(float ratio) {
+ // round off to 2 decimal places
+ int significant = (int)Math.round(ratio * 100);
+ return ((float)significant)/100;
+ }
+
+ /**
+ * Returns an {@link InputStream} for a file that might be compressed.
+ */
+ static InputStream getPossiblyDecompressedInputStream(Path file,
+ Configuration conf,
+ long offset)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ if (isCompressionEmulationEnabled(conf)
+ && isInputCompressionEmulationEnabled(conf)) {
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(conf);
+ CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (codec != null) {
+ Decompressor decompressor = CodecPool.getDecompressor(codec);
+ if (decompressor != null) {
+ CompressionInputStream in =
+ codec.createInputStream(fs.open(file), decompressor);
+ //TODO Seek doesn't work with a compressed input stream.
+ // Use SplittableCompressionCodec?
+ return (InputStream)in;
+ }
+ }
+ }
+ FSDataInputStream in = fs.open(file);
+ in.seek(offset);
+ return (InputStream)in;
+ }
+
+ /**
+ * Returns an {@link OutputStream} for a file that might need
+ * compression.
+ */
+ static OutputStream getPossiblyCompressedOutputStream(Path file,
+ Configuration conf)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ JobConf jConf = new JobConf(conf);
+ if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
+ // get the codec class
+ Class<? extends CompressionCodec> codecClass =
+ org.apache.hadoop.mapred.FileOutputFormat
+ .getOutputCompressorClass(jConf,
+ GzipCodec.class);
+ // get the codec implementation
+ CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+ // add the appropriate extension
+ file = file.suffix(codec.getDefaultExtension());
+
+ if (isCompressionEmulationEnabled(conf)) {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return new DataOutputStream(codec.createOutputStream(fileOut));
+ }
+ }
+ return fs.create(file, false);
+ }
+
+ /**
+ * Extracts compression/decompression related configuration parameters from
+ * the source configuration to the target configuration.
+ */
+ static void configureCompressionEmulation(Configuration source,
+ Configuration target) {
+ // enable output compression
+ target.setBoolean("mapred.output.compress",
+ source.getBoolean("mapred.output.compress", false));
+
+ // set the job output compression codec
+ String jobOutputCompressionCodec =
+ source.get("mapred.output.compression.codec");
+ if (jobOutputCompressionCodec != null) {
+ target.set("mapred.output.compression.codec", jobOutputCompressionCodec);
+ }
+
+ // set the job output compression type
+ String jobOutputCompressionType =
+ source.get("mapred.output.compression.type");
+ if (jobOutputCompressionType != null) {
+ target.set("mapred.output.compression.type", jobOutputCompressionType);
+ }
+
+ // enable map output compression
+ target.setBoolean("mapred.compress.map.output",
+ source.getBoolean("mapred.compress.map.output", false));
+
+ // set the map output compression codecs
+ String mapOutputCompressionCodec =
+ source.get("mapred.map.output.compression.codec");
+ if (mapOutputCompressionCodec != null) {
+ target.set("mapred.map.output.compression.codec",
+ mapOutputCompressionCodec);
+ }
+
+ // enable input decompression
+ //TODO replace with mapInputBytes and hdfsBytesRead
+ Path[] inputs =
+ org.apache.hadoop.mapred.FileInputFormat
+ .getInputPaths(new JobConf(source));
+ boolean needsCompressedInput = false;
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(source);
+ for (Path input : inputs) {
+ CompressionCodec codec = compressionCodecs.getCodec(input);
+ if (codec != null) {
+ needsCompressedInput = true;
+ }
+ }
+ setInputCompressionEmulationEnabled(target, needsCompressedInput);
+ }
+}
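A sketch of wiring up the knobs defined above, again assuming same-package access since the class is package-private; the ratio values are arbitrary examples within the supported [0.07, 0.68] range:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;

    public class CompressionEmulationDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Compression emulation defaults to enabled; set it explicitly here.
        CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);

        // Target ratios for map input, map output and reduce output
        // (0.5F is used for any ratio left unset).
        CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, 0.35F);
        CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, 0.40F);
        CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, 0.50F);

        // Picks the dictionary word size that matches the map-input ratio.
        CompressionEmulationUtil.setupDataGeneratorConfig(conf);
      }
    }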
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of distributed cache load in gridmix puts load on
+ * TaskTrackers and affects the execution time of tasks, because TaskTrackers
+ * must localize the distributed cache files.
+ * <br> Gridmix creates distributed cache files for simulated jobs by launching
+ * a MapReduce job {@link GenerateDistCacheData} in advance i.e. before
+ * launching simulated jobs.
+ * <br> The distributed cache file paths used in the original cluster are mapped
+ * to unique file names in the simulated cluster.
+ * <br> All HDFS-based distributed cache files generated by gridmix are
+ * public distributed cache files. But Gridmix makes sure that load incurred due
+ * to localization of private distributed cache files on the original cluster
+ * is also faithfully simulated. Gridmix emulates the load due to private
+ * distributed cache files by mapping private distributed cache files of
+ * different users in the original cluster to different public distributed cache
+ * files in the simulated cluster.
+ *
+ * <br> The configuration properties like
+ * {@link DistributedCache#CACHE_FILES},
+ * {@link JobContext#CACHE_FILE_VISIBILITIES},
+ * {@link DistributedCache#CACHE_FILES_SIZES} and
+ * {@link DistributedCache#CACHE_FILES_TIMESTAMPS} obtained from trace are used
+ * to decide
+ * <li> file size of each distributed cache file to be generated
+ * <li> whether a distributed cache file is already seen in this trace file
+ * <li> whether a distributed cache file was considered public or private.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+ private static final Log LOG =
+ LogFactory.getLog(DistributedCacheEmulator.class);
+
+ static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L; // 128MB
+
+ // If at least 1 distributed cache file is missing in the expected
+ // distributed cache dir, Gridmix cannot proceed with emulation of
+ // distributed cache load.
+ int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+ private Path distCachePath;
+
+ /**
+ * Map between simulated cluster's distributed cache file paths and their
+ * file sizes. Unique distributed cache files are entered into this map.
+ * Two distributed cache files are considered the same if and only if
+ * their file paths, visibilities and timestamps match.
+ */
+ private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+ /**
+ * Configuration property for whether gridmix should emulate
+ * distributed cache usage or not. Default value is true.
+ */
+ static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+ "gridmix.distributed-cache-emulation.enable";
+
+ // Whether to emulate distributed cache usage or not
+ boolean emulateDistributedCache = true;
+
+ // Whether to generate distributed cache data or not
+ boolean generateDistCacheData = false;
+
+ Configuration conf; // gridmix configuration
+
+ // Pseudo local file system where local FS based distributed cache files are
+ // created by gridmix.
+ FileSystem pseudoLocalFs = null;
+
+ /**
+ * @param conf gridmix configuration
+ * @param ioPath <ioPath>/distributedCache/ is the gridmix Distributed
+ * Cache directory
+ */
+ public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+ this.conf = conf;
+ distCachePath = new Path(ioPath, "distributedCache");
+ this.conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+ }
+
+ /**
+ * This is to be called before any other method of DistributedCacheEmulator.
+ * <br> Checks if emulation of distributed cache load is needed and is feasible.
+ * Sets the flags generateDistCacheData and emulateDistributedCache to the
+ * appropriate values.
+ * <br> Gridmix does not emulate distributed cache load if
+ * <ol><li> the specific gridmix job type doesn't need emulation of
+ * distributed cache load OR
+ * <li> the trace is coming from a stream instead of a file OR
+ * <li> the distributed cache dir where distributed cache data is to be
+ * generated by gridmix is on local file system OR
+ * <li> execute permission is missing on any of the ancestor directories
+ * of <ioPath> up to the root. This is because, for emulation of distributed
+ * cache load, distributed cache files created under
+ * <ioPath/distributedCache/public/> should be considered by hadoop
+ * as public distributed cache files.
+ * <li> creation of pseudo local file system fails.</ol>
+ * <br> For (2), (3), (4) and (5), generation of distributed cache data
+ * is also disabled.
+ *
+ * @param traceIn trace file path. If this is '-', then trace comes from the
+ * stream stdin.
+ * @param jobCreator job creator of gridmix jobs of a specific type
+ * @param generate true if -generate option was specified
+ * @throws IOException
+ */
+ void init(String traceIn, JobCreator jobCreator, boolean generate)
+ throws IOException {
+ emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+ && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+ generateDistCacheData = generate;
+
+ if (generateDistCacheData || emulateDistributedCache) {
+ if ("-".equals(traceIn)) {// trace is from stdin
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "the input trace source is a stream instead of file.");
+ emulateDistributedCache = generateDistCacheData = false;
+ } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+ distCachePath.toUri().getScheme())) {// local FS
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "<iopath> provided is on local file system.");
+ emulateDistributedCache = generateDistCacheData = false;
+ } else {
+ // Check if execute permission is there for all the ancestor
+ // directories of distCachePath up to the root.
+ FileSystem fs = FileSystem.get(conf);
+ Path cur = distCachePath.getParent();
+ while (cur != null) {
+ if (cur.toString().length() > 0) {
+ FsPermission perm = fs.getFileStatus(cur).getPermission();
+ if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+ FsAction.EXECUTE)) {
+ LOG.warn("Gridmix will not emulate Distributed Cache load "
+ + "because the ascendant directory (of distributed cache "
+ + "directory) " + cur + " doesn't have execute permission "
+ + "for others.");
+ emulateDistributedCache = generateDistCacheData = false;
+ break;
+ }
+ }
+ cur = cur.getParent();
+ }
+ }
+ }
+
+ // Check if pseudo local file system can be created
+ try {
+ pseudoLocalFs = FileSystem.get(new URI("pseudo:///"), conf);
+ } catch (URISyntaxException e) {
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "creation of pseudo local file system failed.");
+ e.printStackTrace();
+ emulateDistributedCache = generateDistCacheData = false;
+ return;
+ }
+ }
+
+ /**
+ * @return true if gridmix should emulate distributed cache load
+ */
+ boolean shouldEmulateDistCacheLoad() {
+ return emulateDistributedCache;
+ }
+
+ /**
+ * @return true if gridmix should generate distributed cache data
+ */
+ boolean shouldGenerateDistCacheData() {
+ return generateDistCacheData;
+ }
+
+ /**
+ * @return the distributed cache directory path
+ */
+ Path getDistributedCacheDir() {
+ return distCachePath;
+ }
+
+ /**
+ * Create distributed cache directories.
+ * Also create a file that contains the list of distributed cache files
+ * that will be used as distributed cache files for all the simulated jobs.
+ * @param jsp job story producer for the trace
+ * @return exit code
+ * @throws IOException
+ */
+ int setupGenerateDistCacheData(JobStoryProducer jsp)
+ throws IOException {
+
+ createDistCacheDirectory();
+ return buildDistCacheFilesList(jsp);
+ }
+
+ /**
+ * Create distributed cache directory where distributed cache files will be
+ * created by the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+ * @throws IOException
+ */
+ private void createDistCacheDirectory() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+ }
+
+ /**
+ * Create the list of unique distributed cache files needed for all the
+ * simulated jobs and write the list to a special file.
+ * @param jsp job story producer for the trace
+ * @return exit code
+ * @throws IOException
+ */
+ private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+ // Read all the jobs from the trace file and build the list of unique
+ // distributed cache files.
+ JobStory jobStory;
+ while ((jobStory = jsp.getNextJob()) != null) {
+ if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS &&
+ jobStory.getSubmissionTime() >= 0) {
+ updateHDFSDistCacheFilesList(jobStory);
+ }
+ }
+ jsp.close();
+
+ return writeDistCacheFilesList();
+ }
+
+ /**
+ * For the job to be simulated, identify the needed distributed cache files by
+ * mapping original cluster's distributed cache file paths to the simulated cluster's
+ * paths, and add these paths to the map {@code distCacheFiles}.
+ *<br>
+ * JobStory should contain distributed cache related properties like
+ * <li> {@link DistributedCache#CACHE_FILES}
+ * <li> {@link JobContext#CACHE_FILE_VISIBILITIES}
+ * <li> {@link DistributedCache#CACHE_FILES_SIZES}
+ * <li> {@link DistributedCache#CACHE_FILES_TIMESTAMPS}
+ * <li> {@link DistributedCache#CLASSPATH_FILES}
+ *
+ * <li> {@link DistributedCache#CACHE_ARCHIVES}
+ * <li> {@link JobContext#CACHE_ARCHIVES_VISIBILITIES}
+ * <li> {@link DistributedCache#CACHE_ARCHIVES_SIZES}
+ * <li> {@link DistributedCache#CACHE_ARCHIVES_TIMESTAMPS}
+ * <li> {@link DistributedCache#CLASSPATH_ARCHIVES}
+ *
+ * <li> {@link DistributedCache#CACHE_SYMLINK}
+ *
+ * @param jobdesc JobStory of original job obtained from trace
+ * @throws IOException
+ */
+ void updateHDFSDistCacheFilesList(JobStory jobdesc) throws IOException {
+
+ // Map original job's distributed cache file paths to simulated cluster's
+ // paths, to be used by this simulated job.
+ JobConf jobConf = jobdesc.getJobConf();
+
+ String[] files = jobConf.getStrings(DistributedCache.CACHE_FILES);
+ if (files != null) {
+
+ String[] fileSizes = jobConf.getStrings(
+ DistributedCache.CACHE_FILES_SIZES);
+ String[] visibilities =
+ jobConf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+ String[] timeStamps =
+ jobConf.getStrings(DistributedCache.CACHE_FILES_TIMESTAMPS);
+
+ FileSystem fs = FileSystem.get(conf);
+ String user = jobConf.getUser();
+ for (int i = 0; i < files.length; i++) {
+ // Check if visibilities are available because older hadoop versions
+ // didn't have separate public and private distributed caches.
+ boolean visibility =
+ (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+ if (isLocalDistCacheFile(files[i], user, visibility)) {
+ // local FS based distributed cache file.
+ // Create this file on the pseudo local FS on the fly (i.e. when the
+ // simulated job is submitted).
+ continue;
+ }
+ // distributed cache file on hdfs
+ String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+ visibility, user);
+
+ // No need to add a distributed cache file path to the list if
+ // (1) the mapped path is already there in the list OR
+ // (2) the file with the mapped path already exists.
+ // In any of the above 2 cases, file paths, timestamps, file sizes and
+ // visibilities match. File sizes should match if file paths and
+ // timestamps match because a single file path with a single timestamp
+ // should correspond to a single file size.
+ if (distCacheFiles.containsKey(mappedPath) ||
+ fs.exists(new Path(mappedPath))) {
+ continue;
+ }
+ distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+ }
+ }
+ }
+
+ /**
+ * Check if the file path provided was constructed by MapReduce for a
+ * distributed cache file on local file system.
+ * @param filePath path of the distributed cache file
+ * @param user job submitter of the job for which <filePath> is a
+ * distributed cache file
+ * @param visibility <code>true</code> for public distributed cache file
+ * @return true if the path provided is of a local file system based
+ * distributed cache file
+ */
+ private boolean isLocalDistCacheFile(String filePath, String user,
+ boolean visibility) {
+ return (!visibility && filePath.contains(user + "/.staging"));
+ }
+
+ /**
+ * Map the HDFS based distributed cache file path from the original cluster
+ * to a unique file name on the simulated cluster.
+ * <br> Unique distributed cache file names on the simulated cluster are
+ * generated using the original cluster's <li> file path, <li> timestamp
+ * and <li> the job submitter (for private distributed cache files).
+ * <br> This implies that if, on the original cluster, a single HDFS file
+ * is treated as two private distributed cache files by two jobs of
+ * different users, then the corresponding simulated jobs will have two
+ * different files of the same size in the public distributed cache, one
+ * for each user. These two simulated jobs will not share the distributed
+ * cache files, thus generating the same load as seen on the original
+ * cluster.
+ * @param file distributed cache file path
+ * @param timeStamp time stamp of the distributed cache file
+ * @param isPublic true if this is a public distributed cache file
+ * @param user job submitter on the original cluster
+ * @return the mapped path on the simulated cluster
+ */
+ private String mapDistCacheFilePath(String file, String timeStamp,
+ boolean isPublic, String user) {
+ String id = file + timeStamp;
+ if (!isPublic) {
+ // consider job-submitter for private distributed cache file
+ id = id.concat(user);
+ }
+ return new Path(distCachePath, MD5Hash.digest(id).toString()).toUri()
+ .getPath();
+ }
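
A rough sketch of the mapping above, assuming a hypothetical distCachePath
of "/gridmix/distCache" and a private distributed cache file:

    String file = "hdfs://nn:8020/user/alice/lookup.dat";  // original path
    String timeStamp = "1318948800000";
    // Private file, so the job submitter is appended to the id:
    String id = file + timeStamp + "alice";
    String mapped = new Path("/gridmix/distCache",
        MD5Hash.digest(id).toString()).toUri().getPath();
    // mapped now looks like "/gridmix/distCache/<32-hex-digit-md5>"
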
+
+ /**
+ * Write the list of distributed cache files, sorted in decreasing order of
+ * file size, to a sequence file. This file will be the input of the job
+ * {@link GenerateDistCacheData}.
+ * Also returns an error code if distributed cache files are missing and the
+ * -generate option was not specified.
+ * @return exit code
+ * @throws IOException
+ */
+ private int writeDistCacheFilesList()
+ throws IOException {
+ // Sort the distributed cache files in the decreasing order of file sizes.
+ List<Map.Entry<String, Long>> dcFiles =
+ new ArrayList<Map.Entry<String, Long>>(distCacheFiles.entrySet());
+ Collections.sort(dcFiles, new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> dc1,
+ Map.Entry<String, Long> dc2) {
+ // decreasing order of file sizes
+ return dc2.getValue().compareTo(dc1.getValue());
+ }
+ });
+
+ // write the sorted distributed cache files to the sequence file
+ FileSystem fs = FileSystem.get(conf);
+ Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+ conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+ distCacheFilesList.toString());
+ SequenceFile.Writer srcWriter = SequenceFile.createWriter(fs, conf,
+ distCacheFilesList, LongWritable.class, BytesWritable.class,
+ SequenceFile.CompressionType.NONE);
+
+ // Total number of unique distributed cache files
+ int fileCount = dcFiles.size();
+ long byteCount = 0; // Total size of all distributed cache files
+ long bytesSync = 0; // Bytes since the previous sync; used to add sync markers
+
+ try {
+ for (Map.Entry<String, Long> entry : dcFiles) {
+ LongWritable fileSize = new LongWritable(entry.getValue().longValue());
+ BytesWritable filePath = new BytesWritable(entry.getKey().getBytes());
+
+ byteCount += fileSize.get();
+ bytesSync += fileSize.get();
+ // Add a sync marker after roughly AVG_BYTES_PER_MAP bytes of referenced
+ // file data, keeping the list file splittable across multiple maps.
+ if (bytesSync > AVG_BYTES_PER_MAP) {
+ srcWriter.sync();
+ bytesSync = fileSize.get();
+ }
+ srcWriter.append(fileSize, filePath);
+ }
+ } finally {
+ // Close the writer even if an append fails.
+ srcWriter.close();
+ }
+ // Set delete on exit for 'dist cache files list' as it is not needed later.
+ fs.deleteOnExit(distCacheFilesList);
+
+ conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+ conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+ LOG.info("Number of HDFS based distributed cache files to be generated is "
+ + fileCount + ". Total size of HDFS based distributed cache files "
+ + "to be generated is " + byteCount);
+
+ if (!shouldGenerateDistCacheData() && fileCount > 0) {
+ LOG.error("Missing " + fileCount + " distributed cache files under the "
+ + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+ + " to emulate distributed cache load. Either use -generate\noption"
+ + " to generate distributed cache data along with input data OR "
+ + "disable\ndistributed cache emulation by configuring '"
+ + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ + "' to false.");
+ return MISSING_DIST_CACHE_FILES_ERROR;
+ }
+ return 0;
+ }
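
A sketch of how the generated list file could be read back (with fs, conf
and distCacheFilesList as in the method above); each record pairs a file
size with its mapped HDFS path:

    SequenceFile.Reader reader =
        new SequenceFile.Reader(fs, distCacheFilesList, conf);
    LongWritable fileSize = new LongWritable();
    BytesWritable filePath = new BytesWritable();
    while (reader.next(fileSize, filePath)) {
      String path = new String(filePath.getBytes(), 0, filePath.getLength());
      // fileSize.get() bytes of data are to be generated at 'path'
    }
    reader.close();
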
+
+ /**
+ * If gridmix needs to emulate distributed cache load, then configure
+ * distributed cache files of a simulated job by mapping the original
+ * cluster's distributed cache file paths to the simulated cluster's paths and
+ * setting these mapped paths in the job configuration of the simulated job.
+ * <br>
+ * Configure local FS based distributed cache files through the property
+ * "tmpfiles" and HDFS based distributed cache files through the property
+ * {@link DistributedCache#CACHE_FILES}.
+ * @param conf configuration for the simulated job to be run
+ * @param jobConf job configuration of original cluster's job, obtained from
+ * trace
+ * @throws IOException
+ */
+ void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+ throws IOException {
+ if (shouldEmulateDistCacheLoad()) {
+
+ String[] files = jobConf.getStrings(DistributedCache.CACHE_FILES);
+ if (files != null) {
+ // hdfs based distributed cache files to be configured for simulated job
+ List<String> cacheFiles = new ArrayList<String>();
+ // local FS based distributed cache files to be configured for
+ // simulated job
+ List<String> localCacheFiles = new ArrayList<String>();
+
+ String[] visibilities =
+ jobConf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+ String[] timeStamps =
+ jobConf.getStrings(DistributedCache.CACHE_FILES_TIMESTAMPS);
+ String[] fileSizes =
+ jobConf.getStrings(DistributedCache.CACHE_FILES_SIZES);
+
+ String user = jobConf.getUser();
+ for (int i = 0; i < files.length; i++) {
+ // Check if visibilities are available, because older Hadoop versions
+ // didn't support public and private distributed cache files separately.
+ boolean visibility =
+ (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+ if (isLocalDistCacheFile(files[i], user, visibility)) {
+ // local FS based distributed cache file.
+ // Create this file on the pseudo local FS.
+ String fileId = MD5Hash.digest(files[i] + timeStamps[i]).toString();
+ long fileSize = Long.parseLong(fileSizes[i]);
+ Path mappedLocalFilePath =
+ PseudoLocalFs.generateFilePath(fileId, fileSize)
+ .makeQualified(pseudoLocalFs.getUri(),
+ pseudoLocalFs.getWorkingDirectory());
+ pseudoLocalFs.create(mappedLocalFilePath);
+ localCacheFiles.add(mappedLocalFilePath.toUri().toString());
+ } else {
+ // hdfs based distributed cache file.
+ // Get the mapped HDFS path on simulated cluster
+ String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+ visibility, user);
+ cacheFiles.add(mappedPath);
+ }
+ }
+ if (cacheFiles.size() > 0) {
+ // configure hdfs based distributed cache files for simulated job
+ conf.setStrings(DistributedCache.CACHE_FILES,
+ cacheFiles.toArray(new String[cacheFiles.size()]));
+ }
+ if (localCacheFiles.size() > 0) {
+ // configure local FS based distributed cache files for simulated job
+ conf.setStrings("tmpfiles", localCacheFiles.toArray(
+ new String[localCacheFiles.size()]));
+ }
+ }
+ }
+ }
+}
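
The net effect of configureDistCacheFiles() on a simulated job's
configuration can be sketched as follows; the property values below are
hypothetical, and the pseudo local FS URI format is only illustrative:

    // HDFS based files: mapped paths under the Gridmix dist cache directory.
    conf.setStrings(DistributedCache.CACHE_FILES,
        "/gridmix/distCache/8b1a9953c4611296a827abf8c47804d7");
    // Local FS based files: routed through "tmpfiles" on the pseudo local
    // FS, so they are created on the fly when the simulated job is submitted.
    conf.setStrings("tmpfiles",
        "pseudo:///8c7dd922ad47494fc02c388e12c00eac.4096");
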
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java Tue Oct 18 14:45:48 2011
@@ -19,15 +19,9 @@ package org.apache.hadoop.mapred.gridmix
import java.io.IOException;
import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,4 +44,14 @@ public class EchoUserResolver implements
UserGroupInformation ugi) {
return ugi;
}
+
+ /**
+ * {@inheritDoc}
+ * <br><br>
+ * Since {@link EchoUserResolver} simply returns the UGI that is passed in
+ * as the argument, it doesn't need a target list of users.
+ */
+ public boolean needsTargetUsersList() {
+ return false;
+ }
}
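
A short sketch of how a driver might consult the new method; the caller
code here is hypothetical:

    UserResolver resolver = new EchoUserResolver();
    if (resolver.needsTargetUsersList()) {
      // load and set the target users list ... (never reached for
      // EchoUserResolver, which returns false)
    }
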
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Summarizes a {@link Gridmix} run. Statistics that are reported are
+ * <ul>
+ * <li>Total number of jobs in the input trace</li>
+ * <li>Trace signature</li>
+ * <li>Total number of jobs processed from the input trace</li>
+ * <li>Total number of jobs submitted</li>
+ * <li>Total number of successful and failed jobs</li>
+ * <li>Total number of map/reduce tasks launched</li>
+ * <li>Gridmix start and end time</li>
+ * <li>Total time for the Gridmix run (data generation and simulation)</li>
+ * <li>Gridmix configuration (i.e. job type, submission type and resolver)</li>
+ * </ul>
+ */
+class ExecutionSummarizer implements StatListener<JobStats> {
+ static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
+ private static final FastDateFormat UTIL = FastDateFormat.getInstance();
+
+ private int numJobsInInputTrace;
+ private int totalSuccessfulJobs;
+ private int totalFailedJobs;
+ private int totalMapTasksLaunched;
+ private int totalReduceTasksLaunched;
+ private long totalSimulationTime;
+ private long totalRuntime;
+ private final String commandLineArgs;
+ private long startTime;
+ private long endTime;
+ private long simulationStartTime;
+ private String inputTraceLocation;
+ private String inputTraceSignature;
+ private String jobSubmissionPolicy;
+ private String resolver;
+ private DataStatistics dataStats;
+ private String expectedDataSize;
+
+ /**
+ * Basic constructor initialized with the runtime arguments.
+ */
+ ExecutionSummarizer(String[] args) {
+ startTime = System.currentTimeMillis();
+ // flatten the args string and store it
+ commandLineArgs =
+ org.apache.commons.lang.StringUtils.join(args, ' ');
+ }
+
+ /**
+ * Default constructor.
+ */
+ ExecutionSummarizer() {
+ startTime = System.currentTimeMillis();
+ commandLineArgs = Summarizer.NA;
+ }
+
+ void start(Configuration conf) {
+ simulationStartTime = System.currentTimeMillis();
+ }
+
+ private void processJobState(JobStats stats) throws Exception {
+ Job job = stats.getJob();
+ if (job.isSuccessful()) {
+ ++totalSuccessfulJobs;
+ } else {
+ ++totalFailedJobs;
+ }
+ }
+
+ private void processJobTasks(JobStats stats) throws Exception {
+ totalMapTasksLaunched += stats.getNoOfMaps();
+ Job job = stats.getJob();
+ totalReduceTasksLaunched += job.getNumReduceTasks();
+ }
+
+ private void process(JobStats stats) {
+ try {
+ // process the job run state
+ processJobState(stats);
+
+ // process the tasks information
+ processJobTasks(stats);
+ } catch (Exception e) {
+ LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
+ }
+ }
+
+ @Override
+ public void update(JobStats item) {
+ // process only if the simulation has started
+ if (simulationStartTime > 0) {
+ process(item);
+ totalSimulationTime =
+ System.currentTimeMillis() - getSimulationStartTime();
+ }
+ }
+
+ // Generates a signature for the trace file based on
+ // - filename
+ // - modification time
+ // - file length
+ // - owner
+ protected static String getTraceSignature(String input) throws IOException {
+ Path inputPath = new Path(input);
+ FileSystem fs = inputPath.getFileSystem(new Configuration());
+ FileStatus status = fs.getFileStatus(inputPath);
+ Path qPath = fs.makeQualified(status.getPath());
+ String traceID = status.getModificationTime() + qPath.toString()
+ + status.getOwner() + status.getLen();
+ return MD5Hash.digest(traceID).toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ void finalize(JobFactory factory, String inputPath, long dataSize,
+ UserResolver userResolver, DataStatistics stats,
+ Configuration conf)
+ throws IOException {
+ numJobsInInputTrace = factory.numJobsInTrace;
+ endTime = System.currentTimeMillis();
+ Path inputTracePath = new Path(inputPath);
+ FileSystem fs = inputTracePath.getFileSystem(conf);
+ inputTraceLocation = fs.makeQualified(inputTracePath).toString();
+ inputTraceSignature = getTraceSignature(inputTraceLocation);
+ jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
+ resolver = userResolver.getClass().getName();
+ if (dataSize > 0) {
+ expectedDataSize = StringUtils.humanReadableInt(dataSize);
+ } else {
+ expectedDataSize = Summarizer.NA;
+ }
+ dataStats = stats;
+ totalRuntime = System.currentTimeMillis() - getStartTime();
+ }
+
+ /**
+ * Summarizes the current {@link Gridmix} run.
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Execution Summary:-");
+ builder.append("\nInput trace: ").append(getInputTraceLocation());
+ builder.append("\nInput trace signature: ")
+ .append(getInputTraceSignature());
+ builder.append("\nTotal number of jobs in trace: ")
+ .append(getNumJobsInTrace());
+ builder.append("\nExpected input data size: ")
+ .append(getExpectedDataSize());
+ builder.append("\nInput data statistics: ")
+ .append(getInputDataStatistics());
+ builder.append("\nTotal number of jobs processed: ")
+ .append(getNumSubmittedJobs());
+ builder.append("\nTotal number of successful jobs: ")
+ .append(getNumSuccessfulJobs());
+ builder.append("\nTotal number of failed jobs: ")
+ .append(getNumFailedJobs());
+ builder.append("\nTotal number of map tasks launched: ")
+ .append(getNumMapTasksLaunched());
+ builder.append("\nTotal number of reduce task launched: ")
+ .append(getNumReduceTasksLaunched());
+ builder.append("\nGridmix start time: ")
+ .append(UTIL.format(getStartTime()));
+ builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
+ builder.append("\nGridmix simulation start time: ")
+ .append(UTIL.format(getStartTime()));
+ builder.append("\nGridmix runtime: ")
+ .append(StringUtils.formatTime(getRuntime()));
+ builder.append("\nTime spent in initialization (data-gen etc): ")
+ .append(StringUtils.formatTime(getInitTime()));
+ builder.append("\nTime spent in simulation: ")
+ .append(StringUtils.formatTime(getSimulationTime()));
+ builder.append("\nGridmix configuration parameters: ")
+ .append(getCommandLineArgsString());
+ builder.append("\nGridmix job submission policy: ")
+ .append(getJobSubmissionPolicy());
+ builder.append("\nGridmix resolver: ").append(getUserResolver());
+ builder.append("\n\n");
+ return builder.toString();
+ }
+
+ // Gets the stringified version of DataStatistics
+ static String stringifyDataStatistics(DataStatistics stats) {
+ if (stats != null) {
+ StringBuilder buffer = new StringBuilder();
+ String compressionStatus = stats.isDataCompressed()
+ ? "Compressed"
+ : "Uncompressed";
+ buffer.append(compressionStatus).append(" input data size: ");
+ buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
+ buffer.append(", ");
+ buffer.append("Number of files: ").append(stats.getNumFiles());
+
+ return buffer.toString();
+ } else {
+ return Summarizer.NA;
+ }
+ }
+
+ // Getters
+ protected String getExpectedDataSize() {
+ return expectedDataSize;
+ }
+
+ protected String getUserResolver() {
+ return resolver;
+ }
+
+ protected String getInputDataStatistics() {
+ return stringifyDataStatistics(dataStats);
+ }
+
+ protected String getInputTraceSignature() {
+ return inputTraceSignature;
+ }
+
+ protected String getInputTraceLocation() {
+ return inputTraceLocation;
+ }
+
+ protected int getNumJobsInTrace() {
+ return numJobsInInputTrace;
+ }
+
+ protected int getNumSuccessfulJobs() {
+ return totalSuccessfulJobs;
+ }
+
+ protected int getNumFailedJobs() {
+ return totalFailedJobs;
+ }
+
+ protected int getNumSubmittedJobs() {
+ return totalSuccessfulJobs + totalFailedJobs;
+ }
+
+ protected int getNumMapTasksLaunched() {
+ return totalMapTasksLaunched;
+ }
+
+ protected int getNumReduceTasksLaunched() {
+ return totalReduceTasksLaunched;
+ }
+
+ protected long getStartTime() {
+ return startTime;
+ }
+
+ protected long getEndTime() {
+ return endTime;
+ }
+
+ protected long getInitTime() {
+ return simulationStartTime - startTime;
+ }
+
+ protected long getSimulationStartTime() {
+ return simulationStartTime;
+ }
+
+ protected long getSimulationTime() {
+ return totalSimulationTime;
+ }
+
+ protected long getRuntime() {
+ return totalRuntime;
+ }
+
+ protected String getCommandLineArgsString() {
+ return commandLineArgs;
+ }
+
+ protected String getJobSubmissionPolicy() {
+ return jobSubmissionPolicy;
+ }
+}
\ No newline at end of file
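
Pieced together from the methods above, the summarizer's lifecycle is
roughly as follows; the driver code is hypothetical:

    ExecutionSummarizer summarizer = new ExecutionSummarizer(args);
    summarizer.start(conf);       // marks the simulation start time
    // for each tracked job, the stats listener fires:
    summarizer.update(jobStats);  // tallies job states and task counts
    summarizer.finalize(factory, tracePath, dataSize, userResolver,
        dataStats, conf);
    System.out.println(summarizer.toString());  // prints the summary
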
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Tue Oct 18 14:45:48 2011
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
@@ -34,7 +32,7 @@ class FileQueue extends InputStream {
private int idx = -1;
private long curlen = -1L;
- private FSDataInputStream input;
+ private InputStream input;
private final byte[] z = new byte[1];
private final Path[] paths;
private final long[] lengths;
@@ -64,9 +62,9 @@ class FileQueue extends InputStream {
idx = (idx + 1) % paths.length;
curlen = lengths[idx];
final Path file = paths[idx];
- final FileSystem fs = file.getFileSystem(conf);
- input = fs.open(file);
- input.seek(startoffset[idx]);
+ input =
+ CompressionEmulationUtil.getPossiblyDecompressedInputStream(file,
+ conf, startoffset[idx]);
}
@Override