Posted to mapreduce-commits@hadoop.apache.org by ra...@apache.org on 2011/05/23 16:22:36 UTC

svn commit: r1126499 [1/2] - in /hadoop/mapreduce/trunk/src: contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ docs/src/documentation/content/xdocs/

Author: ravigummadi
Date: Mon May 23 14:22:36 2011
New Revision: 1126499

URL: http://svn.apache.org/viewvc?rev=1126499&view=rev
Log:
MAPREDUCE-2407. Make GridMix emulate usage of distributed cache files in simulated jobs.

Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
Modified:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/gridmix.xml

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Mon May 23 14:22:36 2011
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of distributed cache load in gridmix puts load on
+ * TaskTrackers and affects the execution time of tasks because of the
+ * localization of distributed cache files by the TaskTrackers.
+ * <br> Gridmix creates distributed cache files for simulated jobs by launching
+ * a MapReduce job {@link GenerateDistCacheData} in advance, i.e. before
+ * launching the simulated jobs.
+ * <br> The distributed cache file paths used in the original cluster are mapped
+ * to unique file names in the simulated cluster.
+ * <br> All HDFS-based distributed cache files generated by gridmix are
+ * public distributed cache files. But Gridmix makes sure that load incurred due
+ * to localization of private distributed cache files on the original cluster
+ * is also faithfully simulated. Gridmix emulates the load due to private
+ * distributed cache files by mapping private distributed cache files of
+ * different users in the original cluster to different public distributed cache
+ * files in the simulated cluster.
+ *
+ * <br> The configuration properties like
+ * {@link MRJobConfig#CACHE_FILES}, {@link MRJobConfig#CACHE_FILE_VISIBILITIES},
+ * {@link MRJobConfig#CACHE_FILES_SIZES} and
+ * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS} obtained from the trace are used
+ * to decide
+ * <li> the file size of each distributed cache file to be generated
+ * <li> whether a distributed cache file has already been seen in this trace
+ * <li> whether a distributed cache file was considered public or private.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+  private static final Log LOG =
+      LogFactory.getLog(DistributedCacheEmulator.class);
+
+  static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L; // 128 MB
+
+  // If at least 1 distributed cache file is missing in the expected
+  // distributed cache dir, Gridmix cannot proceed with emulation of
+  // distributed cache load.
+  static final int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+  private Path distCachePath;
+
+  /**
+   * Map between simulated cluster's distributed cache file paths and their
+   * file sizes. Unique distributed cache files are entered into this map.
+   * Two distributed cache files are considered the same if and only if their
+   * file paths, visibilities and timestamps are the same.
+   */
+  private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+  /**
+   * Configuration property for whether gridmix should emulate
+   * distributed cache usage or not. Default value is true.
+   */
+  static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+      "gridmix.distributed-cache-emulation.enable";
+
+  // Whether to emulate distributed cache usage or not
+  boolean emulateDistributedCache = true;
+
+  // Whether to generate distributed cache data or not
+  boolean generateDistCacheData = false;
+
+  Configuration conf; // gridmix configuration
+
+  // Pseudo local file system where local FS based distributed cache files are
+  // created by gridmix.
+  FileSystem pseudoLocalFs = null;
+
+  {
+    // Need to handle deprecation of these MapReduce-internal configuration
+    // properties as MapReduce doesn't handle their deprecation.
+    Configuration.addDeprecation("mapred.cache.files.filesizes",
+        new String[] {MRJobConfig.CACHE_FILES_SIZES});
+    Configuration.addDeprecation("mapred.cache.files.visibilities",
+        new String[] {MRJobConfig.CACHE_FILE_VISIBILITIES});
+  }
+
+  /**
+   * @param conf gridmix configuration
+   * @param ioPath &lt;ioPath&gt;/distributedCache/ is the gridmix Distributed
+   *               Cache directory
+   */
+  public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+    this.conf = conf;
+    distCachePath = new Path(ioPath, "distributedCache");
+    this.conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+  }
+
+  /**
+   * This is to be called before any other method of DistributedCacheEmulator.
+   * <br> Checks if emulation of distributed cache load is needed and
+   * feasible. Sets the flags generateDistCacheData and
+   * emulateDistributedCache to the appropriate values.
+   * <br> Gridmix does not emulate distributed cache load if
+   * <ol><li> the specific gridmix job type doesn't need emulation of
+   * distributed cache load, OR
+   * <li> the trace comes from a stream instead of a file, OR
+   * <li> the distributed cache dir where distributed cache data is to be
+   * generated by gridmix is on the local file system, OR
+   * <li> any of the ancestor directories of &lt;ioPath&gt;, up to the root,
+   * lacks execute permission. This is because, for emulation of distributed
+   * cache load, distributed cache files created under
+   * &lt;ioPath/distributedCache/public/&gt; must be considered by hadoop
+   * as public distributed cache files, OR
+   * <li> creation of the pseudo local file system fails.</ol>
+   * <br> For (2), (3), (4) and (5), generation of distributed cache data
+   * is also disabled.
+   * 
+   * @param traceIn trace file path. If this is '-', then the trace comes
+   *                from stdin.
+   * @param jobCreator job creator of gridmix jobs of a specific type
+   * @param generate  true if -generate option was specified
+   * @throws IOException
+   */
+  void init(String traceIn, JobCreator jobCreator, boolean generate)
+      throws IOException {
+    emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+        && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+    generateDistCacheData = generate;
+
+    if (generateDistCacheData || emulateDistributedCache) {
+      if ("-".equals(traceIn)) {// trace is from stdin
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "the input trace source is a stream instead of file.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+          distCachePath.toUri().getScheme())) {// local FS
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "<iopath> provided is on local file system.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else {
+        // Check that all the ancestor directories of distCachePath, up to
+        // the root, have execute permission for others.
+        FileSystem fs = FileSystem.get(conf);
+        Path cur = distCachePath.getParent();
+        while (cur != null) {
+          if (cur.toString().length() > 0) {
+            FsPermission perm = fs.getFileStatus(cur).getPermission();
+            if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+                FsAction.EXECUTE)) {
+              LOG.warn("Gridmix will not emulate Distributed Cache load "
+                  + "because the ascendant directory (of distributed cache "
+                  + "directory) " + cur + " doesn't have execute permission "
+                  + "for others.");
+              emulateDistributedCache = generateDistCacheData = false;
+              break;
+            }
+          }
+          cur = cur.getParent();
+        }
+      }
+    }
+
+    // Check if pseudo local file system can be created
+    try {
+      pseudoLocalFs = FileSystem.get(new URI("pseudo:///"), conf);
+    } catch (URISyntaxException e) {
+      LOG.warn("Gridmix will not emulate Distributed Cache load because "
+          + "creation of pseudo local file system failed.");
+      e.printStackTrace();
+      emulateDistributedCache = generateDistCacheData = false;
+      return;
+    }
+  }
+
+  /**
+   * @return true if gridmix should emulate distributed cache load
+   */
+  boolean shouldEmulateDistCacheLoad() {
+    return emulateDistributedCache;
+  }
+
+  /**
+   * @return true if gridmix should generate distributed cache data
+   */
+  boolean shouldGenerateDistCacheData() {
+    return generateDistCacheData;
+  }
+
+  /**
+   * @return the distributed cache directory path
+   */
+  Path getDistributedCacheDir() {
+    return distCachePath;
+  }
+
+  /**
+   * Create distributed cache directories.
+   * Also create a file that contains the list of distributed cache files
+   * that will be used as distributed cache files for all the simulated jobs.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  int setupGenerateDistCacheData(JobStoryProducer jsp)
+      throws IOException {
+
+    createDistCacheDirectory();
+    return buildDistCacheFilesList(jsp);
+  }
+
+  /**
+   * Create distributed cache directory where distributed cache files will be
+   * created by the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+   * @throws IOException
+   */
+  private void createDistCacheDirectory() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+  }
+
+  /**
+   * Create the list of unique distributed cache files needed for all the
+   * simulated jobs and write the list to a special file.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+    // Read all the jobs from the trace file and build the list of unique
+    // distributed cache files.
+    JobStory jobStory;
+    while ((jobStory = jsp.getNextJob()) != null) {
+      if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS && 
+         jobStory.getSubmissionTime() >= 0) {
+        updateHDFSDistCacheFilesList(jobStory);
+      }
+    }
+    jsp.close();
+
+    return writeDistCacheFilesList();
+  }
+
+  /**
+   * For the job to be simulated, identify the needed distributed cache files
+   * by mapping the original cluster's distributed cache file paths to the
+   * simulated cluster's paths and adding these paths to the map
+   * {@code distCacheFiles}.
+   *<br>
+   * JobStory should contain distributed cache related properties like
+   * <li> {@link MRJobConfig#CACHE_FILES}
+   * <li> {@link MRJobConfig#CACHE_FILE_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_FILES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_FILES}
+   *
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_ARCHIVES}
+   *
+   * <li> {@link MRJobConfig#CACHE_SYMLINK}
+   *
+   * @param jobdesc JobStory of original job obtained from trace
+   * @throws IOException
+   */
+  void updateHDFSDistCacheFilesList(JobStory jobdesc) throws IOException {
+
+    // Map original job's distributed cache file paths to simulated cluster's
+    // paths, to be used by this simulated job.
+    JobConf jobConf = jobdesc.getJobConf();
+
+    String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+    if (files != null) {
+
+      String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+      String[] visibilities =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      String[] timeStamps =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+
+      FileSystem fs = FileSystem.get(conf);
+      String user = jobConf.getUser();
+      for (int i = 0; i < files.length; i++) {
+        // Check if visibilities are available because older hadoop versions
+        // didn't have public, private Distributed Caches separately.
+        boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+        if (isLocalDistCacheFile(files[i], user, visibility)) {
+          // local FS based distributed cache file.
+          // Create this file on the pseudo local FS on the fly (i.e. when the
+          // simulated job is submitted).
+          continue;
+        }
+        // distributed cache file on hdfs
+        String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                 visibility, user);
+
+        // No need to add a distributed cache file path to the list if
+        // (1) the mapped path is already there in the list OR
+        // (2) the file with the mapped path already exists.
+        // In any of the above 2 cases, file paths, timestamps, file sizes and
+        // visibilities match. File sizes should match if file paths and
+        // timestamps match because single file path with single timestamp
+        // should correspond to a single file size.
+        if (distCacheFiles.containsKey(mappedPath) ||
+            fs.exists(new Path(mappedPath))) {
+          continue;
+        }
+        distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+      }
+    }
+  }
+
+  /**
+   * Check if the file path provided was constructed by MapReduce for a
+   * distributed cache file on local file system.
+   * @param filePath path of the distributed cache file
+   * @param user job submitter of the job for which &lt;filePath&gt; is a
+   *             distributed cache file
+   * @param visibility <code>true</code> for public distributed cache file
+   * @return true if the path provided is of a local file system based
+   *              distributed cache file
+   */
+  private boolean isLocalDistCacheFile(String filePath, String user,
+                                       boolean visibility) {
+    return (!visibility && filePath.contains(user + "/.staging"));
+  }
+
+  /**
+   * Map the HDFS based distributed cache file path from original cluster to
+   * a unique file name on the simulated cluster.
+   * <br> Unique distributed cache file names on the simulated cluster are
+   * generated using the original cluster's <li>file path, <li>timestamp and
+   * <li> the job-submitter for private distributed cache files.
+   * <br> This implies that if, on the original cluster, a single HDFS file is
+   * considered as two private distributed cache files for two jobs of
+   * different users, then the corresponding simulated jobs will have two
+   * different files of the same size in the public distributed cache, one for
+   * each user. These two simulated jobs will not share these distributed
+   * cache files, thus leading to the same load as seen in the original
+   * cluster.
+   * @param file distributed cache file path
+   * @param timeStamp time stamp of the distributed cache file
+   * @param isPublic true if this distributed cache file is a public
+   *                 distributed cache file
+   * @param user job submitter on original cluster
+   * @return the mapped path on simulated cluster
+   */
+  private String mapDistCacheFilePath(String file, String timeStamp,
+      boolean isPublic, String user) {
+    String id = file + timeStamp;
+    if (!isPublic) {
+      // consider job-submitter for private distributed cache file
+      id = id.concat(user);
+    }
+    return new Path(distCachePath, MD5Hash.digest(id).toString()).toUri()
+               .getPath();
+  }
+
+  /**
+   * Write the list of distributed cache files in the decreasing order of
+   * file sizes into the sequence file. This file will be input to the job
+   * {@link GenerateDistCacheData}.
+   * Also reports an error if the needed distributed cache files are missing
+   * and the -generate option was not specified.
+   * @return exit code
+   * @throws IOException
+   */
+  private int writeDistCacheFilesList()
+      throws IOException {
+    // Sort the distributed cache files in the decreasing order of file sizes.
+    List<Map.Entry<String, Long>> dcFiles =
+        new ArrayList<Map.Entry<String, Long>>(distCacheFiles.entrySet());
+    Collections.sort(dcFiles, new Comparator<Map.Entry<String, Long>>() {
+      public int compare(Map.Entry<String, Long> dc1,
+                         Map.Entry<String, Long> dc2) {
+        return dc2.getValue().compareTo(dc1.getValue());
+      }
+    });
+
+    // write the sorted distributed cache files to the sequence file
+    FileSystem fs = FileSystem.get(conf);
+    Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+    conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+        distCacheFilesList.toString());
+    SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
+        distCacheFilesList, LongWritable.class, BytesWritable.class,
+        SequenceFile.CompressionType.NONE);
+
+    // Total number of unique distributed cache files
+    int fileCount = dcFiles.size();
+    long byteCount = 0; // Total size of all distributed cache files
+    long bytesSync = 0; // Bytes since the previous sync; used to add sync markers
+
+    for (Map.Entry<String, Long> entry : dcFiles) {
+      LongWritable fileSize = new LongWritable(entry.getValue().longValue());
+      BytesWritable filePath =
+          new BytesWritable(entry.getKey().getBytes());
+
+      byteCount += fileSize.get();
+      bytesSync += fileSize.get();
+      if (bytesSync > AVG_BYTES_PER_MAP) {
+        src_writer.sync();
+        bytesSync = fileSize.get();
+      }
+      src_writer.append(fileSize, filePath);
+    }
+    src_writer.close();
+    // Set delete on exit for 'dist cache files list' as it is not needed later.
+    fs.deleteOnExit(distCacheFilesList);
+
+    conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+    conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+    LOG.info("Number of HDFS based distributed cache files to be generated is"
+        + fileCount + ". Total size of HDFS based distributed cache files "
+        + "to be generated is " + byteCount);
+
+    if (!shouldGenerateDistCacheData() && fileCount > 0) {
+      LOG.error("Missing " + fileCount + " distributed cache files under the "
+          + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+          + " to emulate distributed cache load. Either use -generate\noption"
+          + " to generate distributed cache data along with input data OR "
+          + "disable\ndistributed cache emulation by configuring '"
+          + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+          + "' to false.");
+      return MISSING_DIST_CACHE_FILES_ERROR;
+    }
+    return 0;
+  }
+
+  /**
+   * If gridmix needs to emulate distributed cache load, then configure
+   * distributed cache files of a simulated job by mapping the original
+   * cluster's distributed cache file paths to the simulated cluster's paths and
+   * setting these mapped paths in the job configuration of the simulated job.
+   * <br>
+   * Configure local FS based distributed cache files through the property
+   * "tmpfiles" and hdfs based distributed cache files through the property
+   * {@link MRJobConfig#CACHE_FILES}.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+      throws IOException {
+    if (shouldEmulateDistCacheLoad()) {
+
+      String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+      if (files != null) {
+        // hdfs based distributed cache files to be configured for simulated job
+        List<String> cacheFiles = new ArrayList<String>();
+        // local FS based distributed cache files to be configured for
+        // simulated job
+        List<String> localCacheFiles = new ArrayList<String>();
+
+        String[] visibilities =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        String[] timeStamps =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+        String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+
+        String user = jobConf.getUser();
+        for (int i = 0; i < files.length; i++) {
+          // Check if visibilities are available because older hadoop versions
+          // didn't have public, private Distributed Caches separately.
+          boolean visibility =
+            (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+          if (isLocalDistCacheFile(files[i], user, visibility)) {
+            // local FS based distributed cache file.
+            // Create this file on the pseudo local FS.
+            String fileId = MD5Hash.digest(files[i] + timeStamps[i]).toString();
+            long fileSize = Long.valueOf(fileSizes[i]);
+            Path mappedLocalFilePath =
+                PseudoLocalFs.generateFilePath(fileId, fileSize)
+                    .makeQualified(pseudoLocalFs.getUri(),
+                                   pseudoLocalFs.getWorkingDirectory());
+            pseudoLocalFs.create(mappedLocalFilePath);
+            localCacheFiles.add(mappedLocalFilePath.toUri().toString());
+          } else {
+            // hdfs based distributed cache file.
+            // Get the mapped HDFS path on simulated cluster
+            String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                                                     visibility, user);
+            cacheFiles.add(mappedPath);
+          }
+        }
+        if (cacheFiles.size() > 0) {
+          // configure hdfs based distributed cache files for simulated job
+          conf.setStrings(MRJobConfig.CACHE_FILES,
+                          cacheFiles.toArray(new String[cacheFiles.size()]));
+        }
+        if (localCacheFiles.size() > 0) {
+          // configure local FS based distributed cache files for simulated job
+          conf.setStrings("tmpfiles", localCacheFiles.toArray(
+                                        new String[localCacheFiles.size()]));
+        }
+      }
+    }
+  }
+}
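
For illustration, the path-mapping rule implemented by mapDistCacheFilePath() above can be exercised stand-alone. The following is a minimal sketch, not part of this patch: the dist cache root, file path, timestamp and user name are made-up values, and only Path and MD5Hash from the Hadoop libraries are assumed.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MD5Hash;

    public class DistCachePathMappingSketch {
      public static void main(String[] args) {
        // Hypothetical <ioPath>/distributedCache root on the simulated cluster.
        Path distCachePath = new Path("/user/gridmix/io/distributedCache");
        String file = "/apps/shared/lookup.dat"; // hypothetical original path
        String timeStamp = "1306160556000";      // hypothetical timestamp
        String user = "alice";                   // hypothetical job submitter

        // Public file: its identity is (path, timestamp).
        String publicId = file + timeStamp;
        // Private file: the job submitter is folded into the identity, so two
        // users sharing one HDFS file on the original cluster map to two
        // distinct public files on the simulated cluster.
        String privateId = file + timeStamp + user;

        System.out.println(new Path(distCachePath,
            MD5Hash.digest(publicId).toString()).toUri().getPath());
        System.out.println(new Path(distCachePath,
            MD5Hash.digest(privateId).toString()).toUri().getPath());
      }
    }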

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Mon May 23 14:22:36 2011
@@ -85,10 +85,11 @@ class GenerateData extends GridmixJob {
    * Replication of generated data.
    */
   public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";
 
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
-    super(conf, 0L, "GRIDMIX_GENDATA");
+    super(conf, 0L, JOB_NAME);
     job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java Mon May 23 14:22:36 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * GridmixJob that generates distributed cache files.
+ * {@link GenerateDistCacheData} expects a list of distributed cache files to be
+ * generated as input. This list is expected to be stored as a sequence file
+ * and the filename is expected to be configured using
+ * {@code gridmix.distcache.file.list}.
+ * This input file contains the list of distributed cache files and their sizes.
+ * For each record (i.e. file size and file path) in this input file,
+ * a file with the specific file size at the specific path is created.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GenerateDistCacheData extends GridmixJob {
+
+  /**
+   * Number of distributed cache files to be created by gridmix
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_COUNT =
+      "gridmix.distcache.file.count";
+  /**
+   * Total number of bytes to be written to the distributed cache files by
+   * gridmix. i.e. Sum of sizes of all unique distributed cache files to be
+   * created by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
+      "gridmix.distcache.byte.count";
+  /**
+   * The special file created (and used) by gridmix that contains the list of
+   * unique distributed cache files that are to be created, and their sizes.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_LIST =
+      "gridmix.distcache.file.list";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";
+
+  public GenerateDistCacheData(Configuration conf) throws IOException {
+    super(conf, 0L, JOB_NAME);
+  }
+
+  @Override
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    ugi.doAs(new PrivilegedExceptionAction<Job>() {
+      public Job run() throws IOException, ClassNotFoundException,
+                              InterruptedException {
+        job.setMapperClass(GenDCDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDCDataFormat.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setJarByClass(GenerateDistCacheData.class);
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error while adding input path ", e);
+        }
+        job.submit();
+        return job;
+      }
+    });
+    return job;
+  }
+
+  public static class GenDCDataMapper
+      extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
+
+    private BytesWritable val;
+    private final Random r = new Random();
+    private FileSystem fs;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      val = new BytesWritable(new byte[context.getConfiguration().getInt(
+              GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+      fs = FileSystem.get(context.getConfiguration());
+    }
+
+    // Create one distributed cache file with the needed file size.
+    // key is distributed cache file size and
+    // value is distributed cache file path.
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      String fileName = new String(value.getBytes(), 0, value.getLength());
+      Path path = new Path(fileName);
+
+      /*
+       * Create distributed cache file with the permissions 0755.
+       * Since the private distributed cache directory doesn't have execute
+       * permission for others, it is OK to set read permission for others for
+       * the files under that directory and still they will become 'private'
+       * distributed cache files on the simulated cluster.
+       */
+      FSDataOutputStream dos =
+          FileSystem.create(fs, path, new FsPermission((short)0755));
+
+      for (long bytes = key.get(); bytes > 0; bytes -= val.getLength()) {
+        r.nextBytes(val.getBytes());
+        val.setSize((int)Math.min(val.getLength(), bytes));
+        dos.write(val.getBytes(), 0, val.getLength());// Write to distCache file
+      }
+      dos.close();
+    }
+  }
+
+  /**
+   * InputFormat for GenerateDistCacheData.
+   * Input to GenerateDistCacheData is the special file (in SequenceFile
+   * format) that contains the list of distributed cache files to be generated
+   * along with their file sizes.
+   */
+  static class GenDCDataFormat
+      extends InputFormat<LongWritable, BytesWritable> {
+
+    // Split the special file that contains the list of distributed cache file
+    // paths and their file sizes such that each split corresponds to
+    // approximately the same amount of distributed cache data to be generated.
+    // Consider numTaskTrackers * numMapSlotsPerTracker as the number of maps
+    // for this job, if there is a lot of data to be generated.
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
+      final JobClient client = new JobClient(jobConf);
+      ClusterStatus stat = client.getClusterStatus(true);
+      int numTrackers = stat.getTaskTrackers();
+      final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+
+      // Total size of distributed cache files to be generated
+      final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
+      // Get the path of the special file
+      String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
+      if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
+        throw new RuntimeException("Invalid metadata: #files (" + fileCount
+            + "), total_size (" + totalSize + "), filelisturi ("
+            + distCacheFileList + ")");
+      }
+
+      Path sequenceFile = new Path(distCacheFileList);
+      FileSystem fs = sequenceFile.getFileSystem(jobConf);
+      FileStatus srcst = fs.getFileStatus(sequenceFile);
+      // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
+      int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
+      int numSplits = numTrackers * numMapSlotsPerTracker;
+
+      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+
+      // Average size of data to be generated by each map task
+      final long targetSize = Math.max(totalSize / numSplits,
+                                DistributedCacheEmulator.AVG_BYTES_PER_MAP);
+      long splitStartPosition = 0L;
+      long splitEndPosition = 0L;
+      long acc = 0L;
+      long bytesRemaining = srcst.getLen();
+      SequenceFile.Reader reader = null;
+      try {
+        reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+        while (reader.next(key, value)) {
+
+          // If adding this file would put this split past the target size,
+          // cut the last split and put this file in the next split.
+          if (acc + key.get() > targetSize && acc != 0) {
+            long splitSize = splitEndPosition - splitStartPosition;
+            splits.add(new FileSplit(
+                sequenceFile, splitStartPosition, splitSize, (String[])null));
+            bytesRemaining -= splitSize;
+            splitStartPosition = splitEndPosition;
+            acc = 0L;
+          }
+          acc += key.get();
+          splitEndPosition = reader.getPosition();
+        }
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+      if (bytesRemaining != 0) {
+        splits.add(new FileSplit(
+            sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
+      }
+
+      return splits;
+    }
+
+    /**
+     * Returns a reader for this split of the distributed cache file list.
+     */
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException, InterruptedException {
+      return new SequenceFileRecordReader<LongWritable, BytesWritable>();
+    }
+  }
+}
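
The split-sizing rule in GenDCDataFormat.getSplits() above boils down to: each map generates roughly max(totalSize / (numTrackers * mapSlotsPerTracker), AVG_BYTES_PER_MAP) bytes, and a split is cut just before the file that would overshoot that target. Below is a minimal, self-contained sketch of that rule (not part of this patch; a plain long[] of file sizes stands in for the sequence file records):

    import java.util.ArrayList;
    import java.util.List;

    class SplitSizingSketch {
      // Mirrors DistributedCacheEmulator.AVG_BYTES_PER_MAP (128 MB).
      static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L;

      // Returns how many consecutive files land in each split.
      static List<Integer> planSplits(long[] fileSizes, long totalSize,
                                      int numTrackers, int mapSlotsPerTracker) {
        int numSplits = numTrackers * mapSlotsPerTracker;
        long target = Math.max(totalSize / numSplits, AVG_BYTES_PER_MAP);
        List<Integer> filesPerSplit = new ArrayList<Integer>();
        long acc = 0L;
        int count = 0;
        for (long size : fileSizes) {
          // Cut the split before the file that would overshoot the target.
          if (acc + size > target && acc != 0) {
            filesPerSplit.add(count);
            acc = 0L;
            count = 0;
          }
          acc += size;
          count++;
        }
        if (count > 0) {
          filesPerSplit.add(count);
        }
        return filesPerSplit;
      }
    }

Since writeDistCacheFilesList() sorts the files in decreasing order of size, the largest files are spread across the earliest splits, which keeps the per-map generation work roughly balanced.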

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Mon May 23 14:22:36 2011
@@ -92,6 +92,8 @@ public class Gridmix extends Configured 
    */
   public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
 
+  private DistributedCacheEmulator distCacheEmulator;
+
   // Submit data structures
   private JobFactory factory;
   private JobSubmitter submitter;
@@ -102,36 +104,61 @@ public class Gridmix extends Configured 
   private final Shutdown sdh = new Shutdown();
 
   /**
-   * Write random bytes at the path provided.
+   * Write random bytes at the path &lt;ioPath&gt;/input/
    * @see org.apache.hadoop.mapred.gridmix.GenerateData
    */
   protected void writeInputData(long genbytes, Path ioPath)
       throws IOException, InterruptedException {
+    Path inputDir = new Path(ioPath, "input");
     final Configuration conf = getConf();
-    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
-    submitter.add(genData);
+    final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
     LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
         " of test data...");
+    launchGridmixJob(genData);
+    
+    FsShell shell = new FsShell(conf);
+    try {
+      LOG.info("Changing the permissions for inputPath " + inputDir.toString());
+      shell.run(new String[] {"-chmod","-R","777", inputDir.toString()});
+    } catch (Exception e) {
+      LOG.error("Couldnt change the file permissions " , e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Write random bytes to the distributed cache files that will be used by
+   * all simulated jobs of the current gridmix run, if these files are to be
+   * generated. This is done by the MapReduce job
+   * {@link GenerateDistCacheData#JOB_NAME}.
+   * @see org.apache.hadoop.mapred.gridmix.GenerateDistCacheData
+   */
+  protected void writeDistCacheData(Configuration conf)
+      throws IOException, InterruptedException {
+    int fileCount =
+        conf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+    if (fileCount > 0) {// generate distributed cache files
+      final GridmixJob genDistCacheData = new GenerateDistCacheData(conf);
+      LOG.info("Generating distributed cache data of size " + conf.getLong(
+          GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+      launchGridmixJob(genDistCacheData);
+    }
+  }
+
+  // Launch Input/DistCache Data Generation job and wait for completion
+  void launchGridmixJob(GridmixJob job)
+      throws IOException, InterruptedException {
+    submitter.add(job);
+
     // TODO add listeners, use for job dependencies
     TimeUnit.SECONDS.sleep(10);
     try {
-      genData.getJob().waitForCompletion(false);
+      job.getJob().waitForCompletion(false);
     } catch (ClassNotFoundException e) {
       throw new IOException("Internal error", e);
     }
-    if (!genData.getJob().isSuccessful()) {
-      throw new IOException("Data generation failed!");
+    if (!job.getJob().isSuccessful()) {
+      throw new IOException(job.getJob().getJobName() + " job failed!");
     }
-
-    FsShell shell = new FsShell(conf);
-    try {
-      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
-      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
-    } catch (Exception e) {
-      LOG.error("Couldnt change the file permissions " , e);
-      throw new IOException(e);
-    }
-    LOG.info("Done.");
   }
 
   /**
@@ -158,7 +185,9 @@ public class Gridmix extends Configured 
    * @param conf Configuration data, no keys specific to this context
    * @param traceIn Either a Path to the trace data or &quot;-&quot; for
    *                stdin
-   * @param ioPath Path from which input data is read
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir from which input data is
+   *               read and &lt;ioPath&gt;/distributedCache/ is the gridmix
+   *               distributed cache directory.
    * @param scratchDir Path into which job output is written
    * @param startFlag Semaphore for starting job trace pipeline
    */
@@ -166,6 +195,7 @@ public class Gridmix extends Configured 
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
     try {
+      Path inputDir = new Path(ioPath, "input");
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
@@ -179,11 +209,14 @@ public class Gridmix extends Configured 
       int numThreads = conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads);
       int queueDep = conf.getInt(GRIDMIX_QUE_DEP, 5);
       submitter = createJobSubmitter(monitor, numThreads, queueDep,
-                                     new FilePool(conf, ioPath), userResolver, 
+                                     new FilePool(conf, inputDir), userResolver, 
                                      statistics);
-      
+      distCacheEmulator = new DistributedCacheEmulator(conf, ioPath);
+
       factory = createJobFactory(submitter, traceIn, scratchDir, conf, 
                                  startFlag, userResolver);
+      factory.jobCreator.setDistCacheEmulator(distCacheEmulator);
+
       if (policy == GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
@@ -244,6 +277,9 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
+    
+    // Should gridmix generate distributed cache data ?
+    boolean generate = false;
     long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
@@ -257,6 +293,7 @@ public class Gridmix extends Configured 
       for (int i = 0; i < argv.length - 2; ++i) {
         if ("-generate".equals(argv[i])) {
           genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+          generate = true;
         } else if ("-users".equals(argv[i])) {
           userRsrc = new URI(argv[++i]);
         } else {
@@ -287,12 +324,33 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
-    return start(conf, traceIn, ioPath, genbytes, userResolver);
+    return start(conf, traceIn, ioPath, genbytes, userResolver, generate);
   }
 
+  /**
+   * 
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes from
+   *                stdin)
+   * @param ioPath working directory for gridmix. The GenerateData job
+   *               generates input data under &lt;ioPath&gt;/input/ and
+   *               distributed cache data is generated under
+   *               &lt;ioPath&gt;/distributedCache/, if the -generate option is
+   *               specified.
+   * @param genbytes size of input data to be generated under the directory
+   *                 &lt;ioPath&gt;/input/
+   * @param userResolver gridmix user resolver
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
-      UserResolver userResolver) throws IOException, InterruptedException {
+      UserResolver userResolver, boolean generate)
+      throws IOException, InterruptedException {
     InputStream trace = null;
+    ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));
+
     try {
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
 
@@ -310,10 +368,12 @@ public class Gridmix extends Configured 
         // scan input dir contents
         submitter.refreshFilePool();
 
-        // create scratch directory(output path of gridmix)
-        final FileSystem scratchFs = scratchDir.getFileSystem(conf);
-        FileSystem.mkdirs(scratchFs, scratchDir,
-            new FsPermission((short) 0777));
+        // set up the needed things for emulation of various loads
+        int exitCode = setupEmulation(conf, traceIn, scratchDir, ioPath,
+                                      generate);
+        if (exitCode != 0) {
+          return exitCode;
+        }
 
         factory.start();
         statistics.start();
@@ -350,6 +410,64 @@ public class Gridmix extends Configured 
   }
 
   /**
+   * Create gridmix output directory. Setup things for emulation of
+   * various loads, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes from
+   *                stdin)
+   * @param scratchDir gridmix output directory
+   * @param ioPath Working directory for gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private int setupEmulation(Configuration conf, String traceIn,
+      Path scratchDir, Path ioPath, boolean generate)
+      throws IOException, InterruptedException {
+    // create the scratch directory (the output directory of gridmix)
+    final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+    FileSystem.mkdirs(scratchFs, scratchDir, new FsPermission((short) 0777));
+
+    // Setup things needed for emulation of distributed cache load
+    return setupDistCacheEmulation(conf, traceIn, ioPath, generate);
+    // TODO: setup emulation of other loads like CPU load and memory load
+  }
+
+  /**
+   * Setup gridmix for emulation of distributed cache load. This includes
+   * generation of distributed cache files, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes from
+   *                stdin)
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir where input data (a) exists
+   *               or (b) is generated. &lt;ioPath&gt;/distributedCache/ is the
+   *               folder where distributed cache data (a) exists or (b) is to be
+   *               generated by gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private int setupDistCacheEmulation(Configuration conf, String traceIn,
+      Path ioPath, boolean generate) throws IOException, InterruptedException {
+    distCacheEmulator.init(traceIn, factory.jobCreator, generate);
+    int exitCode = 0;
+    if (distCacheEmulator.shouldGenerateDistCacheData() ||
+        distCacheEmulator.shouldEmulateDistCacheLoad()) {
+
+      JobStoryProducer jsp = createJobStoryProducer(traceIn, conf);
+      exitCode = distCacheEmulator.setupGenerateDistCacheData(jsp);
+      if (exitCode == 0) {
+        // If there are files to be generated, run a MapReduce job to generate
+        // these distributed cache files of all the simulated jobs of this trace.
+        writeDistCacheData(conf);
+      }
+    }
+    return exitCode;
+  }
+
+  /**
    * Handles orderly shutdown by requesting that each component in the
    * pipeline abort its progress, waiting for each to exit and killing
    * any jobs still running on the cluster.
@@ -446,6 +564,11 @@ public class Gridmix extends Configured 
     ToolRunner.printGenericCommandUsage(out);
     out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
+    out.println("Options:");
+    out.println("   -generate <MiB> : Generate input data of size MiB under "
+        + "<iopath>/input/ and generate\n\t\t     distributed cache data under "
+        + "<iopath>/distributedCache/.");
+    out.println("   -users <usersResourceURI> : URI that contains the users list.");
     out.println("Configuration parameters:");
     out.println("   General parameters:");
     out.printf("       %-48s : Output directory\n", GRIDMIX_OUT_DIR);

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Mon May 23 14:22:36 2011
@@ -18,31 +18,42 @@
 
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public enum JobCreator {
 
   LOADJOB {
     @Override
     public GridmixJob createGridmixJob(
-      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
-      UserGroupInformation ugi, int seq) throws IOException {
+      Configuration gridmixConf, long submissionMillis, JobStory jobdesc,
+      Path outRoot, UserGroupInformation ugi, int seq) throws IOException {
+
+      // Build configuration for this simulated job
+      Configuration conf = new Configuration(gridmixConf);
+      dce.configureDistCacheFiles(conf, jobdesc.getJobConf());
       return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
-    }},
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return true;
+    }
+  },
 
   SLEEPJOB {
     private String[] hosts;
@@ -72,12 +83,30 @@ public enum JobCreator {
       }
       return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
           numLocations, hosts);
-    }};
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return false;
+    }
+  };
 
   public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
   public static final String SLEEPJOB_RANDOM_LOCATIONS = 
     "gridmix.sleep.fake-locations";
 
+  /**
+   * Create Gridmix simulated job.
+   * @param conf configuration of simulated job
+   * @param submissionMillis the time at which this simulated job is to be
+   *                         submitted
+   * @param jobdesc JobStory obtained from trace
+   * @param outRoot gridmix output directory
+   * @param ugi UGI of job submitter of this simulated job
+   * @param seq job sequence number
+   * @return the created simulated job
+   * @throws IOException
+   */
   public abstract GridmixJob createGridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
     Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
@@ -86,4 +115,21 @@ public enum JobCreator {
     Configuration conf, JobCreator defaultPolicy) {
     return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
   }
+
+  /**
+   * @return true if gridmix simulated jobs of this job type can emulate
+   *         distributed cache load
+   */
+  abstract boolean canEmulateDistCacheLoad();
+
+  DistributedCacheEmulator dce;
+  /**
+   * This method is to be called before calling any other method in JobCreator
+   * except canEmulateDistCacheLoad(), especially if canEmulateDistCacheLoad()
+   * returns true for that job type.
+   * @param e Distributed Cache Emulator
+   */
+  void setDistCacheEmulator(DistributedCacheEmulator e) {
+    this.dce = e;
+  }
 }
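
The per-type capability flag added above uses the constant-specific method pattern of Java enums: each constant supplies its own canEmulateDistCacheLoad() body. A self-contained illustration of the pattern (hypothetical names, not the real JobCreator):

    enum JobTypeSketch {
      LOADJOB {
        @Override
        boolean canEmulateDistCacheLoad() { return true; }
      },
      SLEEPJOB {
        @Override
        boolean canEmulateDistCacheLoad() { return false; }
      };

      // Each constant answers for itself, as in JobCreator above.
      abstract boolean canEmulateDistCacheLoad();

      public static void main(String[] args) {
        for (JobTypeSketch t : values()) {
          System.out.println(t + " -> " + t.canEmulateDistCacheLoad());
        }
      }
    }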

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java Mon May 23 14:22:36 2011
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Pseudo local file system that generates random data for any file on the fly
+ * instead of storing files on disk. Hence, opening the same file multiple
+ * times will not return the same content. This file system has no
+ * directories other than the root, and all files live directly under the
+ * root, i.e. "/". All file URIs on the pseudo local file system must be of
+ * the format <code>pseudo:///&lt;name&gt;.&lt;fileSize&gt;</code>, where
+ * &lt;name&gt; is a unique file name and &lt;fileSize&gt; is a number
+ * representing the size of the file in bytes.
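+ * For example, <code>pseudo:///myfile.1024</code> denotes a file named
+ * "myfile" whose generated content is 1024 bytes long.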
+ */
+class PseudoLocalFs extends FileSystem {
+  Path home;
+  /**
+   * The creation time and modification time of all files in
+   * {@link PseudoLocalFs} are the same.
+   */
+  private static final long TIME = System.currentTimeMillis();
+  private static final String HOME_DIR = "/";
+  private static final long BLOCK_SIZE = 4 * 1024 * 1024L; // 4 MB
+  private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB
+
+  static final URI NAME = URI.create("pseudo:///");
+
+  PseudoLocalFs() {
+    this(new Path(HOME_DIR));
+  }
+
+  PseudoLocalFs(Path home) {
+    super();
+    this.home = home;
+  }
+
+  @Override
+  public URI getUri() {
+    return NAME;
+  }
+
+  @Override
+  public Path getHomeDirectory() {
+    return home;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return getHomeDirectory();
+  }
+
+  /**
+   * Generates a valid pseudo local file path from the given <code>fileId</code>
+   * and <code>fileSize</code>.
+   * @param fileId unique file id string
+   * @param fileSize file size in bytes
+   * @return the generated relative path
+   */
+  static Path generateFilePath(String fileId, long fileSize) {
+    return new Path(fileId + "." + fileSize);
+  }
+
+  /**
+   * Creating a pseudo local file amounts to validating the file path only.
+   * The actual file data is generated on the fly when a client opens the
+   * file for reading.
+   * @param path file path to be created
+   */
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    try {
+      validateFileNameFormat(path);
+    } catch (FileNotFoundException e) {
+      throw new IOException("File creation failed for " + path);
+    }
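+    // No data is stored for the file, so there is no output stream to return.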
+    return null;
+  }
+
+  /**
+   * Validate whether the given path follows the expected format for pseudo
+   * local file system files.
+   * @param path file path
+   * @return the file size encoded in the file name
+   * @throws FileNotFoundException if the path is not a valid pseudo local
+   *         file system path
+   */
+  long validateFileNameFormat(Path path) throws FileNotFoundException {
+    path = path.makeQualified(this);
+    boolean valid = true;
+    long fileSize = 0;
+    if (!path.toUri().getScheme().equals(getUri().getScheme())) {
+      valid = false;
+    } else {
+      String[] parts = path.toUri().getPath().split("\\.");
+      try {
+        fileSize = Long.valueOf(parts[parts.length - 1]);
+        valid = (fileSize >= 0);
+      } catch (NumberFormatException e) {
+        valid = false;
+      }
+    }
+    if (!valid) {
+      throw new FileNotFoundException("File " + path
+          + " does not exist in pseudo local file system");
+    }
+    return fileSize;
+  }
+
+  /**
+   * File data is generated on the fly at read time; see
+   * {@link #create(Path)} for details.
+   */
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    long fileSize = validateFileNameFormat(path);
+    InputStream in = new RandomInputStream(fileSize, bufferSize);
+    return new FSDataInputStream(in);
+  }
+
+  /**
+   * Opens the file with the default buffer size; see
+   * {@link #open(Path, int)} for details.
+   */
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return open(path, DEFAULT_BUFFER_SIZE);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    long fileSize = validateFileNameFormat(path);
+    return new FileStatus(fileSize, false, 1, BLOCK_SIZE, TIME, path);
+  }
+
+  @Override
+  public boolean exists(Path path) {
+    try {
+      validateFileNameFormat(path);
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return create(path);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws FileNotFoundException,
+      IOException {
+    return new FileStatus[] {getFileStatus(path)};
+  }
+
+  /**
+   * Input Stream that generates specified number of random bytes.
+   */
+  static class RandomInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+
+    private final Random r = new Random();
+    private BytesWritable val = null;
+    private int positionInVal = 0;// current position in the buffer 'val'
+
+    private long totalSize = 0;// total number of random bytes to be generated
+    private long curPos = 0;// current position in this stream
+
+    /**
+     * @param size total number of random bytes to be generated in this stream
+     * @param bufferSize the buffer size. An internal buffer array of length
+     * <code>bufferSize</code> is created. If <code>bufferSize</code> is not a
+     * positive number, then a default value of 1MB is used.
+     */
+    RandomInputStream(long size, int bufferSize) {
+      totalSize = size;
+      if (bufferSize <= 0) {
+        bufferSize = DEFAULT_BUFFER_SIZE;
+      }
+      val = new BytesWritable(new byte[bufferSize]);
+    }
+
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      if (curPos < totalSize) {
+        if (positionInVal < val.getLength()) {// use buffered byte
+          b[0] = val.getBytes()[positionInVal++];
+          ++curPos;
+        } else {// generate data
+          int num = read(b);
+          if (num < 0) {
+            return num;
+          }
+        }
+      } else {
+        return -1;
+      }
+      // mask to 0..255 so a negative byte value is not mistaken for EOF
+      return b[0] & 0xff;
+    }
+
+    @Override
+    public int read(byte[] bytes) throws IOException {
+      return read(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+      if (curPos == totalSize) {
+        return -1;// EOF
+      }
+      int numBytes = len;
+      if (numBytes > (totalSize - curPos)) {// position in file is close to EOF
+        numBytes = (int)(totalSize - curPos);
+      }
+      if (numBytes > (val.getLength() - positionInVal)) {
+        // need to generate fresh random data into val
+        r.nextBytes(val.getBytes());
+        positionInVal = 0;
+        if (numBytes > val.getLength()) {
+          // cap the read at the buffer size so that the arraycopy below
+          // stays within the bounds of val
+          numBytes = val.getLength();
+        }
+      }
+
+      System.arraycopy(val.getBytes(), positionInVal, bytes, off, numBytes);
+      curPos += numBytes;
+      positionInVal += numBytes;
+      return numBytes;
+    }
+
+    @Override
+    public int available() {
+      return (int)(val.getLength() - positionInVal);
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Get the current position in this stream/pseudo-file
+     * @return the position in this stream/pseudo-file
+     * @throws IOException
+     */
+    @Override
+    public long getPos() throws IOException {
+      return curPos;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("Append is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    throw new UnsupportedOperationException("Mkdirs is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new UnsupportedOperationException("Rename is not supported"
+        + " in pseudo local file system.");
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) {
+    throw new UnsupportedOperationException("File deletion is not supported "
+        + "in pseudo local file system.");
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    throw new UnsupportedOperationException("SetWorkingDirectory "
+        + "is not supported in pseudo local file system.");
+  }
+}
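
For illustration only (not part of this patch), a minimal sketch of reading a
pseudo-file: the name "distCacheFile1" is an arbitrary example, its ".2048"
suffix encodes a 2048-byte file per the URI format above, and the caller is
assumed to be in the same package as the package-private class (with the
usual org.apache.hadoop.fs imports):

    FileSystem pfs = new PseudoLocalFs();
    Path file = PseudoLocalFs.generateFilePath("distCacheFile1", 2048);
    // open() validates the name and streams 2048 random bytes; the
    // content differs on every open since nothing is stored on disk.
    FSDataInputStream in = pfs.open(file);
    byte[] buf = new byte[512];
    long total = 0;
    for (int n = in.read(buf); n > 0; n = in.read(buf)) {
      total += n; // reaches 2048 once EOF (-1) is hit
    }
    in.close();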

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Mon May 23 14:22:36 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.tools.rumen.Pre
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -209,9 +210,14 @@ public class DebugJobProducer implements
 
    @Override
    public String getUser() {
-     String s = String.format("foobar%d", id);
-     GridmixTestUtils.createHomeAndStagingDirectory(s, (JobConf)conf);
-     return s;
+     // Obtain the user name from the job configuration, if available.
+     // Otherwise, fall back to a generated dummy user name.
+     String user = conf.get(MRJobConfig.USER_NAME);
+     if (user == null) {
+       user = String.format("foobar%d", id);
+     }
+     GridmixTestUtils.createHomeAndStagingDirectory(user, (JobConf)conf);
+     return user;
    }
 
    @Override
@@ -289,7 +295,7 @@ public class DebugJobProducer implements
 
     @Override
     public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+      return new JobConf(conf);
     }
 
     @Override