Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:08:07 UTC

svn commit: r1077370 [2/2] - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar  4 04:08:06 2011
@@ -17,262 +17,90 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.JobStoryProducer;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
-import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Component generating random job traces for testing on a single node.
  */
-class DebugJobFactory extends JobFactory {
-
-  public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
-      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
-      throws IOException {
-    super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
-        startFlag, userResolver);
-  }
-
-  ArrayList<JobStory> getSubmitted() {
-    return ((DebugJobProducer)jobProducer).submitted;
-  }
+class DebugJobFactory {
 
-  private static class DebugJobProducer implements JobStoryProducer {
-    final ArrayList<JobStory> submitted;
-    private final Configuration conf;
-    private final AtomicInteger numJobs;
-
-    public DebugJobProducer(int numJobs, Configuration conf) {
-      super();
-      this.conf = conf;
-      this.numJobs = new AtomicInteger(numJobs);
-      this.submitted = new ArrayList<JobStory>();
-    }
-
-    @Override
-    public JobStory getNextJob() throws IOException {
-      if (numJobs.getAndDecrement() > 0) {
-        final MockJob ret = new MockJob(conf);
-        submitted.add(ret);
-        return ret;
-      }
-      return null;
-    }
-
-    @Override
-    public void close() { }
+  interface Debuggable {
+    ArrayList<JobStory> getSubmitted();
   }
 
-  static double[] getDistr(Random r, double mindist, int size) {
-    assert 0.0 <= mindist && mindist <= 1.0;
-    final double min = mindist / size;
-    final double rem = 1.0 - min * size;
-    final double[] tmp = new double[size];
-    for (int i = 0; i < tmp.length - 1; ++i) {
-      tmp[i] = r.nextDouble() * rem;
-    }
-    tmp[tmp.length - 1] = rem;
-    Arrays.sort(tmp);
+  public static JobFactory getFactory(
+    JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+    CountDownLatch startFlag, UserResolver resolver) throws IOException {
+    GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+      conf, GridmixJobSubmissionPolicy.STRESS);
+    if (policy.name().equalsIgnoreCase("REPLAY")) {
+      return new DebugReplayJobFactory(
+        submitter, scratch, numJobs, conf, startFlag, resolver);
+    } else if (policy.name().equalsIgnoreCase("STRESS")) {
+      return new DebugStressJobFactory(
+        submitter, scratch, numJobs, conf, startFlag, resolver);
+    } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+      return new DebugSerialJobFactory(
+        submitter, scratch, numJobs, conf, startFlag, resolver);
 
-    final double[] ret = new double[size];
-    ret[0] = tmp[0] + min;
-    for (int i = 1; i < size; ++i) {
-      ret[i] = tmp[i] - tmp[i-1] + min;
     }
-    return ret;
+    return null;
   }
 
-  /**
-   * Generate random task data for a synthetic job.
-   */
-  static class MockJob implements JobStory {
-
-    static final int MIN_REC = 1 << 14;
-    static final int MIN_BYTES = 1 << 20;
-    static final int VAR_REC = 1 << 14;
-    static final int VAR_BYTES = 4 << 20;
-    static final int MAX_MAP = 5;
-    static final int MAX_RED = 3;
-
-    static void initDist(Random r, double min, int[] recs, long[] bytes,
-        long tot_recs, long tot_bytes) {
-      final double[] recs_dist = getDistr(r, min, recs.length);
-      final double[] bytes_dist = getDistr(r, min, recs.length);
-      long totalbytes = 0L;
-      int totalrecs = 0;
-      for (int i = 0; i < recs.length; ++i) {
-        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
-        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
-        totalrecs += recs[i];
-        totalbytes += bytes[i];
-      }
-      // Add/remove excess
-      recs[0] += totalrecs - tot_recs;
-      bytes[0] += totalbytes - tot_bytes;
-      if (LOG.isInfoEnabled()) {
-        LOG.info("DIST: " + Arrays.toString(recs) + " " +
-            tot_recs + "/" + totalrecs + " " +
-            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
-      }
-    }
-
-    private static final AtomicInteger seq = new AtomicInteger(0);
-    // set timestamps in the past
-    private static final AtomicLong timestamp =
-      new AtomicLong(System.currentTimeMillis() -
-        TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
-
-    private final int id;
-    private final String name;
-    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
-    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
-    private final long submitTime;
-
-    public MockJob(Configuration conf) {
-      final Random r = new Random();
-      final long seed = r.nextLong();
-      r.setSeed(seed);
-      id = seq.getAndIncrement();
-      name = String.format("MOCKJOB%05d", id);
-      LOG.info(name + " (" + seed + ")");
-      submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
-            r.nextInt(10), TimeUnit.SECONDS));
-
-      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
-      m_bytesIn = new long[m_recsIn.length];
-      m_recsOut = new int[m_recsIn.length];
-      m_bytesOut = new long[m_recsIn.length];
-
-      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
-      r_bytesIn = new long[r_recsIn.length];
-      r_recsOut = new int[r_recsIn.length];
-      r_bytesOut = new long[r_recsIn.length];
-
-      // map input
-      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
-
-      // shuffle
-      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
-      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
-
-      // reduce output
-      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
-
-      if (LOG.isDebugEnabled()) {
-        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
-        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-        for (int i = 0; i < m_recsIn.length; ++i) {
-          iMapRTotal += m_recsIn[i];
-          iMapBTotal += m_bytesIn[i];
-          oMapRTotal += m_recsOut[i];
-          oMapBTotal += m_bytesOut[i];
-        }
-        for (int i = 0; i < r_recsIn.length; ++i) {
-          iRedRTotal += r_recsIn[i];
-          iRedBTotal += r_bytesIn[i];
-          oRedRTotal += r_recsOut[i];
-          oRedBTotal += r_bytesOut[i];
-        }
-        LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
-                                   " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
-            m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
-            r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
-            submitTime));
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public String getUser() {
-      return String.format("foobar%d", id);
-    }
-
-    @Override
-    public JobID getJobID() {
-      return new JobID("job_mock_" + name, id);
-    }
-
-    @Override
-    public Values getOutcome() {
-      return Values.SUCCESS;
-    }
-
-    @Override
-    public long getSubmissionTime() {
-      return submitTime;
+  static class DebugReplayJobFactory extends ReplayJobFactory
+    implements Debuggable {
+    public DebugReplayJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag, UserResolver resolver) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag, resolver);
     }
 
     @Override
-    public int getNumberMaps() {
-      return m_bytesIn.length;
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
 
-    @Override
-    public int getNumberReduces() {
-      return r_bytesIn.length;
-    }
+  }
 
-    @Override
-    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-      switch (taskType) {
-        case MAP:
-          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
-              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
-        case REDUCE:
-          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
-              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
-        default:
-          throw new IllegalArgumentException("Not interested");
-      }
+  static class DebugSerialJobFactory extends SerialJobFactory
+    implements Debuggable {
+    public DebugSerialJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag, UserResolver resolver) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag, resolver);
     }
 
     @Override
-    public InputSplit[] getInputSplits() {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
 
-    @Override
-    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
-        int taskNumber, int taskAttemptNumber) {
-      throw new UnsupportedOperationException();
-    }
+  }
 
-    @Override
-    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
-        int taskAttemptNumber, int locality) {
-      throw new UnsupportedOperationException();
+  static class DebugStressJobFactory extends StressJobFactory
+    implements Debuggable {
+    public DebugStressJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag, UserResolver resolver) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag, resolver);
     }
 
     @Override
-    public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
   }
+
 }
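
The shape of this refactor: DebugJobFactory stops extending JobFactory and
becomes a static dispatcher that hands back a policy-specific debug factory,
each exposing its DebugJobProducer's submitted list through the Debuggable
interface. Unrecognized policies fall through to null, so callers should
guard against it. A minimal sketch of the intended call pattern, assuming
submitter, scratch, startFlag and resolver are already in scope as in the
test changes below:

  // Sketch only: pick a factory by policy, then recover the submitted jobs.
  Configuration conf = new Configuration();
  conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, "SERIAL");
  JobFactory factory = DebugJobFactory.getFactory(
      submitter, scratch, 3, conf, startFlag, resolver);
  if (factory == null) {
    throw new IllegalStateException("unknown submission policy");
  }
  ArrayList<JobStory> submitted =
      ((DebugJobFactory.Debuggable) factory).getSubmitted();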

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class DebugJobProducer implements JobStoryProducer {
+  public static final Log LOG = LogFactory.getLog(DebugJobProducer.class);
+  final ArrayList<JobStory> submitted;
+  private final Configuration conf;
+  private final AtomicInteger numJobs;
+
+  public DebugJobProducer(int numJobs, Configuration conf) {
+    super();
+    MockJob.reset();
+    this.conf = conf;
+    this.numJobs = new AtomicInteger(numJobs);
+    this.submitted = new ArrayList<JobStory>();
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.getAndDecrement() > 0) {
+      final MockJob ret = new MockJob(conf);
+      submitted.add(ret);
+      return ret;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+
+
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i - 1] + min;
+    }
+    return ret;
+  }
+
+
+  /**
+   * Generate random task data for a synthetic job.
+   */
+  static class MockJob implements JobStory {
+
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+    final Configuration conf;
+
+    static void initDist(
+      Random r, double min, int[] recs, long[] bytes, long tot_recs,
+      long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info(
+          "DIST: " + Arrays.toString(recs) + " " + tot_recs + "/" + totalrecs +
+            " " + Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
+
+    private static final AtomicInteger seq = new AtomicInteger(0);
+    // set timestamp in the past
+    private static final AtomicLong timestamp = new AtomicLong(
+      System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+
+    private final int id;
+    private final String name;
+    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+    private final long submitTime;
+
+    public MockJob(Configuration conf) {
+      final Random r = new Random();
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+      this.conf = conf;
+      LOG.info(name + " (" + seed + ")");
+      submitTime = timestamp.addAndGet(
+        TimeUnit.MILLISECONDS.convert(
+          r.nextInt(10), TimeUnit.SECONDS));
+
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+      if (LOG.isDebugEnabled()) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
+          iRedRTotal += r_recsIn[i];
+          iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
+        }
+        LOG.debug(
+          String.format(
+            "%s: M (%03d) %6d/%10d -> %6d/%10d" +
+              " R (%03d) %6d/%10d -> %6d/%10d @%d", name, m_bytesIn.length,
+            iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal, r_bytesIn.length,
+            iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal, submitTime));
+      }
+    }
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String getUser() {
+      String s = String.format("foobar%d", id);
+      GridmixTestUtils.createHomeAndStagingDirectory(s, (JobConf) conf);
+      return s;
+    }
+
+    @Override
+    public JobID getJobID() {
+      return new JobID("job_mock_" + name, id);
+    }
+
+    @Override
+    public Values getOutcome() {
+      return Values.SUCCESS;
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submitTime;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      return m_bytesIn.length;
+    }
+
+    @Override
+    public int getNumberReduces() {
+      return r_bytesIn.length;
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      switch (taskType) {
+        case MAP:
+          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+        case REDUCE:
+          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+        default:
+          throw new IllegalArgumentException("Not interested");
+      }
+    }
+
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(
+      TaskType taskType, int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+      int taskNumber, int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+
+    public static void reset() {
+      seq.set(0);
+      timestamp.set(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+    }
+  }
+}
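
A note on the distribution helper: getDistr reserves min = mindist/size for
every slot, scatters size - 1 uniform draws over the remainder
rem = 1 - min * size, sorts them with rem as the last element, and takes
adjacent differences, so every share is at least min and the shares sum to
exactly 1. A small hypothetical invariant check, not part of this commit,
that exercises both properties from a test in the same package:

  // Hypothetical check of getDistr's two invariants.
  Random r = new Random(42L);
  double[] shares = DebugJobProducer.getDistr(r, 0.5, 5);
  double sum = 0.0;
  for (double s : shares) {
    assert s >= 0.5 / 5 : "every share is at least mindist/size";
    sum += s;
  }
  assert Math.abs(sum - 1.0) < 1e-9 : "shares sum to exactly 1";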

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.Groups;
+
+import java.io.IOException;
+
+public class GridmixTestUtils {
+  static final Path DEST = new Path("/gridmix");
+  static FileSystem dfs = null;
+  static MiniDFSCluster dfsCluster = null;
+  static MiniMRCluster mrCluster = null;
+
+  public static void initCluster() throws IOException {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+    dfs = dfsCluster.getFileSystem();
+    mrCluster = new MiniMRCluster(
+      3, dfs.getUri().toString(), 1, null, null, new JobConf(conf));
+  }
+
+  public static void shutdownCluster() throws IOException {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates the home and staging directories for a dummy user.
+   * @param user the synthetic user to create directories for
+   * @param conf job configuration holding the staging root dir
+   */
+  public static void createHomeAndStagingDirectory(String user, JobConf conf) {
+    try {
+      FileSystem fs = dfsCluster.getFileSystem();
+      String path = "/user/" + user;
+      Path homeDirectory = new Path(path);
+      if (fs.exists(homeDirectory)) {
+        fs.delete(homeDirectory, true);
+      }
+      TestGridmixSubmission.LOG.info(
+        "Creating home directory: " + homeDirectory);
+      fs.mkdirs(homeDirectory);
+      changePermission(user, homeDirectory, fs);
+      Path stagingArea = new Path(
+        conf.get(
+          "mapreduce.jobtracker.staging.root.dir",
+          "/tmp/hadoop/mapred/staging"));
+      TestGridmixSubmission.LOG.info(
+        "Creating Staging root directory : " + stagingArea);
+      fs.mkdirs(stagingArea);
+      fs.setPermission(stagingArea, new FsPermission((short) 0777));
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+    }
+  }
+
+  static void changePermission(String user, Path homeDirectory, FileSystem fs)
+    throws IOException {
+    fs.setOwner(homeDirectory, user, "");
+  }
+}
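
GridmixTestUtils pulls the MiniDFSCluster/MiniMRCluster lifecycle out of
TestGridmixSubmission so any test in this package can share the same harness.
A sketch of how another test class would wire it up, mirroring the
@BeforeClass/@AfterClass pairing in the TestGridmixSubmission change below:

  @BeforeClass
  public static void init() throws IOException {
    GridmixTestUtils.initCluster();      // starts MiniDFS and MiniMR
  }

  @AfterClass
  public static void shutDown() throws IOException {
    GridmixTestUtils.shutdownCluster();  // null-safe teardown of both
  }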

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  4 04:08:06 2011
@@ -28,15 +28,14 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapreduce.Job;
@@ -46,16 +45,22 @@ import org.apache.hadoop.tools.rumen.Tas
 import org.apache.hadoop.util.ToolRunner;
 import static org.apache.hadoop.mapred.Task.Counter.*;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
 import static org.junit.Assert.*;
 
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.log4j.Level;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.security.PrivilegedExceptionAction;
 
 public class TestGridmixSubmission {
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
   {
     ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
         ).getLogger().setLevel(Level.DEBUG);
@@ -72,22 +77,13 @@ public class TestGridmixSubmission {
   private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
 
   @BeforeClass
-  public static void initCluster() throws IOException {
-    Configuration conf = new Configuration();
-    dfsCluster = new MiniDFSCluster(conf, 3, true, null);
-    dfs = dfsCluster.getFileSystem();
-    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
-        new JobConf(conf));
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
   }
 
   @AfterClass
-  public static void shutdownCluster() throws IOException {
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-    }
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
   }
 
   static class TestMonitor extends JobMonitor {
@@ -96,8 +92,8 @@ public class TestGridmixSubmission {
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
-    public TestMonitor(int expected) {
-      super();
+    public TestMonitor(int expected, Statistics stats) {
+      super(stats);
       this.expected = expected;
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
@@ -109,17 +105,17 @@ public class TestGridmixSubmission {
       for (JobStory spec : submitted) {
         sub.put(spec.getName(), spec);
       }
-      final JobClient client = new JobClient(mrCluster.createJobConf());
+      final JobClient client = new JobClient(GridmixTestUtils.mrCluster.createJobConf());
       for (Job job : succeeded) {
         final String jobname = job.getJobName();
         if ("GRIDMIX_GENDATA".equals(jobname)) {
-          final Path in = new Path("foo").makeQualified(dfs);
-          final Path out = new Path("/gridmix").makeQualified(dfs);
-          final ContentSummary generated = dfs.getContentSummary(in);
+          final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+          final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
+          final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
           assertTrue("Mismatched data gen", // +/- 100k for logs
               (GENDATA << 20) < generated.getLength() + GENSLOP ||
               (GENDATA << 20) > generated.getLength() - GENSLOP);
-          FileStatus[] outstat = dfs.listStatus(out);
+          FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
           assertEquals("Mismatched job count", NJOBS, outstat.length);
           continue;
         }
@@ -128,7 +124,8 @@ public class TestGridmixSubmission {
         assertNotNull("No spec for " + job.getJobName(), spec);
         assertNotNull("No counters for " + job.getJobName(), job.getCounters());
         final String specname = spec.getName();
-        final FileStatus stat = dfs.getFileStatus(new Path(DEST, "" +
+        final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
+          GridmixTestUtils.DEST, "" +
               Integer.valueOf(specname.substring(specname.length() - 5))));
         assertEquals("Wrong owner for " + job.getJobName(), spec.getUser(),
             stat.getOwner());
@@ -287,35 +284,62 @@ public class TestGridmixSubmission {
 
   static class DebugGridmix extends Gridmix {
 
-    private DebugJobFactory factory;
+    private JobFactory factory;
     private TestMonitor monitor;
 
     public void checkMonitor() throws Exception {
-      monitor.verify(factory.getSubmitted());
+      monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
     }
 
     @Override
-    protected JobMonitor createJobMonitor() {
-      monitor = new TestMonitor(NJOBS + 1); // include data generation job
+    protected JobMonitor createJobMonitor(Statistics stats) {
+      Configuration conf = new Configuration();
+      conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
+      monitor = new TestMonitor(NJOBS + 1, stats);
       return monitor;
     }
-
+
     @Override
-    protected JobFactory createJobFactory(JobSubmitter submitter,
-        String traceIn, Path scratchDir, Configuration conf,
-        CountDownLatch startFlag, UserResolver userResolver)
+    protected JobFactory createJobFactory(
+      JobSubmitter submitter, String traceIn, Path scratchDir, Configuration conf,
+      CountDownLatch startFlag, UserResolver userResolver)
         throws IOException {
-      factory =
-        new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag,
-            userResolver);
+      factory = DebugJobFactory.getFactory(
+        submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
       return factory;
     }
   }
 
   @Test
-  public void testSubmit() throws Exception {
-    final Path in = new Path("foo").makeQualified(dfs);
-    final Path out = DEST.makeQualified(dfs);
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    System.out.println("Replay started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println("Stress started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Stress ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    System.out.println("Serial started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Serial ended at " + System.currentTimeMillis());
+  }
+
+  private void doSubmission() throws Exception {
+    final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+    final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
+    final Path root = new Path("/user");
+    Configuration conf = null;
+    try {
     final String[] argv = {
       "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
       "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
@@ -325,14 +349,24 @@ public class TestGridmixSubmission {
       "-" // ignored by DebugGridmix
     };
     DebugGridmix client = new DebugGridmix();
-    final Configuration conf = mrCluster.createJobConf();
+    conf = new Configuration();
+    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
+    conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+//    GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
     // allow synthetic users to create home directories
-    final Path root = new Path("/user");
-    dfs.mkdirs(root, new FsPermission((short)0777));
-    dfs.setPermission(root, new FsPermission((short)0777));
+    GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
+    GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
-  }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
+  }
+
 
 }
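
The test now exercises one submission policy per @Test method: each case sets
the static policy field and calls doSubmission(), and the chosen policy
reaches DebugJobFactory.getFactory through the conf key it round-trips. An
illustrative check of that round trip, assuming some policy value has been
chosen:

  // Illustrative: the conf key carries the policy to the factory.
  Configuration conf = new Configuration();
  conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
  GridmixJobSubmissionPolicy resolved = GridmixJobSubmissionPolicy.getPolicy(
      conf, GridmixJobSubmissionPolicy.STRESS);
  assert resolved == policy;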

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java Fri Mar  4 04:08:06 2011
@@ -72,17 +72,13 @@ public class TestUserResolve {
     assertTrue("User list required for RoundRobinUserResolver", fail);
 
     rslv.setTargetUsers(new URI(userlist.toString()), conf);
-    assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
-    assertEquals("user1", rslv.getTargetUgi("hfre1").getUserName());
-    assertEquals("user2", rslv.getTargetUgi("hfre2").getUserName());
-    assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
-    assertEquals("user3", rslv.getTargetUgi("hfre3").getUserName());
-    assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
-    assertEquals("user0", rslv.getTargetUgi("hfre4").getUserName());
-    assertArrayEquals(new String[] { "groupA", "groupB", "groupC" },
-        rslv.getTargetUgi("hfre0").getGroupNames());
-    assertArrayEquals(new String[] { "groupB" },
-        rslv.getTargetUgi("hfre2").getGroupNames());
+    assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+    assertEquals("user1", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre1")).getUserName());
+    assertEquals("user2", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre2")).getUserName());
+    assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+    assertEquals("user3", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre3")).getUserName());
+    assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+    assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre4")).getUserName());
   }
 
   @Test
@@ -90,8 +86,12 @@ public class TestUserResolve {
     final Configuration conf = new Configuration();
     final UserResolver rslv = new SubmitterUserResolver();
     rslv.setTargetUsers(null, conf);
-    assertEquals(UnixUserGroupInformation.login(),
-        rslv.getTargetUgi((UserGroupInformation)null));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    assertEquals(ugi, rslv.getTargetUgi((UserGroupInformation) null));
+    System.out.println("Submitter current user " + ugi);
+    System.out.println(
+      "Target ugi " + rslv.getTargetUgi(
+        (UserGroupInformation) null));
   }
 
 }
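
The resolver API change behind these edits: getTargetUgi now takes a
UserGroupInformation rather than a plain user name, so the round-robin test
wraps each trace user with UserGroupInformation.createRemoteUser before
resolving. A condensed sketch of the mapping the assertions above verify,
assuming the four-entry target user list the test builds:

  // Sketch: first sight of a trace user claims the next target user,
  // repeat sightings are sticky, and overflow wraps back to slot 0.
  assertEquals("user0", rslv.getTargetUgi(
      UserGroupInformation.createRemoteUser("hfre0")).getUserName());
  assertEquals("user0", rslv.getTargetUgi(
      UserGroupInformation.createRemoteUser("hfre0")).getUserName());
  assertEquals("user0", rslv.getTargetUgi(
      UserGroupInformation.createRemoteUser("hfre4")).getUserName());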