Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/10/18 16:45:51 UTC

svn commit: r1185694 [1/7] - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/ src/contrib/gridmix/sr...

Author: amarrk
Date: Tue Oct 18 14:45:48 2011
New Revision: 1185694

URL: http://svn.apache.org/viewvc?rev=1185694&view=rev
Log:
MAPREDUCE-3118. Backport Gridmix and Rumen features to branch-0.20-security (Ravi Gummadi via amarrk)

Added:
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/data/
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/data/wordcount.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestHighRamJob.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
    hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/rumen.xml
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz   (with props)
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/build.xml
    hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/Progress.java
    hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/gridmix.xml
    hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/site.xml
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Reporter.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-trace-output.gz
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
    hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
    hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Oct 18 14:45:48 2011
@@ -4,6 +4,8 @@ Release 0.20.206.0 - unreleased
 
   NEW FEATURES
 
+    MAPREDUCE-3118. Backport Gridmix and Rumen features to 
+                    branch-0.20-security (Ravi Gummadi via amarrk)
   BUG FIXES
 
     HDFS-2305. Running multiple 2NNs can result in corrupt file system. (atm)

Modified: hadoop/common/branches/branch-0.20-security/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/build.xml?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security/build.xml Tue Oct 18 14:45:48 2011
@@ -1291,6 +1291,7 @@
         <packageset dir="${mapred.src.dir}"/>
         <packageset dir="${hdfs.src.dir}"/>        	
     	<packageset dir="${examples.dir}"/>
+    	<packageset dir="${tools.src}"/>
 
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
@@ -1371,6 +1372,8 @@
        <packageset dir="src/core"/>
        <packageset dir="src/mapred"/>
        <packageset dir="src/tools"/>
+       <packageset dir="${tools.src}"/>
+       <packageset dir="${tools.src}"/>
        <classpath >
          <path refid="classpath" />
          <path refid="jdiff-classpath" />

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/build-contrib.xml Tue Oct 18 14:45:48 2011
@@ -33,6 +33,7 @@
 
   <property name="src.dir"  location="${root}/src/java"/>
   <property name="src.test" location="${root}/src/test"/>
+  <property name="src.test.data" location="${root}/src/test/data"/>
   <!-- Property added for contrib system tests -->
   <property name="src.test.system" location="${root}/src/test/system"/>
 
@@ -289,6 +290,7 @@
       
       <sysproperty key="test.build.data" value="${build.test}/data"/>
       <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="src.test.data" value="${src.test.data}"/>
       <sysproperty key="contrib.name" value="${name}"/>
       
       <!-- requires fork=yes for: 

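A minimal illustration of how a contrib test could pick up the new src.test.data property, which the <sysproperty> above forwards into the forked JUnit JVM so tests can locate checked-in data such as the wordcount.json.gz trace added in this commit (the fallback path below is an assumption for the sketch, not part of this change):

    // Hypothetical test helper: resolve the test data directory from the
    // "src.test.data" system property set by the Ant <junit> task above.
    String testDataDir = System.getProperty("src.test.data", "src/test/data");
    java.io.File trace = new java.io.File(testDataDir, "wordcount.json.gz");
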
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Tue Oct 18 14:45:48 2011
@@ -40,6 +40,8 @@ class AvgRecordFactory extends RecordFac
   private final int keyLen;
   private long accBytes = 0L;
   private long accRecords = 0L;
+  private int unspilledBytes = 0;
+  private int minSpilledBytes = 0;
 
   /**
    * @param targetBytes Expected byte count.
@@ -48,6 +50,14 @@ class AvgRecordFactory extends RecordFac
    */
   public AvgRecordFactory(long targetBytes, long targetRecords,
       Configuration conf) {
+    this(targetBytes, targetRecords, conf, 0);
+  }
+  
+  /**
+   * @param minSpilledBytes Minimum amount of data expected per record
+   */
+  public AvgRecordFactory(long targetBytes, long targetRecords,
+      Configuration conf, int minSpilledBytes) {
     this.targetBytes = targetBytes;
     this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
       ? Math.max(1,
@@ -58,6 +68,7 @@ class AvgRecordFactory extends RecordFac
     avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
     keyLen = Math.max(1,
         (int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+    this.minSpilledBytes = minSpilledBytes;
   }
 
   @Override
@@ -67,14 +78,33 @@ class AvgRecordFactory extends RecordFac
     }
     final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
     final int len = (int) Math.min(targetBytes - accBytes, reclen);
+    
+    unspilledBytes += len;
+    
     // len != reclen?
     if (key != null) {
-      key.setSize(keyLen);
-      val.setSize(len - key.getSize());
+      if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+        key.setSize(1);
+        val.setSize(1);
+        accBytes += key.getSize() + val.getSize();
+        unspilledBytes -= (key.getSize() + val.getSize());
+      } else {
+        key.setSize(keyLen);
+        val.setSize(unspilledBytes - key.getSize());
+        accBytes += unspilledBytes;
+        unspilledBytes = 0;
+      }
     } else {
-      val.setSize(len);
+      if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+        val.setSize(1);
+        accBytes += val.getSize();
+        unspilledBytes -= val.getSize();
+      } else {
+        val.setSize(unspilledBytes);
+        accBytes += unspilledBytes;
+        unspilledBytes = 0;
+      }
     }
-    accBytes += len;
     return true;
   }
 

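The new minSpilledBytes parameter changes how next() sizes records: bytes are accumulated in unspilledBytes, and while the accumulated amount stays below the minimum (and more records remain) the factory emits placeholder 1-byte keys and values, then flushes the whole accumulated budget into one larger record once the threshold is reached. A stand-alone sketch of that batching rule, with hypothetical names and simplified bookkeeping rather than the class's exact code:

    // Hypothetical illustration of the spill-batching rule added above.
    static void sketchSpillBatching(int[] recordLengths, int keyLen,
                                    int minSpilledBytes) {
      int unspilled = 0;
      for (int i = 0; i < recordLengths.length; i++) {
        unspilled += recordLengths[i];
        boolean last = (i == recordLengths.length - 1);
        if (unspilled < minSpilledBytes && !last) {
          System.out.println("emit 1-byte key, 1-byte value"); // tiny record
          unspilled -= 2;
        } else {
          System.out.println("emit " + keyLen + "-byte key, "
              + (unspilled - keyLen) + "-byte value");         // flush batch
          unspilled = 0;
        }
      }
    }
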
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+
+/**
+ * Summarizes the Hadoop cluster used in this {@link Gridmix} run. 
+ * Statistics that are reported are
+ * <ul>
+ *   <li>Total number of active trackers in the cluster</li>
+ *   <li>Total number of blacklisted trackers in the cluster</li>
+ *   <li>Max map task capacity of the cluster</li>
+ *   <li>Max reduce task capacity of the cluster</li>
+ * </ul>
+ * 
+ * Apart from these statistics, {@link JobTracker} and {@link FileSystem} 
+ * addresses are also recorded in the summary.
+ */
+class ClusterSummarizer implements StatListener<ClusterStats> {
+  static final Log LOG = LogFactory.getLog(ClusterSummarizer.class);
+  
+  private int numBlacklistedTrackers;
+  private int numActiveTrackers;
+  private int maxMapTasks;
+  private int maxReduceTasks;
+  private String jobTrackerInfo = Summarizer.NA;
+  private String namenodeInfo = Summarizer.NA;
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public void update(ClusterStats item) {
+    try {
+      numBlacklistedTrackers = item.getStatus().getBlacklistedTrackers();
+      numActiveTrackers = item.getStatus().getTaskTrackers();
+      maxMapTasks = item.getStatus().getMaxMapTasks();
+      maxReduceTasks = item.getStatus().getMaxReduceTasks();
+    } catch (Exception e) {
+      long time = System.currentTimeMillis();
+      LOG.info("Error in processing cluster status at " 
+               + FastDateFormat.getInstance().format(time));
+    }
+  }
+  
+  /**
+   * Summarizes the cluster used for this {@link Gridmix} run.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Cluster Summary:-");
+    builder.append("\nJobTracker: ").append(getJobTrackerInfo());
+    builder.append("\nFileSystem: ").append(getNamenodeInfo());
+    builder.append("\nNumber of blacklisted trackers: ")
+           .append(getNumBlacklistedTrackers());
+    builder.append("\nNumber of active trackers: ")
+           .append(getNumActiveTrackers());
+    builder.append("\nMax map task capacity: ")
+           .append(getMaxMapTasks());
+    builder.append("\nMax reduce task capacity: ").append(getMaxReduceTasks());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+  
+  void start(Configuration conf) {
+    jobTrackerInfo = conf.get("mapred.job.tracker");
+    namenodeInfo = conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
+  }
+  
+  // Getters
+  protected int getNumBlacklistedTrackers() {
+    return numBlacklistedTrackers;
+  }
+  
+  protected int getNumActiveTrackers() {
+    return numActiveTrackers;
+  }
+  
+  protected int getMaxMapTasks() {
+    return maxMapTasks;
+  }
+  
+  protected int getMaxReduceTasks() {
+    return maxReduceTasks;
+  }
+  
+  protected String getJobTrackerInfo() {
+    return jobTrackerInfo;
+  }
+  
+  protected String getNamenodeInfo() {
+    return namenodeInfo;
+  }
+}
\ No newline at end of file

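For a sense of what this summarizer reports, the following sketch only exercises the start() and toString() methods added above; the addresses are made-up placeholders, and the tracker/slot counts stay at their defaults until update() receives a ClusterStats snapshot:

    // Illustrative only; assumes same-package access to the class.
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "jthost:9001");               // placeholder
    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://nnhost:9000");
    ClusterSummarizer summarizer = new ClusterSummarizer();
    summarizer.start(conf);
    System.out.println(summarizer);  // prints the "Cluster Summary:-" block
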
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This is a utility class for all the compression related modules.
+ */
+class CompressionEmulationUtil {
+  static final Log LOG = LogFactory.getLog(CompressionEmulationUtil.class);
+  
+  /**
+   * Enable compression usage in GridMix runs.
+   */
+  private static final String COMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.enable";
+  
+  /**
+   * Enable input data decompression.
+   */
+  private static final String INPUT_DECOMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.input-decompression.enable";
+  
+  /**
+   * Configuration property for setting the compression ratio for map input 
+   * data.
+   */
+  private static final String GRIDMIX_MAP_INPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.map-input.decompression-ratio";
+  
+  /**
+   * Configuration property for setting the compression ratio of map output.
+   */
+  private static final String GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.map-output.compression-ratio";
+  
+  /**
+   * Configuration property for setting the compression ratio of reduce output.
+   */
+  private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.reduce-output.compression-ratio";
+  
+  /**
+   * Default compression ratio.
+   */
+  static final float DEFAULT_COMPRESSION_RATIO = 0.5F;
+  
+  private static final CompressionRatioLookupTable COMPRESSION_LOOKUP_TABLE = 
+    new CompressionRatioLookupTable();
+  
+  /**
+   * This is a {@link Mapper} implementation for generating random text data.
+   * It uses {@link RandomTextDataGenerator} for generating text data and the
+   * output files are compressed.
+   */
+  public static class RandomTextDataMapper
+  extends Mapper<NullWritable, LongWritable, Text, Text> {
+    private RandomTextDataGenerator rtg;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      int listSize = 
+        RandomTextDataGenerator.getRandomTextDataGeneratorListSize(conf);
+      int wordSize = 
+        RandomTextDataGenerator.getRandomTextDataGeneratorWordSize(conf);
+      rtg = new RandomTextDataGenerator(listSize, wordSize);
+    }
+    
+    /**
+     * Emits random words sequence of desired size. Note that the desired output
+     * size is passed as the value parameter to this map.
+     */
+    @Override
+    public void map(NullWritable key, LongWritable value, Context context)
+    throws IOException, InterruptedException {
+      //TODO Control the extra data written ..
+      //TODO Should the key\tvalue\n be considered for measuring size?
+      //     Can counters like BYTES_WRITTEN be used? What will be the value of
+      //     such counters in LocalJobRunner?
+      for (long bytes = value.get(); bytes > 0;) {
+        String randomKey = rtg.getRandomWord();
+        String randomValue = rtg.getRandomWord();
+        context.write(new Text(randomKey), new Text(randomValue));
+        bytes -= (randomValue.getBytes().length + randomKey.getBytes().length);
+      }
+    }
+  }
+  
+  /**
+   * Configure the {@link Job} for enabling compression emulation.
+   */
+  static void configure(final Job job) throws IOException, InterruptedException,
+                                              ClassNotFoundException {
+    // set the random text mapper
+    job.setMapperClass(RandomTextDataMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setInputFormatClass(GenDataFormat.class);
+    job.setJarByClass(GenerateData.class);
+
+    // set the output compression true
+    FileOutputFormat.setCompressOutput(job, true);
+    try {
+      FileInputFormat.addInputPath(job, new Path("ignored"));
+    } catch (IOException e) {
+      LOG.error("Error while adding input path ", e);
+    }
+  }
+
+  /**
+   * This is the lookup table for mapping compression ratio to the size of the 
+   * word in the {@link RandomTextDataGenerator}'s dictionary. 
+   * 
+   * Note that this table is computed (empirically) using a dictionary of 
+   * default length i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+   */
+  private static class CompressionRatioLookupTable {
+    private static Map<Float, Integer> map = new HashMap<Float, Integer>(60);
+    private static final float MIN_RATIO = 0.07F;
+    private static final float MAX_RATIO = 0.68F;
+    
+    // add the empirically obtained data points in the lookup table
+    CompressionRatioLookupTable() {
+      map.put(.07F,30);
+      map.put(.08F,25);
+      map.put(.09F,60);
+      map.put(.10F,20);
+      map.put(.11F,70);
+      map.put(.12F,15);
+      map.put(.13F,80);
+      map.put(.14F,85);
+      map.put(.15F,90);
+      map.put(.16F,95);
+      map.put(.17F,100);
+      map.put(.18F,105);
+      map.put(.19F,110);
+      map.put(.20F,115);
+      map.put(.21F,120);
+      map.put(.22F,125);
+      map.put(.23F,130);
+      map.put(.24F,140);
+      map.put(.25F,145);
+      map.put(.26F,150);
+      map.put(.27F,155);
+      map.put(.28F,160);
+      map.put(.29F,170);
+      map.put(.30F,175);
+      map.put(.31F,180);
+      map.put(.32F,190);
+      map.put(.33F,195);
+      map.put(.34F,205);
+      map.put(.35F,215);
+      map.put(.36F,225);
+      map.put(.37F,230);
+      map.put(.38F,240);
+      map.put(.39F,250);
+      map.put(.40F,260);
+      map.put(.41F,270);
+      map.put(.42F,280);
+      map.put(.43F,295);
+      map.put(.44F,310);
+      map.put(.45F,325);
+      map.put(.46F,335);
+      map.put(.47F,355);
+      map.put(.48F,375);
+      map.put(.49F,395);
+      map.put(.50F,420);
+      map.put(.51F,440);
+      map.put(.52F,465);
+      map.put(.53F,500);
+      map.put(.54F,525);
+      map.put(.55F,550);
+      map.put(.56F,600);
+      map.put(.57F,640);
+      map.put(.58F,680);
+      map.put(.59F,734);
+      map.put(.60F,813);
+      map.put(.61F,905);
+      map.put(.62F,1000);
+      map.put(.63F,1055);
+      map.put(.64F,1160);
+      map.put(.65F,1355);
+      map.put(.66F,1510);
+      map.put(.67F,1805);
+      map.put(.68F,2170);
+    }
+    
+    /**
+     * Returns the size of the word in {@link RandomTextDataGenerator}'s 
+     * dictionary that can generate text with the desired compression ratio.
+     * 
+     * @throws RuntimeException If ratio is less than {@value #MIN_RATIO} or 
+     *                          greater than {@value #MAX_RATIO}.
+     */
+    int getWordSizeForRatio(float ratio) {
+      ratio = standardizeCompressionRatio(ratio);
+      if (ratio >= MIN_RATIO && ratio <= MAX_RATIO) {
+        return map.get(ratio);
+      } else {
+        throw new RuntimeException("Compression ratio should be in the range [" 
+          + MIN_RATIO + "," + MAX_RATIO + "]. Configured compression ratio is " 
+          + ratio + ".");
+      }
+    }
+  }
+  
+  /**
+   * Setup the data generator's configuration to generate compressible random 
+   * text data with the desired compression ratio.
+   * Note that the compression ratio, if configured, will set the 
+   * {@link RandomTextDataGenerator}'s list-size and word-size based on 
+   * empirical values using the compression ratio set in the configuration. 
+   * 
+   * Hence to achieve the desired compression ratio, 
+   * {@link RandomTextDataGenerator}'s list-size will be set to the default 
+   * value i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+   */
+  static void setupDataGeneratorConfig(Configuration conf) {
+    boolean compress = isCompressionEmulationEnabled(conf);
+    if (compress) {
+      float ratio = getMapInputCompressionEmulationRatio(conf);
+      LOG.info("GridMix is configured to generate compressed input data with "
+               + " a compression ratio of " + ratio);
+      int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+      RandomTextDataGenerator.setRandomTextDataGeneratorWordSize(conf, 
+                                                                 wordSize);
+
+      // since the compression ratios are computed using the default value of 
+      // list size
+      RandomTextDataGenerator.setRandomTextDataGeneratorListSize(conf, 
+          RandomTextDataGenerator.DEFAULT_LIST_SIZE);
+    }
+  }
+  
+  /**
+   * Returns a {@link RandomTextDataGenerator} that generates random 
+   * compressible text with the desired compression ratio.
+   */
+  static RandomTextDataGenerator getRandomTextDataGenerator(float ratio, 
+                                                            long seed) {
+    int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+    RandomTextDataGenerator rtg = 
+      new RandomTextDataGenerator(RandomTextDataGenerator.DEFAULT_LIST_SIZE, 
+            seed, wordSize);
+    return rtg;
+  }
+  
+  /** Publishes compression related data statistics. Following statistics are
+   * published
+   * <ul>
+   *   <li>Total compressed input data size</li>
+   *   <li>Number of compressed input data files</li>
+   *   <li>Compression Ratio</li>
+   *   <li>Text data dictionary size</li>
+   *   <li>Random text word size</li>
+   * </ul>
+   */
+  static DataStatistics publishCompressedDataStatistics(Path inputDir, 
+                          Configuration conf, long uncompressedDataSize) 
+  throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+    CompressionCodecFactory compressionCodecs = 
+      new CompressionCodecFactory(conf);
+
+    // iterate over compressed files and sum up the compressed file sizes
+    long compressedDataSize = 0;
+    int numCompressedFiles = 0;
+    // obtain input data file statuses
+    FileStatus[] outFileStatuses = 
+      fs.listStatus(inputDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    for (FileStatus status : outFileStatuses) {
+      // check if the input file is compressed
+      if (compressionCodecs != null) {
+        CompressionCodec codec = compressionCodecs.getCodec(status.getPath());
+        if (codec != null) {
+          ++numCompressedFiles;
+          compressedDataSize += status.getLen();
+        }
+      }
+    }
+
+    LOG.info("Gridmix is configured to use compressed input data.");
+    // publish the input data size
+    LOG.info("Total size of compressed input data : " 
+             + StringUtils.humanReadableInt(compressedDataSize));
+    LOG.info("Total number of compressed input data files : " 
+             + numCompressedFiles);
+
+    if (numCompressedFiles == 0) {
+      throw new RuntimeException("No compressed file found in the input" 
+          + " directory : " + inputDir.toString() + ". To enable compression"
+          + " emulation, run Gridmix either with "
+          + " an input directory containing compressed input file(s) or" 
+          + " use the -generate option to (re)generate it. If compression"
+          + " emulation is not desired, disable it by setting '" 
+          + COMPRESSION_EMULATION_ENABLE + "' to 'false'.");
+    }
+    
+    // publish compression ratio only if its generated in this gridmix run
+    if (uncompressedDataSize > 0) {
+      // compute the compression ratio
+      double ratio = ((double)compressedDataSize) / uncompressedDataSize;
+
+      // publish the compression ratio
+      LOG.info("Input Data Compression Ratio : " + ratio);
+    }
+    
+    return new DataStatistics(compressedDataSize, numCompressedFiles, true);
+  }
+  
+  /**
+   * Enables/Disables compression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #COMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setCompressionEmulationEnabled(Configuration conf, boolean val) {
+    conf.setBoolean(COMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Checks if compression emulation is enabled or not. Default is {@code true}.
+   */
+  static boolean isCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(COMPRESSION_EMULATION_ENABLE, true);
+  }
+  
+  /**
+   * Enables/Disables input decompression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #INPUT_DECOMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setInputCompressionEmulationEnabled(Configuration conf, 
+                                                  boolean val) {
+    conf.setBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Check if input decompression emulation is enabled or not. 
+   * Default is {@code false}.
+   */
+  static boolean isInputCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, false);
+  }
+  
+  /**
+   * Set the map input data compression ratio in the given conf.
+   */
+  static void setMapInputCompressionEmulationRatio(Configuration conf, 
+                                                   float ratio) {
+    conf.setFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the map input data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getMapInputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Set the map output data compression ratio in the given configuration.
+   */
+  static void setMapOutputCompressionEmulationRatio(Configuration conf, 
+                                                    float ratio) {
+    conf.setFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the map output data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getMapOutputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Set the reduce output data compression ratio in the given configuration.
+   */
+  static void setReduceOutputCompressionEmulationRatio(Configuration conf, 
+                                                       float ratio) {
+    conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the reduce output data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getReduceOutputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Standardize the compression ratio i.e round off the compression ratio to
+   * only 2 significant digits.
+   */
+  static float standardizeCompressionRatio(float ratio) {
+    // round off to 2 significant digits
+    int significant = (int)Math.round(ratio * 100);
+    return ((float)significant)/100;
+  }
+  
+  /**
+   * Returns a {@link InputStream} for a file that might be compressed.
+   */
+  static InputStream getPossiblyDecompressedInputStream(Path file, 
+                                                        Configuration conf,
+                                                        long offset)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (isCompressionEmulationEnabled(conf)
+        && isInputCompressionEmulationEnabled(conf)) {
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(file);
+      if (codec != null) {
+        Decompressor decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          CompressionInputStream in = 
+            codec.createInputStream(fs.open(file), decompressor);
+          //TODO Seek doesnt work with compressed input stream. 
+          //     Use SplittableCompressionCodec?
+          return (InputStream)in;
+        }
+      }
+    }
+    FSDataInputStream in = fs.open(file);
+    in.seek(offset);
+    return (InputStream)in;
+  }
+  
+  /**
+   * Returns a {@link OutputStream} for a file that might need 
+   * compression.
+   */
+  static OutputStream getPossiblyCompressedOutputStream(Path file, 
+                                                        Configuration conf)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    JobConf jConf = new JobConf(conf);
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
+      // get the codec class
+      Class<? extends CompressionCodec> codecClass =
+        org.apache.hadoop.mapred.FileOutputFormat
+                                .getOutputCompressorClass(jConf, 
+                                                          GzipCodec.class);
+      // get the codec implementation
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      // add the appropriate extension
+      file = file.suffix(codec.getDefaultExtension());
+
+      if (isCompressionEmulationEnabled(conf)) {
+        FSDataOutputStream fileOut = fs.create(file, false);
+        return new DataOutputStream(codec.createOutputStream(fileOut));
+      }
+    }
+    return fs.create(file, false);
+  }
+  
+  /**
+   * Extracts compression/decompression related configuration parameters from 
+   * the source configuration to the target configuration.
+   */
+  static void configureCompressionEmulation(Configuration source, 
+                                            Configuration target) {
+    // enable output compression
+    target.setBoolean("mapred.output.compress",
+    		source.getBoolean("mapred.output.compress", false));
+
+    // set the job output compression codec
+    String jobOutputCompressionCodec = 
+      source.get("mapred.output.compression.codec");
+    if (jobOutputCompressionCodec != null) {
+      target.set("mapred.output.compression.codec", jobOutputCompressionCodec);
+    }
+
+    // set the job output compression type
+    String jobOutputCompressionType = 
+      source.get("mapred.output.compression.type");
+    if (jobOutputCompressionType != null) {
+      target.set("mapred.output.compression.type", jobOutputCompressionType);
+    }
+
+    // enable map output compression
+    target.setBoolean("mapred.compress.map.output",
+        source.getBoolean("mapred.compress.map.output", false));
+
+    // set the map output compression codecs
+    String mapOutputCompressionCodec = 
+      source.get("mapred.map.output.compression.codec");
+    if (mapOutputCompressionCodec != null) {
+      target.set("mapred.map.output.compression.codec", 
+                 mapOutputCompressionCodec);
+    }
+
+    // enable input decompression
+    //TODO replace with mapInputBytes and hdfsBytesRead
+    Path[] inputs = 
+      org.apache.hadoop.mapred.FileInputFormat
+         .getInputPaths(new JobConf(source));
+    boolean needsCompressedInput = false;
+    CompressionCodecFactory compressionCodecs = 
+      new CompressionCodecFactory(source);
+    for (Path input : inputs) {
+      CompressionCodec codec = compressionCodecs.getCodec(input);
+      if (codec != null) {
+        needsCompressedInput = true;
+      }
+    }
+    setInputCompressionEmulationEnabled(target, needsCompressedInput);
+  }
+}

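Tying the knobs above together, a hedged configuration sketch that uses only the setters defined in this class (the ratio values are arbitrary examples; map-input ratios must fall within the lookup table's [0.07, 0.68] range):

    // Illustrative compression-emulation settings; values are examples only.
    Configuration conf = new Configuration();
    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
    CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, 0.35F);
    CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, 0.40F);
    CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, 0.40F);
    // Equivalent property keys: gridmix.compression-emulation.enable,
    // gridmix.compression-emulation.map-input.decompression-ratio, etc.
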
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of Distributed Cache Load in gridmix will put load on
+ * TaskTrackers and affects execution time of tasks because of localization of
+ * distributed cache files by TaskTrackers.
+ * <br> Gridmix creates distributed cache files for simulated jobs by launching
+ * a MapReduce job {@link GenerateDistCacheData} in advance i.e. before
+ * launching simulated jobs.
+ * <br> The distributed cache file paths used in the original cluster are mapped
+ * to unique file names in the simulated cluster.
+ * <br> All HDFS-based distributed cache files generated by gridmix are
+ * public distributed cache files. But Gridmix makes sure that load incurred due
+ * to localization of private distributed cache files on the original cluster
+ * is also faithfully simulated. Gridmix emulates the load due to private
+ * distributed cache files by mapping private distributed cache files of
+ * different users in the original cluster to different public distributed cache
+ * files in the simulated cluster.
+ *
+ * <br> The configuration properties like
+ * {@link DistributedCache#CACHE_FILES},
+ * {@link JobContext#CACHE_FILE_VISIBILITIES},
+ * {@link DistributedCache#CACHE_FILES_SIZES} and
+ * {@link DistributedCache#CACHE_FILES_TIMESTAMPS} obtained from trace are used
+ * to decide
+ * <li> file size of each distributed cache file to be generated
+ * <li> whether a distributed cache file is already seen in this trace file
+ * <li> whether a distributed cache file was considered public or private.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+  private static final Log LOG =
+      LogFactory.getLog(DistributedCacheEmulator.class);
+
+  static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L;// 128MB
+
+  // If at least 1 distributed cache file is missing in the expected
+  // distributed cache dir, Gridmix cannot proceed with emulation of
+  // distributed cache load.
+  int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+  private Path distCachePath;
+
+  /**
+   * Map between simulated cluster's distributed cache file paths and their
+   * file sizes. Unique distributed cache files are entered into this map.
+   * 2 distributed cache files are considered same if and only if their
+   * file paths, visibilities and timestamps are same.
+   */
+  private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+  /**
+   * Configuration property for whether gridmix should emulate
+   * distributed cache usage or not. Default value is true.
+   */
+  static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+      "gridmix.distributed-cache-emulation.enable";
+
+  // Whether to emulate distributed cache usage or not
+  boolean emulateDistributedCache = true;
+
+  // Whether to generate distributed cache data or not
+  boolean generateDistCacheData = false;
+
+  Configuration conf; // gridmix configuration
+
+  // Pseudo local file system where local FS based distributed cache files are
+  // created by gridmix.
+  FileSystem pseudoLocalFs = null;
+
+  /**
+   * @param conf gridmix configuration
+   * @param ioPath &lt;ioPath&gt;/distributedCache/ is the gridmix Distributed
+   *               Cache directory
+   */
+  public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+    this.conf = conf;
+    distCachePath = new Path(ioPath, "distributedCache");
+    this.conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+  }
+
+  /**
+   * This is to be called before any other method of DistributedCacheEmulator.
+   * <br> Checks if emulation of distributed cache load is needed and is feasible.
+   *  Sets the flags generateDistCacheData and emulateDistributedCache to the
+   *  appropriate values.
+   * <br> Gridmix does not emulate distributed cache load if
+   * <ol><li> the specific gridmix job type doesn't need emulation of
+   * distributed cache load OR
+   * <li> the trace is coming from a stream instead of file OR
+   * <li> the distributed cache dir where distributed cache data is to be
+   * generated by gridmix is on local file system OR
+   * <li> execute permission is not there for any of the ascendant directories
+   * of &lt;ioPath&gt; till root. This is because for emulation of distributed
+   * cache load, distributed cache files created under
+   * &lt;ioPath/distributedCache/public/&gt; should be considered by hadoop
+   * as public distributed cache files.
+   * <li> creation of pseudo local file system fails.</ol>
+   * <br> For (2), (3), (4) and (5), generation of distributed cache data
+   * is also disabled.
+   * 
+   * @param traceIn trace file path. If this is '-', then trace comes from the
+   *                stream stdin.
+   * @param jobCreator job creator of gridmix jobs of a specific type
+   * @param generate  true if -generate option was specified
+   * @throws IOException
+   */
+  void init(String traceIn, JobCreator jobCreator, boolean generate)
+      throws IOException {
+    emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+        && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+    generateDistCacheData = generate;
+
+    if (generateDistCacheData || emulateDistributedCache) {
+      if ("-".equals(traceIn)) {// trace is from stdin
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "the input trace source is a stream instead of file.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+          distCachePath.toUri().getScheme())) {// local FS
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "<iopath> provided is on local file system.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else {
+        // Check if execute permission is there for all the ascendant
+        // directories of distCachePath till root.
+        FileSystem fs = FileSystem.get(conf);
+        Path cur = distCachePath.getParent();
+        while (cur != null) {
+          if (cur.toString().length() > 0) {
+            FsPermission perm = fs.getFileStatus(cur).getPermission();
+            if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+                FsAction.EXECUTE)) {
+              LOG.warn("Gridmix will not emulate Distributed Cache load "
+                  + "because the ascendant directory (of distributed cache "
+                  + "directory) " + cur + " doesn't have execute permission "
+                  + "for others.");
+              emulateDistributedCache = generateDistCacheData = false;
+              break;
+            }
+          }
+          cur = cur.getParent();
+        }
+      }
+    }
+
+    // Check if pseudo local file system can be created
+    try {
+      pseudoLocalFs = FileSystem.get(new URI("pseudo:///"), conf);
+    } catch (URISyntaxException e) {
+      LOG.warn("Gridmix will not emulate Distributed Cache load because "
+          + "creation of pseudo local file system failed.");
+      e.printStackTrace();
+      emulateDistributedCache = generateDistCacheData = false;
+      return;
+    }
+  }
+
+  /**
+   * @return true if gridmix should emulate distributed cache load
+   */
+  boolean shouldEmulateDistCacheLoad() {
+    return emulateDistributedCache;
+  }
+
+  /**
+   * @return true if gridmix should generate distributed cache data
+   */
+  boolean shouldGenerateDistCacheData() {
+    return generateDistCacheData;
+  }
+
+  /**
+   * @return the distributed cache directory path
+   */
+  Path getDistributedCacheDir() {
+    return distCachePath;
+  }
+
+  /**
+   * Create distributed cache directories.
+   * Also create a file that contains the list of distributed cache files
+   * that will be used as distributed cache files for all the simulated jobs.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  int setupGenerateDistCacheData(JobStoryProducer jsp)
+      throws IOException {
+
+    createDistCacheDirectory();
+    return buildDistCacheFilesList(jsp);
+  }
+
+  /**
+   * Create distributed cache directory where distributed cache files will be
+   * created by the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+   * @throws IOException
+   */
+  private void createDistCacheDirectory() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+  }
+
+  /**
+   * Create the list of unique distributed cache files needed for all the
+   * simulated jobs and write the list to a special file.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+    // Read all the jobs from the trace file and build the list of unique
+    // distributed cache files.
+    JobStory jobStory;
+    while ((jobStory = jsp.getNextJob()) != null) {
+      if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS && 
+         jobStory.getSubmissionTime() >= 0) {
+        updateHDFSDistCacheFilesList(jobStory);
+      }
+    }
+    jsp.close();
+
+    return writeDistCacheFilesList();
+  }
+
+  /**
+   * For the job to be simulated, identify the needed distributed cache files
+   * by mapping the original cluster's distributed cache file paths to the
+   * simulated cluster's paths and adding these paths to the map
+   * {@code distCacheFiles}.
+   *<br>
+   * JobStory should contain distributed cache related properties like
+   * <li> {@link DistributedCache#CACHE_FILES}
+   * <li> {@link JobContext#CACHE_FILE_VISIBILITIES}
+   * <li> {@link DistributedCache#CACHE_FILES_SIZES}
+   * <li> {@link DistributedCache#CACHE_FILES_TIMESTAMPS}
+   * <li> {@link DistributedCache#CLASSPATH_FILES}
+   *
+   * <li> {@link DistributedCache#CACHE_ARCHIVES}
+   * <li> {@link JobContext#CACHE_ARCHIVES_VISIBILITIES}
+   * <li> {@link DistributedCache#CACHE_ARCHIVES_SIZES}
+   * <li> {@link DistributedCache#CACHE_ARCHIVES_TIMESTAMPS}
+   * <li> {@link DistributedCache#CLASSPATH_ARCHIVES}
+   *
+   * <li> {@link DistributedCache#CACHE_SYMLINK}
+   *
+   * @param jobdesc JobStory of original job obtained from trace
+   * @throws IOException
+   */
+  void updateHDFSDistCacheFilesList(JobStory jobdesc) throws IOException {
+
+    // Map original job's distributed cache file paths to simulated cluster's
+    // paths, to be used by this simulated job.
+    JobConf jobConf = jobdesc.getJobConf();
+
+    String[] files = jobConf.getStrings(DistributedCache.CACHE_FILES);
+    if (files != null) {
+
+      String[] fileSizes = jobConf.getStrings(
+                               DistributedCache.CACHE_FILES_SIZES);
+      String[] visibilities =
+        jobConf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+      String[] timeStamps =
+        jobConf.getStrings(DistributedCache.CACHE_FILES_TIMESTAMPS);
+
+      FileSystem fs = FileSystem.get(conf);
+      String user = jobConf.getUser();
+      for (int i = 0; i < files.length; i++) {
+        // Check if visibilities are available because older hadoop versions
+        // didn't distinguish between public and private distributed cache files.
+        boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+        if (isLocalDistCacheFile(files[i], user, visibility)) {
+          // local FS based distributed cache file.
+          // Create this file on the pseudo local FS on the fly (i.e. when the
+          // simulated job is submitted).
+          continue;
+        }
+        // distributed cache file on hdfs
+        String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                 visibility, user);
+
+        // No need to add a distributed cache file path to the list if
+        // (1) the mapped path is already there in the list OR
+        // (2) the file with the mapped path already exists.
+        // In any of the above 2 cases, file paths, timestamps, file sizes and
+        // visibilities match. File sizes should match if file paths and
+        // timestamps match because single file path with single timestamp
+        // should correspond to a single file size.
+        if (distCacheFiles.containsKey(mappedPath) ||
+            fs.exists(new Path(mappedPath))) {
+          continue;
+        }
+        distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+      }
+    }
+  }
+
+  /**
+   * Check if the given file path was constructed by MapReduce for a
+   * distributed cache file on the local file system.
+   * @param filePath path of the distributed cache file
+   * @param user job submitter of the job for which &lt;filePath&gt; is a
+   *             distributed cache file
+   * @param visibility <code>true</code> for public distributed cache file
+   * @return true if the path provided is of a local file system based
+   *              distributed cache file
+   */
+  private boolean isLocalDistCacheFile(String filePath, String user,
+                                       boolean visibility) {
+    return (!visibility && filePath.contains(user + "/.staging"));
+  }
+
+  /**
+   * Map the HDFS based distributed cache file path from the original cluster
+   * to a unique file name on the simulated cluster.
+   * <br> Unique distributed cache file names on the simulated cluster are
+   * generated using the original cluster's <li>file path, <li>timestamp and
+   * <li> the job-submitter for private distributed cache files.
+   * <br> This implies that if, on the original cluster, a single HDFS file is
+   * used as a private distributed cache file by two jobs of different users,
+   * then the corresponding simulated jobs will have two different files of
+   * the same size in the public distributed cache, one for each user. These
+   * two simulated jobs will not share the distributed cache files, thus
+   * producing the same load as seen in the original cluster.
+   * @param file distributed cache file path
+   * @param timeStamp time stamp of the distributed cache file
+   * @param isPublic true if this distributed cache file is a public
+   *                 distributed cache file
+   * @param user job submitter on original cluster
+   * @return the mapped path on simulated cluster
+   */
+  private String mapDistCacheFilePath(String file, String timeStamp,
+      boolean isPublic, String user) {
+    String id = file + timeStamp;
+    if (!isPublic) {
+      // consider job-submitter for private distributed cache file
+      id = id.concat(user);
+    }
+    return new Path(distCachePath, MD5Hash.digest(id).toString()).toUri()
+               .getPath();
+  }
+
+  /**
+   * Write the list of distributed cache files, in decreasing order of file
+   * size, to a sequence file. This file will be the input to the job
+   * {@link GenerateDistCacheData}.
+   * Also reports an error if distributed cache files are missing and the
+   * -generate option was not specified.
+   * @return exit code
+   * @throws IOException
+   */
+  private int writeDistCacheFilesList()
+      throws IOException {
+    // Sort the distributed cache files in the decreasing order of file sizes.
+    List dcFiles = new ArrayList(distCacheFiles.entrySet());
+    Collections.sort(dcFiles, new Comparator() {
+      public int compare(Object dc1, Object dc2) {
+        return ((Comparable) ((Map.Entry) (dc2)).getValue())
+            .compareTo(((Map.Entry) (dc1)).getValue());
+      }
+    });
+
+    // write the sorted distributed cache files to the sequence file
+    FileSystem fs = FileSystem.get(conf);
+    Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+    conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+        distCacheFilesList.toString());
+    SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
+        distCacheFilesList, LongWritable.class, BytesWritable.class,
+        SequenceFile.CompressionType.NONE);
+
+    // Total number of unique distributed cache files
+    int fileCount = dcFiles.size();
+    long byteCount = 0;// Total size of all distributed cache files
+    long bytesSync = 0;// Bytes since the previous sync; used to add sync markers
+
+    for (Iterator it = dcFiles.iterator(); it.hasNext();) {
+      Map.Entry entry = (Map.Entry)it.next();
+      LongWritable fileSize =
+          new LongWritable(Long.valueOf(entry.getValue().toString()));
+      BytesWritable filePath =
+          new BytesWritable(entry.getKey().toString().getBytes());
+
+      byteCount += fileSize.get();
+      bytesSync += fileSize.get();
+      if (bytesSync > AVG_BYTES_PER_MAP) {
+        src_writer.sync();
+        bytesSync = fileSize.get();
+      }
+      src_writer.append(fileSize, filePath);
+    }
+    if (src_writer != null) {
+      src_writer.close();
+    }
+    // Set delete on exit for 'dist cache files list' as it is not needed later.
+    fs.deleteOnExit(distCacheFilesList);
+
+    conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+    conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+    LOG.info("Number of HDFS based distributed cache files to be generated is "
+        + fileCount + ". Total size of HDFS based distributed cache files "
+        + "to be generated is " + byteCount);
+
+    if (!shouldGenerateDistCacheData() && fileCount > 0) {
+      LOG.error("Missing " + fileCount + " distributed cache files under the"
+          + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+          + " to emulate distributed cache load. Either use -generate\noption"
+          + " to generate distributed cache data along with input data OR "
+          + "disable\ndistributed cache emulation by configuring '"
+          + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+          + "' to false.");
+      return MISSING_DIST_CACHE_FILES_ERROR;
+    }
+    return 0;
+  }
+
+  /**
+   * If gridmix needs to emulate distributed cache load, then configure
+   * distributed cache files of a simulated job by mapping the original
+   * cluster's distributed cache file paths to the simulated cluster's paths and
+   * setting these mapped paths in the job configuration of the simulated job.
+   * <br>
+   * Configure local FS based distributed cache files through the property
+   * "tmpfiles" and hdfs based distributed cache files through the property
+   * {@link DistributedCache#CACHE_FILES}.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+      throws IOException {
+    if (shouldEmulateDistCacheLoad()) {
+
+      String[] files = jobConf.getStrings(DistributedCache.CACHE_FILES);
+      if (files != null) {
+        // hdfs based distributed cache files to be configured for simulated job
+        List<String> cacheFiles = new ArrayList<String>();
+        // local FS based distributed cache files to be configured for
+        // simulated job
+        List<String> localCacheFiles = new ArrayList<String>();
+
+        String[] visibilities =
+          jobConf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+        String[] timeStamps =
+          jobConf.getStrings(DistributedCache.CACHE_FILES_TIMESTAMPS);
+        String[] fileSizes =
+          jobConf.getStrings(DistributedCache.CACHE_FILES_SIZES);
+
+        String user = jobConf.getUser();
+        for (int i = 0; i < files.length; i++) {
+          // Check if visibilities are available because older hadoop versions
+          // didn't distinguish between public and private distributed cache files.
+          boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+          if (isLocalDistCacheFile(files[i], user, visibility)) {
+            // local FS based distributed cache file.
+            // Create this file on the pseudo local FS.
+            String fileId = MD5Hash.digest(files[i] + timeStamps[i]).toString();
+            long fileSize = Long.valueOf(fileSizes[i]);
+            Path mappedLocalFilePath =
+                PseudoLocalFs.generateFilePath(fileId, fileSize)
+                    .makeQualified(pseudoLocalFs.getUri(),
+                                   pseudoLocalFs.getWorkingDirectory());
+            pseudoLocalFs.create(mappedLocalFilePath);
+            localCacheFiles.add(mappedLocalFilePath.toUri().toString());
+          } else {
+            // hdfs based distributed cache file.
+            // Get the mapped HDFS path on simulated cluster
+            String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                     visibility, user);
+            cacheFiles.add(mappedPath);
+          }
+        }
+        if (cacheFiles.size() > 0) {
+          // configure hdfs based distributed cache files for simulated job
+          conf.setStrings(DistributedCache.CACHE_FILES,
+                          cacheFiles.toArray(new String[cacheFiles.size()]));
+        }
+        if (localCacheFiles.size() > 0) {
+          // configure local FS based distributed cache files for simulated job
+          conf.setStrings("tmpfiles", localCacheFiles.toArray(
+                                        new String[localCacheFiles.size()]));
+        }
+      }
+    }
+  }
+}
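
For reference, a minimal standalone sketch of the path-mapping scheme implemented by mapDistCacheFilePath() above: the mapped name is an MD5 digest of the original file path plus its timestamp, with the job-submitter appended for private files, so the same HDFS file used privately by two users maps to two distinct files of the same size. The class name, sample paths and timestamp below are illustrative only and are not part of this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;

public class DistCacheMappingSketch {
  static String mapPath(Path distCacheDir, String file, String timeStamp,
                        boolean isPublic, String user) {
    String id = file + timeStamp;
    if (!isPublic) {
      id = id.concat(user);   // keep private files distinct per submitter
    }
    return new Path(distCacheDir, MD5Hash.digest(id).toString())
               .toUri().getPath();
  }

  public static void main(String[] args) {
    Path dir = new Path("/gridmix/io/distributedCache"); // hypothetical <iopath> subdir
    String file = "/user/alice/lookup.dat";              // hypothetical trace entry
    String ts = "1318948748000";
    // Private use of the same file by two users yields two different mapped files.
    System.out.println(mapPath(dir, file, ts, false, "alice"));
    System.out.println(mapPath(dir, file, ts, false, "bob"));
  }
}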

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java Tue Oct 18 14:45:48 2011
@@ -19,15 +19,9 @@ package org.apache.hadoop.mapred.gridmix
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -50,4 +44,14 @@ public class EchoUserResolver implements
       UserGroupInformation ugi) {
     return ugi;
   }
+
+  /**
+   * {@inheritDoc}
+   * <br><br>
+   * Since {@link EchoUserResolver} simply returns the submitting user's UGI
+   * unchanged, it doesn't need a target list of users.
+   */
+  public boolean needsTargetUsersList() {
+    return false;
+  }
 }
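
As a hedged illustration of how the new needsTargetUsersList() hook might be consulted by a driver, here is a small sketch; the check() helper and its error message are hypothetical and not taken from this commit.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.mapred.gridmix.UserResolver;

public class UserResolverCheckSketch {
  // Fail fast if the chosen resolver needs a users list but none was supplied.
  static void check(UserResolver resolver, URI usersList) throws IOException {
    if (resolver.needsTargetUsersList() && usersList == null) {
      throw new IOException(resolver.getClass().getName()
          + " needs a target users list, but none was provided");
    }
  }
}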

Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Summarizes a {@link Gridmix} run. Statistics that are reported are
+ * <ul>
+ *   <li>Total number of jobs in the input trace</li>
+ *   <li>Trace signature</li>
+ *   <li>Total number of jobs processed from the input trace</li>
+ *   <li>Total number of jobs submitted</li>
+ *   <li>Total number of successful and failed jobs</li>
+ *   <li>Total number of map/reduce tasks launched</li>
+ *   <li>Gridmix start & end time</li>
+ *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
+ *   <li>Gridmix Configuration (i.e job-type, submission-type, resolver)</li>
+ * </ul>
+ */
+class ExecutionSummarizer implements StatListener<JobStats> {
+  static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
+  private static final FastDateFormat UTIL = FastDateFormat.getInstance();
+  
+  private int numJobsInInputTrace;
+  private int totalSuccessfulJobs;
+  private int totalFailedJobs;
+  private int totalMapTasksLaunched;
+  private int totalReduceTasksLaunched;
+  private long totalSimulationTime;
+  private long totalRuntime;
+  private final String commandLineArgs;
+  private long startTime;
+  private long endTime;
+  private long simulationStartTime;
+  private String inputTraceLocation;
+  private String inputTraceSignature;
+  private String jobSubmissionPolicy;
+  private String resolver;
+  private DataStatistics dataStats;
+  private String expectedDataSize;
+  
+  /**
+   * Basic constructor initialized with the runtime arguments. 
+   */
+  ExecutionSummarizer(String[] args) {
+    startTime = System.currentTimeMillis();
+    // flatten the args array into a single string and store it
+    commandLineArgs = 
+      org.apache.commons.lang.StringUtils.join(args, ' '); 
+  }
+  
+  /**
+   * Default constructor. 
+   */
+  ExecutionSummarizer() {
+    startTime = System.currentTimeMillis();
+    commandLineArgs = Summarizer.NA; 
+  }
+  
+  void start(Configuration conf) {
+    simulationStartTime = System.currentTimeMillis();
+  }
+  
+  private void processJobState(JobStats stats) throws Exception {
+    Job job = stats.getJob();
+    if (job.isSuccessful()) {
+      ++totalSuccessfulJobs;
+    } else {
+      ++totalFailedJobs;
+    }
+  }
+  
+  private void processJobTasks(JobStats stats) throws Exception {
+    totalMapTasksLaunched += stats.getNoOfMaps();
+    Job job = stats.getJob();
+    totalReduceTasksLaunched += job.getNumReduceTasks();
+  }
+  
+  private void process(JobStats stats) {
+    try {
+      // process the job run state
+      processJobState(stats);
+      
+      // process the tasks information
+      processJobTasks(stats);
+    } catch (Exception e) {
+      LOG.info("Error in processing job " + stats.getJob().getJobID() + ".", e);
+    }
+  }
+  
+  @Override
+  public void update(JobStats item) {
+    // process only if the simulation has started
+    if (simulationStartTime > 0) {
+      process(item);
+      totalSimulationTime = 
+        System.currentTimeMillis() - getSimulationStartTime();
+    }
+  }
+  
+  // Generates a signature for the trace file based on
+  //   - filename
+  //   - modification time
+  //   - file length
+  //   - owner
+  protected static String getTraceSignature(String input) throws IOException {
+    Path inputPath = new Path(input);
+    FileSystem fs = inputPath.getFileSystem(new Configuration());
+    FileStatus status = fs.getFileStatus(inputPath);
+    Path qPath = fs.makeQualified(status.getPath());
+    String traceID = status.getModificationTime() + qPath.toString()
+                     + status.getOwner() + status.getLen();
+    return MD5Hash.digest(traceID).toString();
+  }
+  
+  @SuppressWarnings("unchecked")
+  void finalize(JobFactory factory, String inputPath, long dataSize, 
+                UserResolver userResolver, DataStatistics stats,
+                Configuration conf) 
+  throws IOException {
+    numJobsInInputTrace = factory.numJobsInTrace;
+    endTime = System.currentTimeMillis();
+    Path inputTracePath = new Path(inputPath);
+    FileSystem fs = inputTracePath.getFileSystem(conf);
+    inputTraceLocation = fs.makeQualified(inputTracePath).toString();
+    inputTraceSignature = getTraceSignature(inputTraceLocation);
+    jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
+    resolver = userResolver.getClass().getName();
+    if (dataSize > 0) {
+      expectedDataSize = StringUtils.humanReadableInt(dataSize);
+    } else {
+      expectedDataSize = Summarizer.NA;
+    }
+    dataStats = stats;
+    totalRuntime = System.currentTimeMillis() - getStartTime();
+  }
+  
+  /**
+   * Summarizes the current {@link Gridmix} run.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Execution Summary:-");
+    builder.append("\nInput trace: ").append(getInputTraceLocation());
+    builder.append("\nInput trace signature: ")
+           .append(getInputTraceSignature());
+    builder.append("\nTotal number of jobs in trace: ")
+           .append(getNumJobsInTrace());
+    builder.append("\nExpected input data size: ")
+           .append(getExpectedDataSize());
+    builder.append("\nInput data statistics: ")
+           .append(getInputDataStatistics());
+    builder.append("\nTotal number of jobs processed: ")
+           .append(getNumSubmittedJobs());
+    builder.append("\nTotal number of successful jobs: ")
+           .append(getNumSuccessfulJobs());
+    builder.append("\nTotal number of failed jobs: ")
+           .append(getNumFailedJobs());
+    builder.append("\nTotal number of map tasks launched: ")
+           .append(getNumMapTasksLaunched());
+    builder.append("\nTotal number of reduce task launched: ")
+           .append(getNumReduceTasksLaunched());
+    builder.append("\nGridmix start time: ")
+           .append(UTIL.format(getStartTime()));
+    builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
+    builder.append("\nGridmix simulation start time: ")
+           .append(UTIL.format(getStartTime()));
+    builder.append("\nGridmix runtime: ")
+           .append(StringUtils.formatTime(getRuntime()));
+    builder.append("\nTime spent in initialization (data-gen etc): ")
+           .append(StringUtils.formatTime(getInitTime()));
+    builder.append("\nTime spent in simulation: ")
+           .append(StringUtils.formatTime(getSimulationTime()));
+    builder.append("\nGridmix configuration parameters: ")
+           .append(getCommandLineArgsString());
+    builder.append("\nGridmix job submission policy: ")
+           .append(getJobSubmissionPolicy());
+    builder.append("\nGridmix resolver: ").append(getUserResolver());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+  
+  // Gets the stringified version of DataStatistics
+  static String stringifyDataStatistics(DataStatistics stats) {
+    if (stats != null) {
+      StringBuffer buffer = new StringBuffer();
+      String compressionStatus = stats.isDataCompressed() 
+                                 ? "Compressed" 
+                                 : "Uncompressed";
+      buffer.append(compressionStatus).append(" input data size: ");
+      buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
+      buffer.append(", ");
+      buffer.append("Number of files: ").append(stats.getNumFiles());
+
+      return buffer.toString();
+    } else {
+      return Summarizer.NA;
+    }
+  }
+  
+  // Getters
+  protected String getExpectedDataSize() {
+    return expectedDataSize;
+  }
+  
+  protected String getUserResolver() {
+    return resolver;
+  }
+  
+  protected String getInputDataStatistics() {
+    return stringifyDataStatistics(dataStats);
+  }
+  
+  protected String getInputTraceSignature() {
+    return inputTraceSignature;
+  }
+  
+  protected String getInputTraceLocation() {
+    return inputTraceLocation;
+  }
+  
+  protected int getNumJobsInTrace() {
+    return numJobsInInputTrace;
+  }
+  
+  protected int getNumSuccessfulJobs() {
+    return totalSuccessfulJobs;
+  }
+  
+  protected int getNumFailedJobs() {
+    return totalFailedJobs;
+  }
+  
+  protected int getNumSubmittedJobs() {
+    return totalSuccessfulJobs + totalFailedJobs;
+  }
+  
+  protected int getNumMapTasksLaunched() {
+    return totalMapTasksLaunched;
+  }
+  
+  protected int getNumReduceTasksLaunched() {
+    return totalReduceTasksLaunched;
+  }
+  
+  protected long getStartTime() {
+    return startTime;
+  }
+  
+  protected long getEndTime() {
+    return endTime;
+  }
+  
+  protected long getInitTime() {
+    return simulationStartTime - startTime;
+  }
+  
+  protected long getSimulationStartTime() {
+    return simulationStartTime;
+  }
+  
+  protected long getSimulationTime() {
+    return totalSimulationTime;
+  }
+  
+  protected long getRuntime() {
+    return totalRuntime;
+  }
+  
+  protected String getCommandLineArgsString() {
+    return commandLineArgs;
+  }
+  
+  protected String getJobSubmissionPolicy() {
+    return jobSubmissionPolicy;
+  }
+}
\ No newline at end of file
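
For context, a small self-contained sketch of the trace-signature idea used by getTraceSignature() above: the signature is an MD5 over the trace file's modification time, qualified path, owner and length, so re-runs of the same trace can be correlated across summaries. The class name and the default file name below are illustrative, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;

public class TraceSignatureSketch {
  public static void main(String[] args) throws Exception {
    // Use the trace path given on the command line, or a sample name.
    Path trace = new Path(args.length > 0 ? args[0] : "wordcount.json.gz");
    FileSystem fs = trace.getFileSystem(new Configuration());
    FileStatus status = fs.getFileStatus(trace);
    Path qualified = fs.makeQualified(status.getPath());
    String id = status.getModificationTime() + qualified.toString()
        + status.getOwner() + status.getLen();
    System.out.println("Trace signature: " + MD5Hash.digest(id));
  }
}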

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Tue Oct 18 14:45:48 2011
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
@@ -34,7 +32,7 @@ class FileQueue extends InputStream {
 
   private int idx = -1;
   private long curlen = -1L;
-  private FSDataInputStream input;
+  private InputStream input;
   private final byte[] z = new byte[1];
   private final Path[] paths;
   private final long[] lengths;
@@ -64,9 +62,9 @@ class FileQueue extends InputStream {
     idx = (idx + 1) % paths.length;
     curlen = lengths[idx];
     final Path file = paths[idx];
-    final FileSystem fs = file.getFileSystem(conf);
-    input = fs.open(file);
-    input.seek(startoffset[idx]);
+    input = 
+      CompressionEmulationUtil.getPossiblyDecompressedInputStream(file, 
+                                 conf, startoffset[idx]);
   }
 
   @Override