Posted to mapreduce-commits@hadoop.apache.org by ra...@apache.org on 2011/05/23 16:22:36 UTC
svn commit: r1126499 [1/2] - in /hadoop/mapreduce/trunk/src:
contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
docs/src/documentation/content/xdocs/
Author: ravigummadi
Date: Mon May 23 14:22:36 2011
New Revision: 1126499
URL: http://svn.apache.org/viewvc?rev=1126499&view=rev
Log:
MAPREDUCE-2407. Make GridMix emulate usage of distributed cache files in simulated jobs.
Added:
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestPseudoLocalFs.java
Modified:
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/gridmix.xml
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Mon May 23 14:22:36 2011
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of Distributed Cache load in gridmix puts load on
+ * TaskTrackers and affects the execution time of tasks, because TaskTrackers
+ * must localize the distributed cache files before running the tasks.
+ * <br> Gridmix creates distributed cache files for simulated jobs by launching
+ * a MapReduce job {@link GenerateDistCacheData} in advance i.e. before
+ * launching simulated jobs.
+ * <br> The distributed cache file paths used in the original cluster are mapped
+ * to unique file names in the simulated cluster.
+ * <br> All HDFS-based distributed cache files generated by gridmix are
+ * public distributed cache files. But Gridmix makes sure that load incurred due
+ * to localization of private distributed cache files on the original cluster
+ * is also faithfully simulated. Gridmix emulates the load due to private
+ * distributed cache files by mapping private distributed cache files of
+ * different users in the original cluster to different public distributed cache
+ * files in the simulated cluster.
+ *
+ * <br> The configuration properties like
+ * {@link MRJobConfig#CACHE_FILES}, {@link MRJobConfig#CACHE_FILE_VISIBILITIES},
+ * {@link MRJobConfig#CACHE_FILES_SIZES} and
+ * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS} obtained from trace are used to
+ * decide
+ * <li> the file size of each distributed cache file to be generated
+ * <li> whether a distributed cache file has already been seen in this trace
+ * <li> whether a distributed cache file was considered public or private.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+ private static final Log LOG =
+ LogFactory.getLog(DistributedCacheEmulator.class);
+
+ static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L;// 128MB
+
+ // If at least 1 distributed cache file is missing in the expected
+ // distributed cache dir, Gridmix cannot proceed with emulation of
+ // distributed cache load.
+ static final int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+ private Path distCachePath;
+
+ /**
+ * Map between simulated cluster's distributed cache file paths and their
+ * file sizes. Unique distributed cache files are entered into this map.
+ * Two distributed cache files are considered the same if and only if their
+ * file paths, visibilities and timestamps are the same.
+ */
+ private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+ /**
+ * Configuration property for whether gridmix should emulate
+ * distributed cache usage or not. Default value is true.
+ */
+ static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+ "gridmix.distributed-cache-emulation.enable";
+
+ // Whether to emulate distributed cache usage or not
+ boolean emulateDistributedCache = true;
+
+ // Whether to generate distributed cache data or not
+ boolean generateDistCacheData = false;
+
+ Configuration conf; // gridmix configuration
+
+ // Pseudo local file system where local FS based distributed cache files are
+ // created by gridmix.
+ FileSystem pseudoLocalFs = null;
+
+ {
+ // Need to handle deprecation of these MapReduce-internal configuration
+ // properties as MapReduce doesn't handle their deprecation.
+ Configuration.addDeprecation("mapred.cache.files.filesizes",
+ new String[] {MRJobConfig.CACHE_FILES_SIZES});
+ Configuration.addDeprecation("mapred.cache.files.visibilities",
+ new String[] {MRJobConfig.CACHE_FILE_VISIBILITIES});
+ }
+
+ /**
+ * @param conf gridmix configuration
+ * @param ioPath <ioPath>/distributedCache/ is the gridmix Distributed
+ * Cache directory
+ */
+ public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+ this.conf = conf;
+ distCachePath = new Path(ioPath, "distributedCache");
+ this.conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
+ }
+
+ /**
+ * This is to be called before any other method of DistributedCacheEmulator.
+ * <br> Checks if emulation of distributed cache load is needed and is feasible.
+ * Sets the flags generateDistCacheData and emulateDistributedCache to the
+ * appropriate values.
+ * <br> Gridmix does not emulate distributed cache load if
+ * <ol><li> the specific gridmix job type doesn't need emulation of
+ * distributed cache load OR
+ * <li> the trace is coming from a stream instead of file OR
+ * <li> the distributed cache dir where distributed cache data is to be
+ * generated by gridmix is on local file system OR
+ * <li> execute permission is not there for any of the ancestor directories
+ * of <ioPath> up to the root. This is because, for emulation of distributed
+ * cache load, distributed cache files created under
+ * <ioPath/distributedCache/public/> should be considered by hadoop
+ * as public distributed cache files.
+ * <li> creation of pseudo local file system fails.</ol>
+ * <br> For (2), (3), (4) and (5), generation of distributed cache data
+ * is also disabled.
+ *
+ * @param traceIn trace file path. If this is '-', then the trace comes from
+ * stdin.
+ * @param jobCreator job creator of gridmix jobs of a specific type
+ * @param generate true if -generate option was specified
+ * @throws IOException
+ */
+ void init(String traceIn, JobCreator jobCreator, boolean generate)
+ throws IOException {
+ emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+ && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+ generateDistCacheData = generate;
+
+ if (generateDistCacheData || emulateDistributedCache) {
+ if ("-".equals(traceIn)) {// trace is from stdin
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "the input trace source is a stream instead of file.");
+ emulateDistributedCache = generateDistCacheData = false;
+ } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+ distCachePath.toUri().getScheme())) {// local FS
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "<iopath> provided is on local file system.");
+ emulateDistributedCache = generateDistCacheData = false;
+ } else {
+ // Check if execute permission is there for all the ancestor
+ // directories of distCachePath up to the root.
+ FileSystem fs = FileSystem.get(conf);
+ Path cur = distCachePath.getParent();
+ while (cur != null) {
+ if (cur.toString().length() > 0) {
+ FsPermission perm = fs.getFileStatus(cur).getPermission();
+ if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+ FsAction.EXECUTE)) {
+ LOG.warn("Gridmix will not emulate Distributed Cache load "
+ + "because the ascendant directory (of distributed cache "
+ + "directory) " + cur + " doesn't have execute permission "
+ + "for others.");
+ emulateDistributedCache = generateDistCacheData = false;
+ break;
+ }
+ }
+ cur = cur.getParent();
+ }
+ }
+ }
+
+ // Check if pseudo local file system can be created
+ try {
+ pseudoLocalFs = FileSystem.get(new URI("pseudo:///"), conf);
+ } catch (URISyntaxException e) {
+ LOG.warn("Gridmix will not emulate Distributed Cache load because "
+ + "creation of pseudo local file system failed.", e);
+ emulateDistributedCache = generateDistCacheData = false;
+ }
+ }
+
+ /**
+ * @return true if gridmix should emulate distributed cache load
+ */
+ boolean shouldEmulateDistCacheLoad() {
+ return emulateDistributedCache;
+ }
+
+ /**
+ * @return true if gridmix should generate distributed cache data
+ */
+ boolean shouldGenerateDistCacheData() {
+ return generateDistCacheData;
+ }
+
+ /**
+ * @return the distributed cache directory path
+ */
+ Path getDistributedCacheDir() {
+ return distCachePath;
+ }
+
+ /**
+ * Create distributed cache directories.
+ * Also create a file that contains the list of distributed cache files
+ * that will be used as distributed cache files for all the simulated jobs.
+ * @param jsp job story producer for the trace
+ * @return exit code
+ * @throws IOException
+ */
+ int setupGenerateDistCacheData(JobStoryProducer jsp)
+ throws IOException {
+
+ createDistCacheDirectory();
+ return buildDistCacheFilesList(jsp);
+ }
+
+ /**
+ * Create distributed cache directory where distributed cache files will be
+ * created by the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+ * @throws IOException
+ */
+ private void createDistCacheDirectory() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+ }
+
+ /**
+ * Create the list of unique distributed cache files needed for all the
+ * simulated jobs and write the list to a special file.
+ * @param jsp job story producer for the trace
+ * @return exit code
+ * @throws IOException
+ */
+ private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+ // Read all the jobs from the trace file and build the list of unique
+ // distributed cache files.
+ JobStory jobStory;
+ while ((jobStory = jsp.getNextJob()) != null) {
+ if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS &&
+ jobStory.getSubmissionTime() >= 0) {
+ updateHDFSDistCacheFilesList(jobStory);
+ }
+ }
+ jsp.close();
+
+ return writeDistCacheFilesList();
+ }
+
+ /**
+ * For the job to be simulated, identify the needed distributed cache files
+ * by mapping the original cluster's distributed cache file paths to the
+ * simulated cluster's paths, and add these paths to the map
+ * {@code distCacheFiles}.
+ *<br>
+ * JobStory should contain distributed cache related properties like
+ * <li> {@link MRJobConfig#CACHE_FILES}
+ * <li> {@link MRJobConfig#CACHE_FILE_VISIBILITIES}
+ * <li> {@link MRJobConfig#CACHE_FILES_SIZES}
+ * <li> {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
+ * <li> {@link MRJobConfig#CLASSPATH_FILES}
+ *
+ * <li> {@link MRJobConfig#CACHE_ARCHIVES}
+ * <li> {@link MRJobConfig#CACHE_ARCHIVES_VISIBILITIES}
+ * <li> {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
+ * <li> {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
+ * <li> {@link MRJobConfig#CLASSPATH_ARCHIVES}
+ *
+ * <li> {@link MRJobConfig#CACHE_SYMLINK}
+ *
+ * @param jobdesc JobStory of original job obtained from trace
+ * @throws IOException
+ */
+ void updateHDFSDistCacheFilesList(JobStory jobdesc) throws IOException {
+
+ // Map original job's distributed cache file paths to simulated cluster's
+ // paths, to be used by this simulated job.
+ JobConf jobConf = jobdesc.getJobConf();
+
+ String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+ if (files != null) {
+
+ String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+ String[] visibilities =
+ jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+ String[] timeStamps =
+ jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+
+ FileSystem fs = FileSystem.get(conf);
+ String user = jobConf.getUser();
+ for (int i = 0; i < files.length; i++) {
+ // Check if visibilities are available, because older hadoop versions
+ // didn't distinguish between public and private distributed cache files.
+ boolean visibility =
+ (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+ if (isLocalDistCacheFile(files[i], user, visibility)) {
+ // local FS based distributed cache file.
+ // Create this file on the pseudo local FS on the fly (i.e. when the
+ // simulated job is submitted).
+ continue;
+ }
+ // distributed cache file on hdfs
+ String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+ visibility, user);
+
+ // No need to add a distributed cache file path to the list if
+ // (1) the mapped path is already there in the list OR
+ // (2) the file with the mapped path already exists.
+ // In either of these 2 cases, file paths, timestamps, file sizes and
+ // visibilities match. File sizes should match if file paths and
+ // timestamps match, because a single file path with a single timestamp
+ // should correspond to a single file size.
+ if (distCacheFiles.containsKey(mappedPath) ||
+ fs.exists(new Path(mappedPath))) {
+ continue;
+ }
+ distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+ }
+ }
+ }
+
+ /**
+ * Check if the file path provided was constructed by MapReduce for a
+ * distributed cache file on local file system.
+ * @param filePath path of the distributed cache file
+ * @param user job submitter of the job for which <filePath> is a
+ * distributed cache file
+ * @param visibility <code>true</code> for public distributed cache file
+ * @return true if the path provided is of a local file system based
+ * distributed cache file
+ */
+ private boolean isLocalDistCacheFile(String filePath, String user,
+ boolean visibility) {
+ return (!visibility && filePath.contains(user + "/.staging"));
+ }
+
+ /**
+ * Map the HDFS based distributed cache file path from original cluster to
+ * a unique file name on the simulated cluster.
+ * <br> Unique distributed cache file names on the simulated cluster are
+ * generated using the original cluster's <li>file path, <li>timestamp and
+ * <li>the job-submitter for a private distributed cache file.
+ * <br> This implies that if, on the original cluster, a single HDFS file is
+ * considered as two private distributed cache files for two jobs of
+ * different users, then the corresponding simulated jobs will have two
+ * different files of the same size in the public distributed cache, one for
+ * each user. These two simulated jobs will not share these distributed
+ * cache files, thus leading to the same load as seen in the original
+ * cluster.
+ * @param file distributed cache file path
+ * @param timeStamp time stamp of the distributed cache file
+ * @param isPublic true if this distributed cache file is a public
+ * distributed cache file
+ * @param user job submitter on original cluster
+ * @return the mapped path on simulated cluster
+ */
+ private String mapDistCacheFilePath(String file, String timeStamp,
+ boolean isPublic, String user) {
+ String id = file + timeStamp;
+ if (!isPublic) {
+ // consider job-submitter for private distributed cache file
+ id = id.concat(user);
+ }
+ return new Path(distCachePath, MD5Hash.digest(id).toString()).toUri()
+ .getPath();
+ }
+
+ /**
+ * Write the list of distributed cache files in the decreasing order of
+ * file sizes into the sequence file. This file will be input to the job
+ * {@link GenerateDistCacheData}.
+ * Also reports an error if distributed cache files are missing and the
+ * -generate option was not specified.
+ * @return exit code
+ * @throws IOException
+ */
+ private int writeDistCacheFilesList()
+ throws IOException {
+ // Sort the distributed cache files in the decreasing order of file sizes.
+ List<Map.Entry<String, Long>> dcFiles =
+ new ArrayList<Map.Entry<String, Long>>(distCacheFiles.entrySet());
+ Collections.sort(dcFiles, new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> dc1,
+ Map.Entry<String, Long> dc2) {
+ return dc2.getValue().compareTo(dc1.getValue());
+ }
+ });
+
+ // write the sorted distributed cache files to the sequence file
+ FileSystem fs = FileSystem.get(conf);
+ Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+ conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+ distCacheFilesList.toString());
+ SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
+ distCacheFilesList, LongWritable.class, BytesWritable.class,
+ SequenceFile.CompressionType.NONE);
+
+ // Total number of unique distributed cache files
+ int fileCount = dcFiles.size();
+ long byteCount = 0;// Total size of all distributed cache files
+ long bytesSync = 0;// Bytes after previous sync; used to add sync marker
+
+ for (Map.Entry<String, Long> entry : dcFiles) {
+ LongWritable fileSize = new LongWritable(entry.getValue().longValue());
+ BytesWritable filePath =
+ new BytesWritable(entry.getKey().getBytes());
+
+ byteCount += fileSize.get();
+ bytesSync += fileSize.get();
+ if (bytesSync > AVG_BYTES_PER_MAP) {
+ src_writer.sync();
+ bytesSync = fileSize.get();
+ }
+ src_writer.append(fileSize, filePath);
+ }
+ src_writer.close();
+ // Set delete on exit for 'dist cache files list' as it is not needed later.
+ fs.deleteOnExit(distCacheFilesList);
+
+ conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+ conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+ LOG.info("Number of HDFS based distributed cache files to be generated is"
+ + fileCount + ". Total size of HDFS based distributed cache files "
+ + "to be generated is " + byteCount);
+
+ if (!shouldGenerateDistCacheData() && fileCount > 0) {
+ LOG.error("Missing " + fileCount + " distributed cache files under the "
+ + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+ + " to emulate distributed cache load. Either use -generate\noption"
+ + " to generate distributed cache data along with input data OR "
+ + "disable\ndistributed cache emulation by configuring '"
+ + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ + "' to false.");
+ return MISSING_DIST_CACHE_FILES_ERROR;
+ }
+ return 0;
+ }
+
+ /**
+ * If gridmix needs to emulate distributed cache load, then configure
+ * distributed cache files of a simulated job by mapping the original
+ * cluster's distributed cache file paths to the simulated cluster's paths and
+ * setting these mapped paths in the job configuration of the simulated job.
+ * <br>
+ * Configure local FS based distributed cache files through the property
+ * "tmpfiles" and hdfs based distributed cache files through the property
+ * {@link MRJobConfig#CACHE_FILES}.
+ * @param conf configuration for the simulated job to be run
+ * @param jobConf job configuration of original cluster's job, obtained from
+ * trace
+ * @throws IOException
+ */
+ void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+ throws IOException {
+ if (shouldEmulateDistCacheLoad()) {
+
+ String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+ if (files != null) {
+ // hdfs based distributed cache files to be configured for simulated job
+ List<String> cacheFiles = new ArrayList<String>();
+ // local FS based distributed cache files to be configured for
+ // simulated job
+ List<String> localCacheFiles = new ArrayList<String>();
+
+ String[] visibilities =
+ jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+ String[] timeStamps =
+ jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+ String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+
+ String user = jobConf.getUser();
+ for (int i = 0; i < files.length; i++) {
+ // Check if visibilities are available, because older hadoop versions
+ // didn't distinguish between public and private distributed cache files.
+ boolean visibility =
+ (visibilities == null) ? true : Boolean.valueOf(visibilities[i]);
+ if (isLocalDistCacheFile(files[i], user, visibility)) {
+ // local FS based distributed cache file.
+ // Create this file on the pseudo local FS.
+ String fileId = MD5Hash.digest(files[i] + timeStamps[i]).toString();
+ long fileSize = Long.valueOf(fileSizes[i]);
+ Path mappedLocalFilePath =
+ PseudoLocalFs.generateFilePath(fileId, fileSize)
+ .makeQualified(pseudoLocalFs.getUri(),
+ pseudoLocalFs.getWorkingDirectory());
+ pseudoLocalFs.create(mappedLocalFilePath);
+ localCacheFiles.add(mappedLocalFilePath.toUri().toString());
+ } else {
+ // hdfs based distributed cache file.
+ // Get the mapped HDFS path on simulated cluster
+ String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+ visibility, user);
+ cacheFiles.add(mappedPath);
+ }
+ }
+ if (cacheFiles.size() > 0) {
+ // configure hdfs based distributed cache files for simulated job
+ conf.setStrings(MRJobConfig.CACHE_FILES,
+ cacheFiles.toArray(new String[cacheFiles.size()]));
+ }
+ if (localCacheFiles.size() > 0) {
+ // configure local FS based distributed cache files for simulated job
+ conf.setStrings("tmpfiles", localCacheFiles.toArray(
+ new String[localCacheFiles.size()]));
+ }
+ }
+ }
+ }
+}
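
To illustrate the path-mapping rule implemented by mapDistCacheFilePath() above: the simulated cluster's file name is the MD5 hash of the original path plus timestamp, with the job submitter mixed in for private files, so one private file shared by two users becomes two distinct public files of the same size. A minimal standalone sketch of that rule (illustration only, not part of this patch; the class name and sample values are hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;

public class DistCacheMappingSketch {
  // Same rule as DistributedCacheEmulator.mapDistCacheFilePath().
  static String map(Path distCachePath, String file, String timeStamp,
                    boolean isPublic, String user) {
    String id = file + timeStamp;
    if (!isPublic) {
      id = id.concat(user); // the job submitter disambiguates private files
    }
    return new Path(distCachePath, MD5Hash.digest(id).toString())
        .toUri().getPath();
  }

  public static void main(String[] args) {
    Path base = new Path("/gridmix/distributedCache"); // hypothetical <ioPath>/distributedCache/
    String file = "hdfs://nn:8020/user/alice/lib/dict.txt"; // hypothetical trace path
    String ts = "1306160556000"; // hypothetical timestamp from the trace
    // One private file used by two users maps to two distinct public files:
    System.out.println(map(base, file, ts, false, "alice"));
    System.out.println(map(base, file, ts, false, "bob"));
    // A public file maps to a single name, independent of the user:
    System.out.println(map(base, file, ts, true, "alice"));
  }
}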
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Mon May 23 14:22:36 2011
@@ -85,10 +85,11 @@ class GenerateData extends GridmixJob {
* Replication of generated data.
*/
public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+ static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";
public GenerateData(Configuration conf, Path outdir, long genbytes)
throws IOException {
- super(conf, 0L, "GRIDMIX_GENDATA");
+ super(conf, 0L, JOB_NAME);
job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
FileOutputFormat.setOutputPath(job, outdir);
}
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java Mon May 23 14:22:36 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * GridmixJob that generates distributed cache files.
+ * {@link GenerateDistCacheData} expects a list of distributed cache files to be
+ * generated as input. This list is expected to be stored as a sequence file
+ * and the filename is expected to be configured using
+ * {@code gridmix.distcache.file.list}.
+ * This input file contains the list of distributed cache files and their sizes.
+ * For each record (i.e. file size and file path) in this input file,
+ * a file with the specific file size at the specific path is created.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GenerateDistCacheData extends GridmixJob {
+
+ /**
+ * Number of distributed cache files to be created by gridmix
+ */
+ static final String GRIDMIX_DISTCACHE_FILE_COUNT =
+ "gridmix.distcache.file.count";
+ /**
+ * Total number of bytes to be written to the distributed cache files by
+ * gridmix, i.e. the sum of the sizes of all unique distributed cache files
+ * to be created by gridmix.
+ */
+ static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
+ "gridmix.distcache.byte.count";
+ /**
+ * The special file created (and used) by gridmix that contains the list of
+ * unique distributed cache files that are to be created and their sizes.
+ */
+ static final String GRIDMIX_DISTCACHE_FILE_LIST =
+ "gridmix.distcache.file.list";
+ static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";
+
+ public GenerateDistCacheData(Configuration conf) throws IOException {
+ super(conf, 0L, JOB_NAME);
+ }
+
+ @Override
+ public Job call() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ ugi.doAs(new PrivilegedExceptionAction<Job>() {
+ public Job run() throws IOException, ClassNotFoundException,
+ InterruptedException {
+ job.setMapperClass(GenDCDataMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(BytesWritable.class);
+ job.setInputFormatClass(GenDCDataFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setJarByClass(GenerateDistCacheData.class);
+ try {
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ } catch (IOException e) {
+ LOG.error("Error while adding input path ", e);
+ }
+ job.submit();
+ return job;
+ }
+ });
+ return job;
+ }
+
+ public static class GenDCDataMapper
+ extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
+
+ private BytesWritable val;
+ private final Random r = new Random();
+ private FileSystem fs;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ val = new BytesWritable(new byte[context.getConfiguration().getInt(
+ GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+ fs = FileSystem.get(context.getConfiguration());
+ }
+
+ // Create one distributed cache file with the needed file size.
+ // key is distributed cache file size and
+ // value is distributed cache file path.
+ @Override
+ public void map(LongWritable key, BytesWritable value, Context context)
+ throws IOException, InterruptedException {
+
+ String fileName = new String(value.getBytes(), 0, value.getLength());
+ Path path = new Path(fileName);
+
+ /**
+ * Create distributed cache file with the permissions 0755.
+ * Since the private distributed cache directory doesn't have execute
+ * permission for others, it is OK to set read permission for others on
+ * the files under that directory; they will still be treated as 'private'
+ * distributed cache files on the simulated cluster.
+ */
+ FSDataOutputStream dos =
+ FileSystem.create(fs, path, new FsPermission((short)0755));
+
+ for (long bytes = key.get(); bytes > 0; bytes -= val.getLength()) {
+ r.nextBytes(val.getBytes());
+ val.setSize((int)Math.min(val.getLength(), bytes));
+ dos.write(val.getBytes(), 0, val.getLength());// Write to distCache file
+ }
+ dos.close();
+ }
+ }
+
+ /**
+ * InputFormat for GenerateDistCacheData.
+ * Input to GenerateDistCacheData is the special file (in SequenceFile format)
+ * that contains the list of distributed cache files to be generated along
+ * with their file sizes.
+ */
+ static class GenDCDataFormat
+ extends InputFormat<LongWritable, BytesWritable> {
+
+ // Split the special file that contains the list of distributed cache file
+ // paths and their file sizes such that each split corresponds to
+ // approximately the same amount of distributed cache data to be generated.
+ // Consider numTaskTrackers * numMapSlotsPerTracker as the number of maps
+ // for this job, if there is a lot of data to be generated.
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
+ final JobClient client = new JobClient(jobConf);
+ ClusterStatus stat = client.getClusterStatus(true);
+ int numTrackers = stat.getTaskTrackers();
+ final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+
+ // Total size of distributed cache files to be generated
+ final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
+ // Get the path of the special file
+ String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
+ if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
+ throw new RuntimeException("Invalid metadata: #files (" + fileCount
+ + "), total_size (" + totalSize + "), filelisturi ("
+ + distCacheFileList + ")");
+ }
+
+ Path sequenceFile = new Path(distCacheFileList);
+ FileSystem fs = sequenceFile.getFileSystem(jobConf);
+ FileStatus srcst = fs.getFileStatus(sequenceFile);
+ // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
+ int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
+ int numSplits = numTrackers * numMapSlotsPerTracker;
+
+ List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+ LongWritable key = new LongWritable();
+ BytesWritable value = new BytesWritable();
+
+ // Average size of data to be generated by each map task
+ final long targetSize = Math.max(totalSize / numSplits,
+ DistributedCacheEmulator.AVG_BYTES_PER_MAP);
+ long splitStartPosition = 0L;
+ long splitEndPosition = 0L;
+ long acc = 0L;
+ long bytesRemaining = srcst.getLen();
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+ while (reader.next(key, value)) {
+
+ // If adding this file would put this split past the target size,
+ // cut the last split and put this file in the next split.
+ if (acc + key.get() > targetSize && acc != 0) {
+ long splitSize = splitEndPosition - splitStartPosition;
+ splits.add(new FileSplit(
+ sequenceFile, splitStartPosition, splitSize, (String[])null));
+ bytesRemaining -= splitSize;
+ splitStartPosition = splitEndPosition;
+ acc = 0L;
+ }
+ acc += key.get();
+ splitEndPosition = reader.getPosition();
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ if (bytesRemaining != 0) {
+ splits.add(new FileSplit(
+ sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
+ }
+
+ return splits;
+ }
+
+ /**
+ * Returns a reader for this split of the distributed cache file list.
+ */
+ @Override
+ public RecordReader<LongWritable, BytesWritable> createRecordReader(
+ InputSplit split, final TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<LongWritable, BytesWritable>();
+ }
+ }
+}
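
The special file consumed above is a SequenceFile of (LongWritable fileSize, BytesWritable filePath) records, written by DistributedCacheEmulator.writeDistCacheFilesList() in decreasing order of file size. A minimal sketch (illustration only, not part of this patch; the list path is hypothetical) that iterates it with the same reader API the patch uses:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class DistCacheFileListDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical location; gridmix writes it under <ioPath>/distributedCache/.
    Path list = new Path("/gridmix/distributedCache/_distCacheFiles.txt");
    FileSystem fs = list.getFileSystem(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, list, conf);
    LongWritable size = new LongWritable();
    BytesWritable path = new BytesWritable();
    try {
      while (reader.next(size, path)) { // records are sorted by size, descending
        String file = new String(path.getBytes(), 0, path.getLength());
        System.out.println(size.get() + "\t" + file);
      }
    } finally {
      reader.close();
    }
  }
}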
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Mon May 23 14:22:36 2011
@@ -92,6 +92,8 @@ public class Gridmix extends Configured
*/
public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
+ private DistributedCacheEmulator distCacheEmulator;
+
// Submit data structures
private JobFactory factory;
private JobSubmitter submitter;
@@ -102,36 +104,61 @@ public class Gridmix extends Configured
private final Shutdown sdh = new Shutdown();
/**
- * Write random bytes at the path provided.
+ * Write random bytes at the path <ioPath>/input/
* @see org.apache.hadoop.mapred.gridmix.GenerateData
*/
protected void writeInputData(long genbytes, Path ioPath)
throws IOException, InterruptedException {
+ Path inputDir = new Path(ioPath, "input");
final Configuration conf = getConf();
- final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
- submitter.add(genData);
+ final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
" of test data...");
+ launchGridmixJob(genData);
+
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("Changing the permissions for inputPath " + inputDir.toString());
+ shell.run(new String[] {"-chmod","-R","777", inputDir.toString()});
+ } catch (Exception e) {
+ LOG.error("Couldnt change the file permissions " , e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Write random bytes in the distributed cache files that will be used by all
+ * simulated jobs of the current gridmix run, if files are to be generated.
+ * This is done as part of the MapReduce job {@link GenerateDistCacheData#JOB_NAME}.
+ * @see org.apache.hadoop.mapred.gridmix.GenerateDistCacheData
+ */
+ protected void writeDistCacheData(Configuration conf)
+ throws IOException, InterruptedException {
+ int fileCount =
+ conf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+ if (fileCount > 0) {// generate distributed cache files
+ final GridmixJob genDistCacheData = new GenerateDistCacheData(conf);
+ LOG.info("Generating distributed cache data of size " + conf.getLong(
+ GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+ launchGridmixJob(genDistCacheData);
+ }
+ }
+
+ // Launch Input/DistCache Data Generation job and wait for completion
+ void launchGridmixJob(GridmixJob job)
+ throws IOException, InterruptedException {
+ submitter.add(job);
+
// TODO add listeners, use for job dependencies
TimeUnit.SECONDS.sleep(10);
try {
- genData.getJob().waitForCompletion(false);
+ job.getJob().waitForCompletion(false);
} catch (ClassNotFoundException e) {
throw new IOException("Internal error", e);
}
- if (!genData.getJob().isSuccessful()) {
- throw new IOException("Data generation failed!");
+ if (!job.getJob().isSuccessful()) {
+ throw new IOException(job.getJob().getJobName() + " job failed!");
}
-
- FsShell shell = new FsShell(conf);
- try {
- LOG.info("Changing the permissions for inputPath " + ioPath.toString());
- shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
- } catch (Exception e) {
- LOG.error("Couldnt change the file permissions " , e);
- throw new IOException(e);
- }
- LOG.info("Done.");
}
/**
@@ -158,7 +185,9 @@ public class Gridmix extends Configured
* @param conf Configuration data, no keys specific to this context
* @param traceIn Either a Path to the trace data or "-" for
* stdin
- * @param ioPath Path from which input data is read
+ * @param ioPath <ioPath>/input/ is the dir from which input data is
+ * read and <ioPath>/distributedCache/ is the gridmix
+ * distributed cache directory.
* @param scratchDir Path into which job output is written
* @param startFlag Semaphore for starting job trace pipeline
*/
@@ -166,6 +195,7 @@ public class Gridmix extends Configured
Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
throws IOException {
try {
+ Path inputDir = new Path(ioPath, "input");
GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS);
LOG.info(" Submission policy is " + policy.name());
@@ -179,11 +209,14 @@ public class Gridmix extends Configured
int numThreads = conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads);
int queueDep = conf.getInt(GRIDMIX_QUE_DEP, 5);
submitter = createJobSubmitter(monitor, numThreads, queueDep,
- new FilePool(conf, ioPath), userResolver,
+ new FilePool(conf, inputDir), userResolver,
statistics);
-
+ distCacheEmulator = new DistributedCacheEmulator(conf, ioPath);
+
factory = createJobFactory(submitter, traceIn, scratchDir, conf,
startFlag, userResolver);
+ factory.jobCreator.setDistCacheEmulator(distCacheEmulator);
+
if (policy == GridmixJobSubmissionPolicy.SERIAL) {
statistics.addJobStatsListeners(factory);
} else {
@@ -244,6 +277,9 @@ public class Gridmix extends Configured
printUsage(System.err);
return 1;
}
+
+ // Should gridmix generate distributed cache data ?
+ boolean generate = false;
long genbytes = -1L;
String traceIn = null;
Path ioPath = null;
@@ -257,6 +293,7 @@ public class Gridmix extends Configured
for (int i = 0; i < argv.length - 2; ++i) {
if ("-generate".equals(argv[i])) {
genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+ generate = true;
} else if ("-users".equals(argv[i])) {
userRsrc = new URI(argv[++i]);
} else {
@@ -287,12 +324,33 @@ public class Gridmix extends Configured
printUsage(System.err);
return 1;
}
- return start(conf, traceIn, ioPath, genbytes, userResolver);
+ return start(conf, traceIn, ioPath, genbytes, userResolver, generate);
}
+ /**
+ *
+ * @param conf gridmix configuration
+ * @param traceIn trace file path (if it is '-', then the trace comes from
+ * stdin)
+ * @param ioPath Working directory for gridmix. GenerateData job
+ * will generate data in the directory <ioPath>/input/ and
+ * distributed cache data is generated in the directory
+ * <ioPath>/distributedCache/, if -generate option is
+ * specified.
+ * @param genbytes size of input data to be generated under the directory
+ * <ioPath>/input/
+ * @param userResolver gridmix user resolver
+ * @param generate true if -generate option was specified
+ * @return exit code
+ * @throws IOException
+ * @throws InterruptedException
+ */
int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
- UserResolver userResolver) throws IOException, InterruptedException {
+ UserResolver userResolver, boolean generate)
+ throws IOException, InterruptedException {
InputStream trace = null;
+ ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));
+
try {
Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
@@ -310,10 +368,12 @@ public class Gridmix extends Configured
// scan input dir contents
submitter.refreshFilePool();
- // create scratch directory(output path of gridmix)
- final FileSystem scratchFs = scratchDir.getFileSystem(conf);
- FileSystem.mkdirs(scratchFs, scratchDir,
- new FsPermission((short) 0777));
+ // set up the needed things for emulation of various loads
+ int exitCode = setupEmulation(conf, traceIn, scratchDir, ioPath,
+ generate);
+ if (exitCode != 0) {
+ return exitCode;
+ }
factory.start();
statistics.start();
@@ -350,6 +410,64 @@ public class Gridmix extends Configured
}
/**
+ * Create the gridmix output directory. Set up things for emulation of
+ * various loads, if needed.
+ * @param conf gridmix configuration
+ * @param traceIn trace file path (if it is '-', then the trace comes from
+ * stdin)
+ * @param scratchDir gridmix output directory
+ * @param ioPath Working directory for gridmix.
+ * @param generate true if -generate option was specified
+ * @return exit code
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private int setupEmulation(Configuration conf, String traceIn,
+ Path scratchDir, Path ioPath, boolean generate)
+ throws IOException, InterruptedException {
+ // create scratch directory (output directory of gridmix)
+ final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+ FileSystem.mkdirs(scratchFs, scratchDir, new FsPermission((short) 0777));
+
+ // Setup things needed for emulation of distributed cache load
+ return setupDistCacheEmulation(conf, traceIn, ioPath, generate);
+ // Setup emulation of other loads like CPU load, Memory load
+ }
+
+ /**
+ * Set up gridmix for emulation of distributed cache load. This includes
+ * generation of distributed cache files, if needed.
+ * @param conf gridmix configuration
+ * @param traceIn trace file path (if it is '-', then the trace comes from
+ * stdin)
+ * @param ioPath <ioPath>/input/ is the dir where input data (a) exists
+ * or (b) is generated. <ioPath>/distributedCache/ is the
+ * folder where distributed cache data (a) exists or (b) is to be
+ * generated by gridmix.
+ * @param generate true if -generate option was specified
+ * @return exit code
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private int setupDistCacheEmulation(Configuration conf, String traceIn,
+ Path ioPath, boolean generate) throws IOException, InterruptedException {
+ distCacheEmulator.init(traceIn, factory.jobCreator, generate);
+ int exitCode = 0;
+ if (distCacheEmulator.shouldGenerateDistCacheData() ||
+ distCacheEmulator.shouldEmulateDistCacheLoad()) {
+
+ JobStoryProducer jsp = createJobStoryProducer(traceIn, conf);
+ exitCode = distCacheEmulator.setupGenerateDistCacheData(jsp);
+ if (exitCode == 0) {
+ // If there are files to be generated, run a MapReduce job to generate
+ // these distributed cache files of all the simulated jobs of this trace.
+ writeDistCacheData(conf);
+ }
+ }
+ return exitCode;
+ }
+
+ /**
* Handles orderly shutdown by requesting that each component in the
* pipeline abort its progress, waiting for each to exit and killing
* any jobs still running on the cluster.
@@ -446,6 +564,11 @@ public class Gridmix extends Configured
ToolRunner.printGenericCommandUsage(out);
out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
out.println(" e.g. gridmix -generate 100m foo -");
+ out.println("Options:");
+ out.println(" -generate <MiB> : Generate input data of size MiB under "
+ + "<iopath>/input/ and generate\n\t\t distributed cache data under "
+ + "<iopath>/distributedCache/.");
+ out.println(" -users <usersResourceURI> : URI that contains the users list.");
out.println("Configuration parameters:");
out.println(" General parameters:");
out.printf(" %-48s : Output directory\n", GRIDMIX_OUT_DIR);
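
If generating distributed cache data is not wanted, the emulation can be switched off instead, as the error message in DistributedCacheEmulator suggests. A minimal sketch (illustration only, not part of this patch; assumes Gridmix's usual Tool entry point, and the paths are hypothetical), equivalent to passing -Dgridmix.distributed-cache-emulation.enable=false on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Gridmix;
import org.apache.hadoop.util.ToolRunner;

public class RunGridmixSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Disable distributed cache emulation for this run.
    conf.setBoolean("gridmix.distributed-cache-emulation.enable", false);
    int exitCode = ToolRunner.run(conf, new Gridmix(),
        new String[] {"/user/me/gridmix-io", "/user/me/trace.json"});
    System.exit(exitCode);
  }
}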
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Mon May 23 14:22:36 2011
@@ -18,31 +18,42 @@
package org.apache.hadoop.mapred.gridmix;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
public enum JobCreator {
LOADJOB {
@Override
public GridmixJob createGridmixJob(
- Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
- UserGroupInformation ugi, int seq) throws IOException {
+ Configuration gridmixConf, long submissionMillis, JobStory jobdesc,
+ Path outRoot, UserGroupInformation ugi, int seq) throws IOException {
+
+ // Build configuration for this simulated job
+ Configuration conf = new Configuration(gridmixConf);
+ dce.configureDistCacheFiles(conf, jobdesc.getJobConf());
return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
- }},
+ }
+
+ @Override
+ public boolean canEmulateDistCacheLoad() {
+ return true;
+ }
+ },
SLEEPJOB {
private String[] hosts;
@@ -72,12 +83,30 @@ public enum JobCreator {
}
return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
numLocations, hosts);
- }};
+ }
+
+ @Override
+ public boolean canEmulateDistCacheLoad() {
+ return false;
+ }
+ };
public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
public static final String SLEEPJOB_RANDOM_LOCATIONS =
"gridmix.sleep.fake-locations";
+ /**
+ * Create Gridmix simulated job.
+ * @param conf configuration of simulated job
+ * @param submissionMillis the time at which this simulated job is to be
+ * submitted
+ * @param jobdesc JobStory obtained from trace
+ * @param outRoot gridmix output directory
+ * @param ugi UGI of job submitter of this simulated job
+ * @param seq job sequence number
+ * @return the created simulated job
+ * @throws IOException
+ */
public abstract GridmixJob createGridmixJob(
final Configuration conf, long submissionMillis, final JobStory jobdesc,
Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
@@ -86,4 +115,21 @@ public enum JobCreator {
Configuration conf, JobCreator defaultPolicy) {
return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
}
+
+ /**
+ * @return true if gridmix simulated jobs of this job type can emulate
+ * distributed cache load
+ */
+ abstract boolean canEmulateDistCacheLoad();
+
+ DistributedCacheEmulator dce;
+ /**
+ * This method is to be called before calling any other method in JobCreator
+ * except canEmulateDistCacheLoad(), especially if canEmulateDistCacheLoad()
+ * returns true for that job type.
+ * @param e Distributed Cache Emulator
+ */
+ void setDistCacheEmulator(DistributedCacheEmulator e) {
+ this.dce = e;
+ }
}
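
A small sketch (illustration only, not part of this patch) of how the job type drives distributed cache emulation; it must live in org.apache.hadoop.mapred.gridmix because canEmulateDistCacheLoad() is package-private:

package org.apache.hadoop.mapred.gridmix;

import org.apache.hadoop.conf.Configuration;

public class JobTypeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("gridmix.job.type", "SLEEPJOB"); // same as -Dgridmix.job.type=SLEEPJOB
    JobCreator creator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
    // false for SLEEPJOB: DistributedCacheEmulator.init() then disables
    // distributed cache emulation for the whole run.
    System.out.println(creator + " emulates dist cache load: "
        + creator.canEmulateDistCacheLoad());
  }
}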
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java?rev=1126499&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java Mon May 23 14:22:36 2011
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Pseudo local file system that generates random data for any file on the fly
+ * instead of storing files on disk. So opening same file multiple times will
+ * not give same file content. There are no directories in this file system
+ * other than the root and all the files are under root i.e. "/". All file URIs
+ * on pseudo local file system should be of the format <code>
+ * pseudo:///<name>.<fileSize></code> where name is a unique name
+ * and <fileSize> is a number representing the size of the file in bytes.
+ */
+class PseudoLocalFs extends FileSystem {
+ Path home;
+ /**
+ * The creation time and modification time of all files in
+ * {@link PseudoLocalFs} is same.
+ */
+ private static final long TIME = System.currentTimeMillis();
+ private static final String HOME_DIR = "/";
+ private static final long BLOCK_SIZE = 4 * 1024 * 1024L; // 4 MB
+ private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1MB
+
+ static final URI NAME = URI.create("pseudo:///");
+
+ PseudoLocalFs() {
+ this(new Path(HOME_DIR));
+ }
+
+ PseudoLocalFs(Path home) {
+ super();
+ this.home = home;
+ }
+
+ @Override
+ public URI getUri() {
+ return NAME;
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ return home;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return getHomeDirectory();
+ }
+
+ /**
+ * Generates a valid pseudo local file path from the given <code>fileId</code>
+ * and <code>fileSize</code>.
+ * @param fileId unique file id string
+ * @param fileSize file size
+ * @return the generated relative path
+ */
+ static Path generateFilePath(String fileId, long fileSize) {
+ return new Path(fileId + "." + fileSize);
+ }
+
+ /**
+ * Creating a pseudo local file is nothing but validating the file path.
+ * Actual data of the file is generated on the fly when a client tries to open
+ * the file for reading.
+ * @param path file path to be created
+ */
+ @Override
+ public FSDataOutputStream create(Path path) throws IOException {
+ try {
+ validateFileNameFormat(path);
+ } catch (FileNotFoundException e) {
+ throw new IOException("File creation failed for " + path);
+ }
+ return null;
+ }
+
+ /**
+ * Validate if the path provided is of the expected format for Pseudo Local
+ * File System based files.
+ * @param path file path
+ * @return the file size
+ * @throws FileNotFoundException
+ */
+ long validateFileNameFormat(Path path) throws FileNotFoundException {
+ path = path.makeQualified(this);
+ boolean valid = true;
+ long fileSize = 0;
+ if (!path.toUri().getScheme().equals(getUri().getScheme())) {
+ valid = false;
+ } else {
+ String[] parts = path.toUri().getPath().split("\\.");
+ try {
+ fileSize = Long.valueOf(parts[parts.length - 1]);
+ valid = (fileSize >= 0);
+ } catch (NumberFormatException e) {
+ valid = false;
+ }
+ }
+ if (!valid) {
+ throw new FileNotFoundException("File " + path
+ + " does not exist in pseudo local file system");
+ }
+ return fileSize;
+ }
+
+ /**
+ * @see #create(Path)
+ */
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ long fileSize = validateFileNameFormat(path);
+ InputStream in = new RandomInputStream(fileSize, bufferSize);
+ return new FSDataInputStream(in);
+ }
+
+ /**
+ * @see #create(Path)
+ */
+ @Override
+ public FSDataInputStream open(Path path) throws IOException {
+ return open(path, DEFAULT_BUFFER_SIZE);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ long fileSize = validateFileNameFormat(path);
+ return new FileStatus(fileSize, false, 1, BLOCK_SIZE, TIME, path);
+ }
+
+ @Override
+ public boolean exists(Path path) {
+ try{
+ validateFileNameFormat(path);
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return create(path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws FileNotFoundException,
+ IOException {
+ return new FileStatus[] {getFileStatus(path)};
+ }
+
+ /**
+ * Input Stream that generates specified number of random bytes.
+ */
+ static class RandomInputStream extends InputStream
+ implements Seekable, PositionedReadable {
+
+ private final Random r = new Random();
+ private BytesWritable val = null;
+ private int positionInVal = 0;// current position in the buffer 'val'
+
+ private long totalSize = 0;// total number of random bytes to be generated
+ private long curPos = 0;// current position in this stream
+
+ /**
+ * @param size total number of random bytes to be generated in this stream
+ * @param bufferSize the buffer size. An internal buffer array of length
+ * <code>bufferSize</code> is created. If <code>bufferSize</code> is not a
+ * positive number, then a default value of 1MB is used.
+ */
+ RandomInputStream(long size, int bufferSize) {
+ totalSize = size;
+ if (bufferSize <= 0) {
+ bufferSize = DEFAULT_BUFFER_SIZE;
+ }
+ val = new BytesWritable(new byte[bufferSize]);
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ if (curPos < totalSize) {
+ if (positionInVal < val.getLength()) {// use buffered byte
+ b[0] = val.getBytes()[positionInVal++];
+ ++curPos;
+ } else {// generate data
+ int num = read(b);
+ if (num < 0) {
+ return num;
+ }
+ }
+ } else {
+ return -1;
+ }
+ return b[0] & 0xFF;// mask to the 0..255 range required by InputStream.read()
+ }
+
+ @Override
+ public int read(byte[] bytes) throws IOException {
+ return read(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public int read(byte[] bytes, int off, int len) throws IOException {
+ if (curPos == totalSize) {
+ return -1;// EOF
+ }
+ int numBytes = len;
+ if (numBytes > (totalSize - curPos)) {// position in file is close to EOF
+ numBytes = (int)(totalSize - curPos);
+ }
+ if (numBytes > (val.getLength() - positionInVal)) {
+ // need to generate data into val
+ r.nextBytes(val.getBytes());
+ positionInVal = 0;
+ }
+
+ System.arraycopy(val.getBytes(), positionInVal, bytes, off, numBytes);
+ curPos += numBytes;
+ positionInVal += numBytes;
+ return numBytes;
+ }
+
+ @Override
+ public int available() {
+ return (int)(val.getLength() - positionInVal);
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Get the current position in this stream/pseudo-file
+ * @return the position in this stream/pseudo-file
+ * @throws IOException
+ */
+ @Override
+ public long getPos() throws IOException {
+ return curPos;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int bufferSize,
+ Progressable progress) throws IOException {
+ throw new UnsupportedOperationException("Append is not supported"
+ + " in pseudo local file system.");
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ throw new UnsupportedOperationException("Mkdirs is not supported"
+ + " in pseudo local file system.");
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new UnsupportedOperationException("Rename is not supported"
+ + " in pseudo local file system.");
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) {
+ throw new UnsupportedOperationException("File deletion is not supported "
+ + "in pseudo local file system.");
+ }
+
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+ throw new UnsupportedOperationException("SetWorkingDirectory "
+ + "is not supported in pseudo local file system.");
+ }
+}
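
A minimal usage sketch (illustration only, not part of this patch; file name and size are hypothetical) of the PseudoLocalFs contract: the size is encoded in the file name, create() merely validates the name, and open() streams that many random bytes on the fly. As with the previous sketch, this lives in org.apache.hadoop.mapred.gridmix because the class is package-private:

package org.apache.hadoop.mapred.gridmix;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PseudoLocalFsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("fs.pseudo.impl", PseudoLocalFs.class, FileSystem.class);
    FileSystem pfs = FileSystem.get(URI.create("pseudo:///"), conf);

    // The size is encoded in the name: pseudo:///myDistCacheFile.1024
    Path p = PseudoLocalFs.generateFilePath("myDistCacheFile", 1024)
        .makeQualified(pfs.getUri(), pfs.getWorkingDirectory());
    pfs.create(p);                      // only validates the name
    FSDataInputStream in = pfs.open(p); // 1024 random bytes, generated lazily
    byte[] buf = new byte[1024];
    int n = in.read(buf);
    in.close();
    System.out.println("read " + n + " bytes from " + p);
  }
}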
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1126499&r1=1126498&r2=1126499&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Mon May 23 14:22:36 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.tools.rumen.Pre
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -209,9 +210,14 @@ public class DebugJobProducer implements
@Override
public String getUser() {
- String s = String.format("foobar%d", id);
- GridmixTestUtils.createHomeAndStagingDirectory(s, (JobConf)conf);
- return s;
+ // Obtain user name from job configuration, if available.
+ // Otherwise use dummy user names.
+ String user = conf.get(MRJobConfig.USER_NAME);
+ if (user == null) {
+ user = String.format("foobar%d", id);
+ }
+ GridmixTestUtils.createHomeAndStagingDirectory(user, (JobConf)conf);
+ return user;
}
@Override
@@ -289,7 +295,7 @@ public class DebugJobProducer implements
@Override
public org.apache.hadoop.mapred.JobConf getJobConf() {
- throw new UnsupportedOperationException();
+ return new JobConf(conf);
}
@Override